智慧水务管理系统 - 精河县供水工程综合管理平台
bot_dev1 60166c7d80 fix: 修复PM退回问题 - Issue #88巡查APP 3 dagar sedan
frontend feat: 实现自助BI看板功能,支持Superset/Metabase集成 4 dagar sedan
sql feat(wm-patrol): #88 巡查APP(任务/工单/统计/问题上报/个人中心) 4 dagar sedan
src 实现IoT模块 - 完成Issue #28: MQTT协议适配器+设备注册/发现API 4 dagar sedan
wm-common fix: 修复PM退回问题 - Issue #88巡查APP 3 dagar sedan
wm-patrol fix: 修复PM退回问题 - Issue #88巡查APP 3 dagar sedan
README.md 实现数据接入层功能(REST API + WebSocket + 批量导入) 4 dagar sedan
main.py 实现IoT模块 - 完成Issue #28: MQTT协议适配器+设备注册/发现API 4 dagar sedan
pom.xml fix: 修复PM退回问题 - Issue #88巡查APP 3 dagar sedan
requirements.txt 实现数据接入层功能(REST API + WebSocket + 批量导入) 4 dagar sedan
test_iot.py 实现IoT模块 - 完成Issue #28: MQTT协议适配器+设备注册/发现API 4 dagar sedan
test_iot_simple.py 实现IoT模块 - 完成Issue #28: MQTT协议适配器+设备注册/发现API 4 dagar sedan

README.md

水务管理系统 - 数据接入层

项目概述

本项目是水务管理系统中的数据接入层,实现了多源数据接入、实时WebSocket推送和批量数据导入功能。

功能特性

1. REST API 数据接入

  • IoT设备数据接入:支持实时接收传感器数据
  • 手动数据录入:支持人工录入数据
  • 批量API导入:支持通过API批量导入数据
  • 数据查询接口:提供按数据类型、时间范围等条件的数据查询
  • 统计分析接口:提供数据统计和分析功能

2. WebSocket 实时推送

  • 实时数据推送:传感器数据实时推送到客户端
  • 连接管理:支持多客户端连接和订阅管理
  • 数据历史:新连接客户端可以获取历史数据
  • 警报推送:支持实时警报推送
  • 心跳检测:支持连接状态监控

3. 批量数据导入

  • 多格式支持:支持CSV、Excel、JSON格式导入
  • 数据验证:内置数据验证和错误处理
  • 字段映射:支持水利行业标准字段映射
  • 单位转换:自动进行单位转换和标准化
  • 批量处理:支持大批量数据处理

技术栈

  • 后端框架:FastAPI
  • WebSocket:websockets
  • 数据处理:pandas, numpy
  • 异步处理:asyncio
  • 数据验证:pydantic
  • 文件处理:aiofiles

项目结构

water-management-system/
├── src/
│   ├── api/                 # REST API模块
│   │   └── rest_api.py      # 主API服务器
│   ├── websocket/           # WebSocket模块
│   │   └── websocket_server.py  # WebSocket服务器
│   ├── batch/               # 批量导入模块
│   │   └── batch_import.py  # 批量导入功能
│   ├── utils/               # 工具模块
│   │   └── data_utils.py    # 数据处理工具
│   └── models/              # 数据模型
│       └── models.py        # 数据模型定义
├── main.py                  # 主程序入口
├── requirements.txt        # 依赖文件
├── config.json             # 配置文件
└── README.md               # 项目说明

API 接口文档

REST API 端点

1. IoT 数据接收

POST /api/iot/data
Content-Type: application/json

{
    "device_id": "device_001",
    "data_type": "LL",
    "value": 25.5,
    "timestamp": "2024-01-01T12:00:00",
    "location": "A区"
}

2. 手动数据录入

POST /api/manual/data
Content-Type: application/json

{
    "source": "manual",
    "data_type": "YL",
    "value": 0.8,
    "timestamp": "2024-01-01T12:00:00",
    "operator": "张三",
    "notes": "定期数据录入"
}

3. 批量导入

POST /api/batch/import
Content-Type: application/json

{
    "batch_id": "batch_001",
    "data_source": "system_import",
    "records": [
        {
            "device_id": "device_002",
            "data_type": "SW",
            "value": 5.2,
            "timestamp": "2024-01-01T12:00:00",
            "location": "B区"
        }
    ]
}

4. 数据查询

GET /api/data/{data_type}?limit=100&offset=0
GET /api/data/recent?hours=24&limit=100
GET /api/stats

WebSocket 连接

连接端点

ws://localhost:8765

消息格式

发送消息

{
    "type": "subscribe",
    "subscription": "LL"  // 订阅特定类型数据,"all"订阅所有
}

接收消息

{
    "type": "sensor_data",
    "data_type": "LL",
    "device_id": "device_001",
    "value": 25.5,
    "location": "A区",
    "timestamp": "2024-01-01T12:00:00Z"
}

安装和运行

1. 安装依赖

pip install -r requirements.txt

2. 启动系统

# 普通模式
python main.py

# 演示模式(自动生成数据)
python main.py --demo

# 指定配置文件
python main.py --config custom_config.json

3. 初始化项目

python main.py --init

配置文件

创建 config.json 文件:

{
    "api": {
        "host": "0.0.0.0",
        "port": 8000
    },
    "websocket": {
        "host": "0.0.0.0",
        "port": 8765
    },
    "batch": {
        "max_file_size_mb": 100,
        "supported_formats": ["csv", "excel", "json"]
    },
    "demo_mode": true,
    "logging": {
        "level": "INFO",
        "file": "water_management.log"
    }
}

数据类型说明

支持的水利行业标准数据类型:

数据类型 描述 单位
LL 流量 m³/h
YL 压力 MPa
SW 水位 m
ZD 浊度 NTU
PH pH值 -
WD 温度 °C
DD 电导率 μS/cm
YD 硬度 mg/L

开发规范

代码结构

  • 遵循模块化设计,功能解耦
  • 使用异步编程提高性能
  • 统一的错误处理机制
  • 完整的日志记录

数据处理

  • 数据验证和清洗
  • 标准化字段映射
  • 单位自动转换
  • 质量评分机制

安全考虑

  • 输入数据验证
  • 文件大小限制
  • 连接状态监控
  • 错误信息脱敏

测试和验证

数据验证

from src.utils.data_utils import data_converter

# 验证传感器数据
validation_result = data_converter.validate_sensor_data({
    "device_id": "device_001",
    "data_type": "LL",
    "value": 25.5,
    "location": "A区"
})

print(validation_result)

批量导入测试

import asyncio
from src.batch.batch_import import batch_manager

async def test_import():
    result = await batch_manager.import_file(
        file_path="data.csv",
        batch_id="test_batch",
        data_source="test"
    )
    print(result)

asyncio.run(test_import())

部署建议

1. 生产环境配置

  • 使用反向代理(Nginx)
  • 配置SSL证书
  • 设置防火墙规则
  • 监控系统资源使用

2. 性能优化

  • 数据库连接池
  • 缓存机制
  • 异步处理优化
  • 连接数限制

3. 监控和日志

  • 应用性能监控
  • 错误日志收集
  • 性能指标统计
  • 告警机制

许可证

本项目遵循 MIT 许可证。

贡献指南

欢迎提交 Issue 和 Pull Request来贡献代码。

联系方式

如有问题,请通过以下方式联系:

  • 邮箱:bot_dev1@xayunmei.com
  • 项目地址:http://git.xayunmei.com/bot_ym/water-management-system