|
|
4 dagar sedan | |
|---|---|---|
| .. | ||
| src | 4 dagar sedan | |
| README.md | 4 dagar sedan | |
| pom.xml | 4 dagar sedan | |
数据引擎是供水管理系统的核心模块,负责实时数据采集、处理、存储和监控。
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ IoT 设备 │ │ Kafka Topic │ │ MQTT Broker │
│ (流量计/压力计) │───▶│ iot.raw.generic │ │ tcp://1883 │
│ (水质传感器) │ │ data.quality │ │ │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ DataCollectService │ │ MqttService │ │ DataValidationUtils │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Data Engine Core │
│ 数据采集与处理 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ TDengine │ │ PostgreSQL │ │ MinIO │
│ (时序数据) │ │ (配置信息) │ │ (文件存储) │
└─────────────────┘ └─────────────────┘ └─────────────────┘
ingestRealtime(): 实时数据接入consumeIotRaw(): Kafka 消费 IoT 原始数据consumeQualityData(): Kafka 消费水质数据batchIngest(): 批量数据采集validateData(): 数据验证handleIotTelemetry(): 处理 IoT 遥测数据handleIotCommand(): 处理控制命令handleQualityData(): 处理水质数据sendDeviceCommand(): 发送设备控制命令sendDeviceConfig(): 发送设备配置更新batchSendConfig(): 批量发送配置getDataStatistics(): 获取数据采集统计getDeviceStatistics(): 获取设备数据统计getErrorStatistics(): 获取错误统计spring:
kafka:
bootstrap-servers: ${KAFKA_SERVERS:127.0.0.1}:9092
consumer:
group-id: wm-data-engine
auto-offset-reset: latest
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
mqtt:
broker-url: ${MQTT_BROKER_URL:tcp://127.0.0.1:1883}
client-id: ${MQTT_CLIENT_ID:water-data-engine}
username: ${MQTT_USERNAME:water}
password: ${MQTT_PASSWORD:water123}
topic:
iot-telemetry: iot/telemetry/+
iot-command: iot/command/+
quality-data: quality/data/+
tda:
host: ${TDENGINE_HOST:127.0.0.1}
port: ${TDENGINE_PORT:6030}
username: ${TDENGINE_USER:root}
password: ${TDENGINE_PASS:taosdata}
database: ${TDENGINE_DB:water_iot}
{
"deviceSn": "FM001",
"timestamp": 1625097600000,
"metrics": [
{
"key": "LL",
"value": 12.5,
"unit": "立方米/小时"
},
{
"key": "YL",
"value": 0.35,
"unit": "MPa"
}
]
}
{
"testType": "常规检测",
"testPoint": "水厂出口",
"pointType": "出厂水",
"area": "主城区",
"turbidity": 0.5,
"ph": 7.2,
"residualChlorine": 0.3,
"isQualified": true
}
POST /api/data/collect/realtime - 实时数据接入POST /api/data/collect/batch - 批量数据采集GET /api/data/tasks - 查询采集任务列表GET /api/data/records - 查询采集记录GET /api/data/statistics - 数据统计GET /api/data/devices/{deviceSn}/statistics - 设备统计GET /api/data/errors/statistics - 错误统计POST /api/mqtt/control - 发送设备控制命令POST /api/mqtt/config - 更新设备配置/topic/data/realtime - 全量实时数据/topic/data/realtime/iot - IoT 设备数据/topic/data/realtime/quality - 水质数据/topic/data/control - 控制状态反馈/topic/data/alert - 数据告警推送/topic/data/statistics - 统计数据推送DataCollectServiceTest - 数据采集服务测试KafkaConsumerTest - Kafka 消费者测试DataValidationUtilsTest - 数据验证测试mvn spring-boot:run
logging:
level:
com.water.data_engine: DEBUG
org.springframework.kafka: INFO
org.eclipse.paho.client.mqttv3: WARN
MetricType 枚举中添加新的指标类型DataValidationUtils 中添加对应的验证规则DataCollectService 中添加对应的处理逻辑DataValidationUtils 中添加新的验证方法validateData() 方法中调用新的验证逻辑DataCollectService 中集成新的存储后端