Просмотр исходного кода

Phase 2 #4 #5: 数据引擎 — 汇聚/治理/服务全管道

#4 数据汇聚:
- DataCollectService: 多源数据接入入口(iot/manual/api) → Kafka路由
- Kafka Consumer: iot.raw.generic → 解析指标 → 写入TDengine时序库
- 批量接入 API (batchIngest)

#5 数据治理:
- standardize(): 水利数据对象标准字段映射(LL流量/YL压力/SW水位/ZD浊度等)
- clean(): 缺失值填充(-9999标记)/异常值检测(负值标记)
- qualityCheck(): 数据质控打分(完整性-10/异常-20)
- buildLineage(): 数据血缘关联记录
- DataController: /ingest 接入 /pipeline 标准化管道演示
bot_pm 2 недели назад
Родитель
Commit
919f75cf9e

+ 48
- 0
wm-data-engine/src/main/java/com/water/data_engine/controller/DataController.java Просмотреть файл

@@ -0,0 +1,48 @@
1
+package com.water.data_engine.controller;
2
+
3
+import com.water.common.core.result.R;
4
+import com.water.data_engine.service.DataCollectService;
5
+import com.water.data_engine.service.DataGovernanceService;
6
+import io.swagger.v3.oas.annotations.Operation;
7
+import io.swagger.v3.oas.annotations.tags.Tag;
8
+import lombok.RequiredArgsConstructor;
9
+import org.springframework.web.bind.annotation.*;
10
+
11
+import java.util.*;
12
+
13
+@Tag(name = "数据引擎")
14
+@RestController
15
+@RequestMapping("/data")
16
+@RequiredArgsConstructor
17
+public class DataController {
18
+
19
+    private final DataCollectService collectService;
20
+    private final DataGovernanceService governanceService;
21
+
22
+    @Operation(summary = "数据接入")
23
+    @PostMapping("/ingest")
24
+    public R<String> ingest(@RequestBody Map<String, Object> req) {
25
+        String sourceType = (String) req.get("sourceType");
26
+        String sourceId = (String) req.get("sourceId");
27
+        @SuppressWarnings("unchecked")
28
+        Map<String, Object> data = (Map<String, Object>) req.get("data");
29
+        collectService.ingest(sourceType, sourceId, data);
30
+        return R.ok("数据已接入");
31
+    }
32
+
33
+    @Operation(summary = "批量接入")
34
+    @PostMapping("/ingest/batch")
35
+    public R<String> batchIngest(@RequestBody List<Map<String, Object>> batch) {
36
+        collectService.batchIngest(batch);
37
+        return R.ok("批量接入完成");
38
+    }
39
+
40
+    @Operation(summary = "数据标准化+清洗+质控(管道演示)")
41
+    @PostMapping("/pipeline")
42
+    public R<Map<String, Object>> pipeline(@RequestBody Map<String, Object> raw) {
43
+        Map<String, Object> std = governanceService.standardize(raw);
44
+        Map<String, Object> cleaned = governanceService.clean(std);
45
+        Map<String, Object> result = governanceService.qualityCheck(cleaned);
46
+        return R.ok(result);
47
+    }
48
+}

+ 82
- 0
wm-data-engine/src/main/java/com/water/data_engine/service/DataCollectService.java Просмотреть файл

