智慧水务管理系统 - 精河县供水工程综合管理平台
bot_dev1 f6a7b11cba feat(测试): Issue #92 - 实现后端单元测试和集成测试 3 dni temu
..
src feat(测试): Issue #92 - 实现后端单元测试和集成测试 3 dni temu
README.md feat(wm-data-engine): 完善 Issue #41 - 实时流数据采集功能 4 dni temu
pom.xml feat: 实现 Issue #41 - 实时流数据采集(MQTT/Kafka Consumer) 5 dni temu

README.md

数据引擎模块 (wm-data-engine)

概述

数据引擎是供水管理系统的核心模块,负责实时数据采集、处理、存储和监控。

主要功能

1. 实时数据采集

  • Kafka 消费者: 消费 IoT 设备遥测数据,支持多 topic 分发
  • MQTT 客户端: 支持物联网设备遥测数据和控制命令的双向通信
  • WebSocket 推送: 实时推送数据到前端界面

2. 数据处理

  • 数据验证: 完整的数据质量检查机制,包括设备编号、数值范围验证
  • 数据路由: 根据数据源类型自动路由到不同的处理通道
  • 数据转换: 支持多种数据格式的转换和标准化

3. 数据存储

  • TDengine 时序数据库: 存储物联网遥测数据
  • PostgreSQL 关系数据库: 存储配置信息和统计数据
  • MinIO 对象存储: 存储文件和报表数据

4. 监控和统计

  • 数据统计: 采集量、成功率、错误率等统计分析
  • 设备监控: 设备数据状态、趋势分析
  • 错误监控: 错误分布、常见错误类型统计

技术架构

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   IoT 设备      │    │   Kafka Topic   │    │   MQTT Broker   │
│  (流量计/压力计) │───▶│  iot.raw.generic │    │   tcp://1883    │
│  (水质传感器)   │    │   data.quality   │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │                       │                       │
         ▼                       ▼                       ▼
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│ DataCollectService │    │  MqttService    │    │ DataValidationUtils │
└─────────────────┘    └─────────────────┘    └─────────────────┘
         │
         ▼
┌─────────────────────────────────────────────────────────────┐
│                    Data Engine Core                         │
│                   数据采集与处理                            │
└─────────────────────────────────────────────────────────────┘
         │
         ▼
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  TDengine       │    │  PostgreSQL     │    │   MinIO         │
│  (时序数据)     │    │  (配置信息)     │    │  (文件存储)     │
└─────────────────┘    └─────────────────┘    └─────────────────┘

核心组件

DataCollectService

  • 功能: 数据采集服务,支持实时流和批量采集
  • 主要方法:
    • ingestRealtime(): 实时数据接入
    • consumeIotRaw(): Kafka 消费 IoT 原始数据
    • consumeQualityData(): Kafka 消费水质数据
    • batchIngest(): 批量数据采集
    • validateData(): 数据验证

MqttService

  • 功能: MQTT 消息服务,支持双向通信
  • 主要方法:
    • handleIotTelemetry(): 处理 IoT 遥测数据
    • handleIotCommand(): 处理控制命令
    • handleQualityData(): 处理水质数据

MqttPublishService

  • 功能: MQTT 消息发布服务
  • 主要方法:
    • sendDeviceCommand(): 发送设备控制命令
    • sendDeviceConfig(): 发送设备配置更新
    • batchSendConfig(): 批量发送配置

DataStatisticsService

  • 功能: 数据统计分析服务
  • 主要方法:
    • getDataStatistics(): 获取数据采集统计
    • getDeviceStatistics(): 获取设备数据统计
    • getErrorStatistics(): 获取错误统计

DataValidationUtils

  • 功能: 数据验证工具类
  • 验证规则:
    • 设备编号格式验证
    • 数值范围验证
    • 数据完整性检查
    • 水质数据专项验证

配置说明

Kafka 配置

spring:
  kafka:
    bootstrap-servers: ${KAFKA_SERVERS:127.0.0.1}:9092
    consumer:
      group-id: wm-data-engine
      auto-offset-reset: latest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

MQTT 配置

