# 数据汇聚引擎 (wm-data-engine) ## 模块概述 数据汇聚引擎是智慧水务管理系统的核心数据处理模块,负责实时采集、验证、存储和推送来自多个来源的数据。 ## 核心功能 ### 🔄 实时流数据采集 #### MQTT 支持 - **协议**: MQTT 3.1/3.1.1 - **客户端**: Eclipse Paho - **主题监听**: - `iot/telemetry/+` - 设备遥测数据 - `iot/command/+` - 设备控制命令 - `quality/data/+` - 水质检测数据 - **数据格式**: JSON #### Kafka 消费者 - **IoT原始数据**: `iot.raw.generic` - 处理设备遥测数据 - **水质数据**: `data.quality` - 处理水质检测数据 - **手动录入**: `data.manual` - 处理人工录入数据 - **API接口**: `data.api` - 处理接口调用数据 ### 📊 数据验证 #### 验证规则 - **设备编号**: 6-20位字母数字 - **数值范围**: 根据指标类型设定合理范围 - **数据完整性**: 必需字段检查 - **质量评分**: 数据质量量化评估 #### 支持的指标类型 - **水表指标**: 流量、压力、温度、水位、累计用水量 - **水质指标**: 浊度、pH值、余氯、总氯、总硬度 - **管道指标**: 管道压力、流量、温度、泄漏状态 - **阀门指标**: 开度、状态、压差 - **水泵指标**: 状态、流量、电流、功率、温度 - **环境指标**: 温度、湿度、气压 ### 💾 数据存储 #### TDengine 时序数据库 - **存储设备遥测数据** - **超级表设计**: water_iot.iot_telemetry - **高压缩率**: 适用于大量时间序列数据 - **快速查询**: 支持降采样和聚合分析 #### PostgreSQL 关系数据库 - **存储水质检测记录** - **存储配置和元数据** - **支持复杂查询和事务处理 ### 📈 数据统计 #### 统计功能 - **采集任务统计**: 成功率、失败率、处理时间 - **数据质量统计**: 合格率、异常分布 - **设备状态统计**: 在线率、故障率 - **实时监控**: WebSocket 推送 ### 🔌 接口说明 #### REST API - `GET /api/data/collect/tasks` - 查询采集任务列表 - `POST /api/data/collect/tasks` - 创建采集任务 - `GET /api/data/collect/records` - 查询采集记录 - `POST /api/data/collect/batch` - 批量数据采集 #### WebSocket - `/topic/data/realtime/{sourceType}` - 实时数据推送 ## 配置说明 ### MQTT 配置 ```yaml mqtt: broker-url: tcp://127.0.0.1:1883 client-id: water-data-engine username: water password: water123 timeout: 30 keep-alive: 60 topic: iot-telemetry: iot/telemetry/+ iot-command: iot/command/+ quality-data: quality/data/+ ``` ### Kafka 配置 ```yaml spring: kafka: bootstrap-servers: 127.0.0.1:9092 consumer: group-id: wm-data-engine auto-offset-reset: latest producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer ``` ### TDengine 配置 ```yaml tda: host: 127.0.0.1 port: 6030 username: root password: taosdata database: water_iot ``` ## 数据格式 ### IoT 遥测数据 ```json { "deviceSn": "FM001", "timestamp": 1718352000000, "metrics": [ { "key": "LL", "value": 12.5 }, { "key": "YL", "value": 0.35 } ] } ``` ### 水质数据 ```json { "testType": "常规检测", "testPoint": "水厂出口", "pointType": "出厂水", "area": "主城区", "turbidity": 0.5, "ph": 7.2, "residualChlorine": 0.3, "isQualified": true } ``` ## 监控和日志 ### 日志级别 - **DEBUG**: 详细的数据处理日志 - **INFO**: 关键操作和状态变更 - **WARN**: 异常但可恢复的情况 - **ERROR**: 严重错误 ### 监控指标 - **采集成功率**: 成功处理的数据量 / 总数据量 - **数据处理延迟**: 从接收到存储的时间差 - **系统负载**: CPU、内存使用率 - **连接状态**: MQTT、Kafka、数据库连接数 ## 开发指南 ### 添加新的数据源 1. 在 `DataCollectService` 中添加新的处理方法 2. 在 `DataValidationUtils` 中添加验证规则 3. 更新 `MetricType` 枚举(如需要) 4. 添加对应的单元测试 ### 添加新的数据类型 1. 定义新的消息格式 2. 更新 Kafka 消费者 3. 添加数据验证逻辑 4. 实现存储逻辑 ## 测试 ### 运行单元测试 ```bash mvn test ``` ### 运行集成测试 ```bash mvn verify ``` ## 故障排查 ### 常见问题 1. **MQTT 连接失败**: 检查 Broker 地址和认证信息 2. **Kafka 消费延迟**: 检查消费者组和 Topic 配置 3. **TDengine 写入失败**: 检查数据库连接和超级表结构 4. **数据验证失败**: 检查数据格式和范围规则 ### 调试模式 设置日志级别为 DEBUG: ```yaml logging: level: com.water.data_engine: DEBUG ``` ## 版本历史 ### v1.0.0 (2026-06-15) - 实现 Issue #41: 实时流数据采集(MQTT/Kafka Consumer) - 支持 MQTT 客户端和数据接收 - 实现 Kafka 消费者功能 - 添加数据验证和质量检查 - 集成 TDengine 时序数据库 - 完善测试覆盖