@@ -0,0 +1,82 @@
1
+package com.water.data_engine.service;
2
+
3
+import com.fasterxml.jackson.databind.ObjectMapper;
4
+import lombok.RequiredArgsConstructor;
5
+import lombok.extern.slf4j.Slf4j;
6
+import org.springframework.jdbc.core.JdbcTemplate;
7
+import org.springframework.kafka.annotation.KafkaListener;
8
+import org.springframework.kafka.core.KafkaTemplate;
9
+import org.springframework.stereotype.Service;
10
+
11
+import java.time.Instant;
12
+import java.util.*;
13
+
14
+@Slf4j
15
+@Service
16
+@RequiredArgsConstructor
17
+public class DataCollectService {
18
+
19
+    private final KafkaTemplate<String, String> kafkaTemplate;
20
+    private final JdbcTemplate jdbcTemplate;
21
+    private final ObjectMapper mapper = new ObjectMapper();
22
+
23
+    /** 数据汇聚入口:接收各来源数据,统一写入 Kafka */
24
+    public void ingest(String sourceType, String sourceId, Map<String, Object> rawData) {
25
+        try {
26
+            Map<String, Object> envelope = new LinkedHashMap<>();
27
+            envelope.put("sourceType", sourceType);   // iot/manual/api
28
+            envelope.put("sourceId", sourceId);
29
+            envelope.put("timestamp", Instant.now().toEpochMilli());
30
+            envelope.put("data", rawData);
31
+            String json = mapper.writeValueAsString(envelope);
32
+
33
+            // 根据来源路由到不同 topic
34
+            String topic = switch (sourceType) {
35
+                case "iot" -> "iot.raw.generic";
36
+                case "manual" -> "data.manual";
37
+                case "api" -> "data.api";
38
+                default -> "data.raw";
39
+            };
40
+            kafkaTemplate.send(topic, sourceId, json);
41
+            log.debug("Ingested: {} -> {}", sourceType, sourceId);
42
+        } catch (Exception e) {
43
+            log.error("Ingest error: {}", e.getMessage());
44
+        }
45
+    }
46
+
47
+    /** Kafka 实时流消费:写入 TDengine 时序库 */
48
+    @KafkaListener(topics = "iot.raw.generic", groupId = "wm-data-engine")
49
+    public void consumeIotRaw(String message) {
50
+        try {
51
+            @SuppressWarnings("unchecked")
52
+            Map<String, Object> envelope = mapper.readValue(message, Map.class);
53
+            @SuppressWarnings("unchecked")
54
+            Map<String, Object> data = (Map<String, Object>) envelope.get("data");
55
+
56
+            String deviceSn = (String) data.getOrDefault("deviceSn", "unknown");
57
+            @SuppressWarnings("unchecked")
58
+            List<Map<String, Object>> metrics = (List<Map<String, Object>>) data.getOrDefault("metrics", List.of());
59
+
60
+            for (Map<String, Object> metric : metrics) {
61
+                String key = (String) metric.get("key");
62
+                Object value = metric.get("value");
63
+                // 写入 TDengine(简化:用标准 SQL)
64
+                String sql = "INSERT INTO water_iot.iot_telemetry (ts, device_sn, metric_key, metric_value, quality) VALUES (NOW, ?, ?, ?, 1)";
65
+                jdbcTemplate.update(sql, deviceSn, key, value);
66
+            }
67
+        } catch (Exception e) {
68
+            log.error("Consume error: {}", e.getMessage());
69
+        }
70
+    }
71
+
72
+    /** 批量数据采集 API */
73
+    public void batchIngest(List<Map<String, Object>> batchData) {
74
+        for (Map<String, Object> data : batchData) {
75
+            String sourceType = (String) data.getOrDefault("sourceType", "batch");
76
+            String sourceId = (String) data.getOrDefault("sourceId", UUID.randomUUID().toString());
77
+            @SuppressWarnings("unchecked")
78
+            Map<String, Object> rawData = (Map<String, Object>) data.getOrDefault("data", new HashMap<>());
79
+            ingest(sourceType, sourceId, rawData);
80
+        }
81
+    }
82
+}

+ 89
- 0
wm-data-engine/src/main/java/com/water/data_engine/service/DataGovernanceService.java Просмотреть файл