mqtt:
  broker-url: ${MQTT_BROKER_URL:tcp://127.0.0.1:1883}
  client-id: ${MQTT_CLIENT_ID:water-data-engine}
  username: ${MQTT_USERNAME:water}
  password: ${MQTT_PASSWORD:water123}
  topic:
    iot-telemetry: iot/telemetry/+
    iot-command: iot/command/+
    quality-data: quality/data/+

TDengine 配置

tda:
  host: ${TDENGINE_HOST:127.0.0.1}
  port: ${TDENGINE_PORT:6030}
  username: ${TDENGINE_USER:root}
  password: ${TDENGINE_PASS:taosdata}
  database: ${TDENGINE_DB:water_iot}

数据格式

IoT 遥测数据格式

{
  "deviceSn": "FM001",
  "timestamp": 1625097600000,
  "metrics": [
    {
      "key": "LL",
      "value": 12.5,
      "unit": "立方米/小时"
    },
    {
      "key": "YL", 
      "value": 0.35,
      "unit": "MPa"
    }
  ]
}

水质数据格式

{
  "testType": "常规检测",
  "testPoint": "水厂出口",
  "pointType": "出厂水",
  "area": "主城区",
  "turbidity": 0.5,
  "ph": 7.2,
  "residualChlorine": 0.3,
  "isQualified": true
}

API 接口

数据采集管理

  • POST /api/data/collect/realtime - 实时数据接入
  • POST /api/data/collect/batch - 批量数据采集
  • GET /api/data/tasks - 查询采集任务列表
  • GET /api/data/records - 查询采集记录

统计分析

  • GET /api/data/statistics - 数据统计
  • GET /api/data/devices/{deviceSn}/statistics - 设备统计
  • GET /api/data/errors/statistics - 错误统计

MQTT 控制接口

  • POST /api/mqtt/control - 发送设备控制命令
  • POST /api/mqtt/config - 更新设备配置

WebSocket 主题

实时数据推送

  • /topic/data/realtime - 全量实时数据
  • /topic/data/realtime/iot - IoT 设备数据
  • /topic/data/realtime/quality - 水质数据

控制指令

  • /topic/data/control - 控制状态反馈

告警信息

  • /topic/data/alert - 数据告警推送

统计数据

  • /topic/data/statistics - 统计数据推送

测试

单元测试

  • DataCollectServiceTest - 数据采集服务测试
  • KafkaConsumerTest - Kafka 消费者测试
  • DataValidationUtilsTest - 数据验证测试

集成测试

  • 实际 Kafka 服务器测试
  • 实际 MQTT Broker 测试
  • 数据库集成测试

部署和使用

环境要求

  • Java 17+
  • Spring Boot 3.3.5
  • PostgreSQL 14+
  • TDengine 3.0+
  • Kafka 3.x+
  • MQTT Broker (Eclipse Paho)

启动服务

mvn spring-boot:run

监控指标

  • 数据采集成功率
  • 数据处理延迟
  • 内存使用情况
  • 线程池状态

问题排查

常见问题

  1. Kafka 连接失败: 检查 Kafka 服务器地址和端口
  2. MQTT 连接失败: 检查 Broker URL、用户名和密码
  3. TDengine 写入失败: 检查数据库连接和表结构
  4. 数据验证失败: 检查数据格式和数值范围

日志配置

logging:
  level:
    com.water.data_engine: DEBUG
    org.springframework.kafka: INFO
    org.eclipse.paho.client.mqttv3: WARN

开发指南

添加新的数据源类型

  1. MetricType 枚举中添加新的指标类型
  2. DataValidationUtils 中添加对应的验证规则
  3. DataCollectService 中添加对应的处理逻辑
  4. 更新配置文件中的 topic 路由规则

扩展数据验证规则

  1. DataValidationUtils 中添加新的验证方法
  2. validateData() 方法中调用新的验证逻辑
  3. 编写对应的单元测试

添加新的数据存储后端

  1. 实现新的存储接口
  2. DataCollectService 中集成新的存储后端
  3. 添加配置选项
  4. 编写集成测试