# 数据引擎模块 (wm-data-engine) ## 概述 数据引擎是供水管理系统的核心模块,负责实时数据采集、处理、存储和监控。 ## 主要功能 ### 1. 实时数据采集 - **Kafka 消费者**: 消费 IoT 设备遥测数据,支持多 topic 分发 - **MQTT 客户端**: 支持物联网设备遥测数据和控制命令的双向通信 - **WebSocket 推送**: 实时推送数据到前端界面 ### 2. 数据处理 - **数据验证**: 完整的数据质量检查机制,包括设备编号、数值范围验证 - **数据路由**: 根据数据源类型自动路由到不同的处理通道 - **数据转换**: 支持多种数据格式的转换和标准化 ### 3. 数据存储 - **TDengine 时序数据库**: 存储物联网遥测数据 - **PostgreSQL 关系数据库**: 存储配置信息和统计数据 - **MinIO 对象存储**: 存储文件和报表数据 ### 4. 监控和统计 - **数据统计**: 采集量、成功率、错误率等统计分析 - **设备监控**: 设备数据状态、趋势分析 - **错误监控**: 错误分布、常见错误类型统计 ## 技术架构 ``` ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ IoT 设备 │ │ Kafka Topic │ │ MQTT Broker │ │ (流量计/压力计) │───▶│ iot.raw.generic │ │ tcp://1883 │ │ (水质传感器) │ │ data.quality │ │ │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ │ │ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ DataCollectService │ │ MqttService │ │ DataValidationUtils │ └─────────────────┘ └─────────────────┘ └─────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────────────┐ │ Data Engine Core │ │ 数据采集与处理 │ └─────────────────────────────────────────────────────────────┘ │ ▼ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ │ TDengine │ │ PostgreSQL │ │ MinIO │ │ (时序数据) │ │ (配置信息) │ │ (文件存储) │ └─────────────────┘ └─────────────────┘ └─────────────────┘ ``` ## 核心组件 ### DataCollectService - **功能**: 数据采集服务,支持实时流和批量采集 - **主要方法**: - `ingestRealtime()`: 实时数据接入 - `consumeIotRaw()`: Kafka 消费 IoT 原始数据 - `consumeQualityData()`: Kafka 消费水质数据 - `batchIngest()`: 批量数据采集 - `validateData()`: 数据验证 ### MqttService - **功能**: MQTT 消息服务,支持双向通信 - **主要方法**: - `handleIotTelemetry()`: 处理 IoT 遥测数据 - `handleIotCommand()`: 处理控制命令 - `handleQualityData()`: 处理水质数据 ### MqttPublishService - **功能**: MQTT 消息发布服务 - **主要方法**: - `sendDeviceCommand()`: 发送设备控制命令 - `sendDeviceConfig()`: 发送设备配置更新 - `batchSendConfig()`: 批量发送配置 ### DataStatisticsService - **功能**: 数据统计分析服务 - **主要方法**: - `getDataStatistics()`: 获取数据采集统计 - `getDeviceStatistics()`: 获取设备数据统计 - `getErrorStatistics()`: 获取错误统计 ### DataValidationUtils - **功能**: 数据验证工具类 - **验证规则**: - 设备编号格式验证 - 数值范围验证 - 数据完整性检查 - 水质数据专项验证 ## 配置说明 ### Kafka 配置 ```yaml spring: kafka: bootstrap-servers: ${KAFKA_SERVERS:127.0.0.1}:9092 consumer: group-id: wm-data-engine auto-offset-reset: latest producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer ``` ### MQTT 配置 ```yaml mqtt: broker-url: ${MQTT_BROKER_URL:tcp://127.0.0.1:1883} client-id: ${MQTT_CLIENT_ID:water-data-engine} username: ${MQTT_USERNAME:water} password: ${MQTT_PASSWORD:water123} topic: iot-telemetry: iot/telemetry/+ iot-command: iot/command/+ quality-data: quality/data/+ ``` ### TDengine 配置 ```yaml tda: host: ${TDENGINE_HOST:127.0.0.1} port: ${TDENGINE_PORT:6030} username: ${TDENGINE_USER:root} password: ${TDENGINE_PASS:taosdata} database: ${TDENGINE_DB:water_iot} ``` ## 数据格式 ### IoT 遥测数据格式 ```json { "deviceSn": "FM001", "timestamp": 1625097600000, "metrics": [ { "key": "LL", "value": 12.5, "unit": "立方米/小时" }, { "key": "YL", "value": 0.35, "unit": "MPa" } ] } ``` ### 水质数据格式 ```json { "testType": "常规检测", "testPoint": "水厂出口", "pointType": "出厂水", "area": "主城区", "turbidity": 0.5, "ph": 7.2, "residualChlorine": 0.3, "isQualified": true } ``` ## API 接口 ### 数据采集管理 - `POST /api/data/collect/realtime` - 实时数据接入 - `POST /api/data/collect/batch` - 批量数据采集 - `GET /api/data/tasks` - 查询采集任务列表 - `GET /api/data/records` - 查询采集记录 ### 统计分析 - `GET /api/data/statistics` - 数据统计 - `GET /api/data/devices/{deviceSn}/statistics` - 设备统计 - `GET /api/data/errors/statistics` - 错误统计 ### MQTT 控制接口 - `POST /api/mqtt/control` - 发送设备控制命令 - `POST /api/mqtt/config` - 更新设备配置 ## WebSocket 主题 ### 实时数据推送 - `/topic/data/realtime` - 全量实时数据 - `/topic/data/realtime/iot` - IoT 设备数据 - `/topic/data/realtime/quality` - 水质数据 ### 控制指令 - `/topic/data/control` - 控制状态反馈 ### 告警信息 - `/topic/data/alert` - 数据告警推送 ### 统计数据 - `/topic/data/statistics` - 统计数据推送 ## 测试 ### 单元测试 - `DataCollectServiceTest` - 数据采集服务测试 - `KafkaConsumerTest` - Kafka 消费者测试 - `DataValidationUtilsTest` - 数据验证测试 ### 集成测试 - 实际 Kafka 服务器测试 - 实际 MQTT Broker 测试 - 数据库集成测试 ## 部署和使用 ### 环境要求 - Java 17+ - Spring Boot 3.3.5 - PostgreSQL 14+ - TDengine 3.0+ - Kafka 3.x+ - MQTT Broker (Eclipse Paho) ### 启动服务 ```bash mvn spring-boot:run ``` ### 监控指标 - 数据采集成功率 - 数据处理延迟 - 内存使用情况 - 线程池状态 ## 问题排查 ### 常见问题 1. **Kafka 连接失败**: 检查 Kafka 服务器地址和端口 2. **MQTT 连接失败**: 检查 Broker URL、用户名和密码 3. **TDengine 写入失败**: 检查数据库连接和表结构 4. **数据验证失败**: 检查数据格式和数值范围 ### 日志配置 ```yaml logging: level: com.water.data_engine: DEBUG org.springframework.kafka: INFO org.eclipse.paho.client.mqttv3: WARN ``` ## 开发指南 ### 添加新的数据源类型 1. 在 `MetricType` 枚举中添加新的指标类型 2. 在 `DataValidationUtils` 中添加对应的验证规则 3. 在 `DataCollectService` 中添加对应的处理逻辑 4. 更新配置文件中的 topic 路由规则 ### 扩展数据验证规则 1. 在 `DataValidationUtils` 中添加新的验证方法 2. 在 `validateData()` 方法中调用新的验证逻辑 3. 编写对应的单元测试 ### 添加新的数据存储后端 1. 实现新的存储接口 2. 在 `DataCollectService` 中集成新的存储后端 3. 添加配置选项 4. 编写集成测试