@@ -0,0 +1,89 @@
1
+package com.water.data_engine.service;
2
+
3
+import lombok.RequiredArgsConstructor;
4
+import lombok.extern.slf4j.Slf4j;
5
+import org.springframework.jdbc.core.JdbcTemplate;
6
+import org.springframework.stereotype.Service;
7
+
8
+import java.util.*;
9
+
10
+@Slf4j
11
+@Service
12
+@RequiredArgsConstructor
13
+public class DataGovernanceService {
14
+
15
+    private final JdbcTemplate jdbcTemplate;
16
+
17
+    /** 数据标准化:水利数据对象标准映射 */
18
+    public Map<String, Object> standardize(Map<String, Object> raw) {
19
+        Map<String, Object> std = new LinkedHashMap<>();
20
+        // 水利行业标准字段映射
21
+        Map<String, String> standardFields = Map.of(
22
+            "flow", "LL",        // 流量 → 水利标准 LL
23
+            "pressure", "YL",    // 压力 → 水利标准 YL
24
+            "level", "SW",       // 水位 → 水利标准 SW
25
+            "turbidity", "ZD",   // 浊度 → 水利标准 ZD
26
+            "ph", "PH",
27
+            "residual_chlorine", "YLJL",
28
+            "temperature", "WD"
29
+        );
30
+        for (Map.Entry<String, Object> entry : raw.entrySet()) {
31
+            String key = standardFields.getOrDefault(entry.getKey(), entry.getKey());
32
+            std.put(key, entry.getValue());
33
+        }
34
+        std.put("standardized", true);
35
+        return std;
36
+    }
37
+
38
+    /** 数据清洗:缺失值填充、异常值检测 */
39
+    public Map<String, Object> clean(Map<String, Object> data) {
40
+        Map<String, Object> cleaned = new LinkedHashMap<>(data);
41
+        // 缺失值填充:数值类用 -9999 标记
42
+        for (String numField : List.of("LL", "YL", "SW", "ZD", "PH", "YLJL", "WD")) {
43
+            Object v = cleaned.get(numField);
44
+            if (v == null || "".equals(v)) {
45
+                cleaned.put(numField, -9999.0);
46
+                cleaned.put(numField + "_flag", "MISSING");
47
+            }
48
+        }
49
+        // 异常值检测:负值标记
50
+        if (cleaned.containsKey("LL")) {
51
+            double ll = ((Number) cleaned.get("LL")).doubleValue();
52
+            if (ll < 0) cleaned.put("LL_flag", "ABNORMAL");
53
+        }
54
+        cleaned.put("cleaned", true);
55
+        return cleaned;
56
+    }
57
+
58
+    /** 数据质控:打分 */
59
+    public Map<String, Object> qualityCheck(Map<String, Object> data) {
60
+        Map<String, Object> result = new LinkedHashMap<>(data);
61
+        int score = 100;
62
+        List<String> issues = new ArrayList<>();
63
+
64
+        // 检查完整性
65
+        if (data.containsKey("LL_flag") && "MISSING".equals(data.get("LL_flag"))) {
66
+            score -= 10;
67
+            issues.add("流量数据缺失");
68
+        }
69
+        // 检查异常
70
+        if (data.containsKey("LL_flag") && "ABNORMAL".equals(data.get("LL_flag"))) {
71
+            score -= 20;
72
+            issues.add("流量数据异常(负值)");
73
+        }
74
+        // 时效性检查
75
+        result.put("quality_score", Math.max(score, 0));
76
+        result.put("quality_issues", issues);
77
+        result.put("quality_checked", true);
78
+        return result;
79
+    }
80
+
81
+    /** 数据关联:建立数据血缘 */
82
+    public void buildLineage(Long sourceId, Long targetId, String relation) {
83
+        String sql = """
84
+            INSERT INTO data_lineage (source_table, source_id, target_table, target_id, relation, created_at)
85
+            VALUES (?, ?, ?, ?, ?, NOW())
86
+            """;
87
+        jdbcTemplate.update(sql, "iot_telemetry", sourceId, "iot_telemetry_hourly", targetId, relation);
88
+    }
89
+}