# 水务管理系统 - 数据接入层 ## 项目概述 本项目是水务管理系统中的数据接入层,实现了多源数据接入、实时WebSocket推送和批量数据导入功能。 ## 功能特性 ### 1. REST API 数据接入 - **IoT设备数据接入**:支持实时接收传感器数据 - **手动数据录入**:支持人工录入数据 - **批量API导入**:支持通过API批量导入数据 - **数据查询接口**:提供按数据类型、时间范围等条件的数据查询 - **统计分析接口**:提供数据统计和分析功能 ### 2. WebSocket 实时推送 - **实时数据推送**:传感器数据实时推送到客户端 - **连接管理**:支持多客户端连接和订阅管理 - **数据历史**:新连接客户端可以获取历史数据 - **警报推送**:支持实时警报推送 - **心跳检测**:支持连接状态监控 ### 3. 批量数据导入 - **多格式支持**:支持CSV、Excel、JSON格式导入 - **数据验证**:内置数据验证和错误处理 - **字段映射**:支持水利行业标准字段映射 - **单位转换**:自动进行单位转换和标准化 - **批量处理**:支持大批量数据处理 ## 技术栈 - **后端框架**:FastAPI - **WebSocket**:websockets - **数据处理**:pandas, numpy - **异步处理**:asyncio - **数据验证**:pydantic - **文件处理**:aiofiles ## 项目结构 ``` water-management-system/ ├── src/ │ ├── api/ # REST API模块 │ │ └── rest_api.py # 主API服务器 │ ├── websocket/ # WebSocket模块 │ │ └── websocket_server.py # WebSocket服务器 │ ├── batch/ # 批量导入模块 │ │ └── batch_import.py # 批量导入功能 │ ├── utils/ # 工具模块 │ │ └── data_utils.py # 数据处理工具 │ └── models/ # 数据模型 │ └── models.py # 数据模型定义 ├── main.py # 主程序入口 ├── requirements.txt # 依赖文件 ├── config.json # 配置文件 └── README.md # 项目说明 ``` ## API 接口文档 ### REST API 端点 #### 1. IoT 数据接收 ``` POST /api/iot/data Content-Type: application/json { "device_id": "device_001", "data_type": "LL", "value": 25.5, "timestamp": "2024-01-01T12:00:00", "location": "A区" } ``` #### 2. 手动数据录入 ``` POST /api/manual/data Content-Type: application/json { "source": "manual", "data_type": "YL", "value": 0.8, "timestamp": "2024-01-01T12:00:00", "operator": "张三", "notes": "定期数据录入" } ``` #### 3. 批量导入 ``` POST /api/batch/import Content-Type: application/json { "batch_id": "batch_001", "data_source": "system_import", "records": [ { "device_id": "device_002", "data_type": "SW", "value": 5.2, "timestamp": "2024-01-01T12:00:00", "location": "B区" } ] } ``` #### 4. 数据查询 ``` GET /api/data/{data_type}?limit=100&offset=0 GET /api/data/recent?hours=24&limit=100 GET /api/stats ``` ### WebSocket 连接 #### 连接端点 ``` ws://localhost:8765 ``` #### 消息格式 **发送消息**: ```json { "type": "subscribe", "subscription": "LL" // 订阅特定类型数据,"all"订阅所有 } ``` **接收消息**: ```json { "type": "sensor_data", "data_type": "LL", "device_id": "device_001", "value": 25.5, "location": "A区", "timestamp": "2024-01-01T12:00:00Z" } ``` ## 安装和运行 ### 1. 安装依赖 ```bash pip install -r requirements.txt ``` ### 2. 启动系统 ```bash # 普通模式 python main.py # 演示模式(自动生成数据) python main.py --demo # 指定配置文件 python main.py --config custom_config.json ``` ### 3. 初始化项目 ```bash python main.py --init ``` ## 配置文件 创建 `config.json` 文件: ```json { "api": { "host": "0.0.0.0", "port": 8000 }, "websocket": { "host": "0.0.0.0", "port": 8765 }, "batch": { "max_file_size_mb": 100, "supported_formats": ["csv", "excel", "json"] }, "demo_mode": true, "logging": { "level": "INFO", "file": "water_management.log" } } ``` ## 数据类型说明 支持的水利行业标准数据类型: | 数据类型 | 描述 | 单位 | |---------|------|------| | LL | 流量 | m³/h | | YL | 压力 | MPa | | SW | 水位 | m | | ZD | 浊度 | NTU | | PH | pH值 | - | | WD | 温度 | °C | | DD | 电导率 | μS/cm | | YD | 硬度 | mg/L | ## 开发规范 ### 代码结构 - 遵循模块化设计,功能解耦 - 使用异步编程提高性能 - 统一的错误处理机制 - 完整的日志记录 ### 数据处理 - 数据验证和清洗 - 标准化字段映射 - 单位自动转换 - 质量评分机制 ### 安全考虑 - 输入数据验证 - 文件大小限制 - 连接状态监控 - 错误信息脱敏 ## 测试和验证 ### 数据验证 ```python from src.utils.data_utils import data_converter # 验证传感器数据 validation_result = data_converter.validate_sensor_data({ "device_id": "device_001", "data_type": "LL", "value": 25.5, "location": "A区" }) print(validation_result) ``` ### 批量导入测试 ```python import asyncio from src.batch.batch_import import batch_manager async def test_import(): result = await batch_manager.import_file( file_path="data.csv", batch_id="test_batch", data_source="test" ) print(result) asyncio.run(test_import()) ``` ## 部署建议 ### 1. 生产环境配置 - 使用反向代理(Nginx) - 配置SSL证书 - 设置防火墙规则 - 监控系统资源使用 ### 2. 性能优化 - 数据库连接池 - 缓存机制 - 异步处理优化 - 连接数限制 ### 3. 监控和日志 - 应用性能监控 - 错误日志收集 - 性能指标统计 - 告警机制 ## 许可证 本项目遵循 MIT 许可证。 ## 贡献指南 欢迎提交 Issue 和 Pull Request来贡献代码。 ## 联系方式 如有问题,请通过以下方式联系: - 邮箱:bot_dev1@xayunmei.com - 项目地址:http://git.xayunmei.com/bot_ym/water-management-system