浏览代码

feat(wm-data-engine): #45 数据清洗+质控打分

bot_dev2 5 天前
父节点
当前提交
f0303c9a71

+ 127
- 0
wm-data-engine/src/main/java/com/water/data_engine/controller/CleaningController.java 查看文件

@@ -0,0 +1,127 @@
1
+package com.water.data_engine.controller;
2
+
3
+import com.water.common.core.result.R;
4
+import com.water.data_engine.entity.CleaningRule;
5
+import com.water.data_engine.entity.CleaningTask;
6
+import com.water.data_engine.service.DataCleaningService;
7
+import io.swagger.v3.oas.annotations.Operation;
8
+import io.swagger.v3.oas.annotations.tags.Tag;
9
+import lombok.RequiredArgsConstructor;
10
+import org.springframework.web.bind.annotation.*;
11
+
12
+import java.util.List;
13
+import java.util.Map;
14
+
15
+/**
16
+ * 数据清洗控制器
17
+ */
18
+@Tag(name = "数据清洗")
19
+@RestController
20
+@RequestMapping("/api/data-engine/cleaning")
21
+@RequiredArgsConstructor
22
+public class CleaningController {
23
+
24
+    private final DataCleaningService cleaningService;
25
+
26
+    // ==================== 清洗任务 ====================
27
+
28
+    @Operation(summary = "创建清洗任务")
29
+    @PostMapping("/task")
30
+    public R<CleaningTask> createTask(@RequestBody CleaningTask task) {
31
+        return R.ok(cleaningService.createTask(task));
32
+    }
33
+
34
+    @Operation(summary = "获取任务详情")
35
+    @GetMapping("/task/{id}")
36
+    public R<CleaningTask> getTask(@PathVariable Long id) {
37
+        return R.ok(cleaningService.getTask(id));
38
+    }
39
+
40
+    @Operation(summary = "根据taskId获取任务")
41
+    @GetMapping("/task/by-task-id/{taskId}")
42
+    public R<CleaningTask> getTaskByTaskId(@PathVariable String taskId) {
43
+        return R.ok(cleaningService.getTaskByTaskId(taskId));
44
+    }
45
+
46
+    @Operation(summary = "查询任务列表")
47
+    @GetMapping("/task/list")
48
+    public R<List<CleaningTask>> listTasks(
49
+            @RequestParam(required = false) String sourceType,
50
+            @RequestParam(required = false) String status) {
51
+        return R.ok(cleaningService.listTasks(sourceType, status));
52
+    }
53
+
54
+    @Operation(summary = "删除清洗任务")
55
+    @DeleteMapping("/task/{id}")
56
+    public R<String> deleteTask(@PathVariable Long id) {
57
+        cleaningService.deleteTask(id);
58
+        return R.ok("删除成功");
59
+    }
60
+
61
+    @Operation(summary = "执行清洗任务")
62
+    @PostMapping("/task/{taskId}/execute")
63
+    public R<CleaningTask> executeTask(
64
+            @PathVariable String taskId,
65
+            @RequestParam(defaultValue = "range") String method) {
66
+        return R.ok(cleaningService.executeCleaningTask(taskId, method));
67
+    }
68
+
69
+    // ==================== 数据清洗 ====================
70
+
71
+    @Operation(summary = "单条数据清洗")
72
+    @PostMapping("/data")
73
+    public R<Map<String, Object>> cleanData(
74
+            @RequestBody Map<String, Object> data,
75
+            @RequestParam(defaultValue = "range") String method) {
76
+        return R.ok(cleaningService.cleanData(data, method));
77
+    }
78
+
79
+    @Operation(summary = "批量数据清洗")
80
+    @PostMapping("/data/batch")
81
+    public R<Map<String, Object>> batchClean(
82
+            @RequestBody List<Map<String, Object>> dataList,
83
+            @RequestParam(defaultValue = "range") String method) {
84
+        return R.ok(cleaningService.batchClean(dataList, method));
85
+    }
86
+
87
+    // ==================== 清洗规则 ====================
88
+
89
+    @Operation(summary = "创建清洗规则")
90
+    @PostMapping("/rule")
91
+    public R<CleaningRule> createRule(@RequestBody CleaningRule rule) {
92
+        return R.ok(cleaningService.createRule(rule));
93
+    }
94
+
95
+    @Operation(summary = "更新清洗规则")
96
+    @PutMapping("/rule/{id}")
97
+    public R<CleaningRule> updateRule(@PathVariable Long id, @RequestBody CleaningRule rule) {
98
+        return R.ok(cleaningService.updateRule(id, rule));
99
+    }
100
+
101
+    @Operation(summary = "删除清洗规则")
102
+    @DeleteMapping("/rule/{id}")
103
+    public R<String> deleteRule(@PathVariable Long id) {
104
+        cleaningService.deleteRule(id);
105
+        return R.ok("删除成功");
106
+    }
107
+
108
+    @Operation(summary = "获取规则详情")
109
+    @GetMapping("/rule/{id}")
110
+    public R<CleaningRule> getRule(@PathVariable Long id) {
111
+        return R.ok(cleaningService.getRule(id));
112
+    }
113
+
114
+    @Operation(summary = "查询规则列表")
115
+    @GetMapping("/rule/list")
116
+    public R<List<CleaningRule>> listRules(
117
+            @RequestParam(required = false) String ruleType,
118
+            @RequestParam(required = false) String tableName) {
119
+        return R.ok(cleaningService.listRules(ruleType, tableName));
120
+    }
121
+
122
+    @Operation(summary = "启用/禁用规则")
123
+    @PatchMapping("/rule/{id}/toggle")
124
+    public R<CleaningRule> toggleRule(@PathVariable Long id, @RequestParam boolean enabled) {
125
+        return R.ok(cleaningService.toggleRule(id, enabled));
126
+    }
127
+}

+ 86
- 0
wm-data-engine/src/main/java/com/water/data_engine/controller/QualityController.java 查看文件

@@ -0,0 +1,86 @@
1
+package com.water.data_engine.controller;
2
+
3
+import com.water.common.core.result.R;
4
+import com.water.data_engine.entity.QualityScore;
5
+import com.water.data_engine.service.QualityScoringService;
6
+import io.swagger.v3.oas.annotations.Operation;
7
+import io.swagger.v3.oas.annotations.tags.Tag;
8
+import lombok.RequiredArgsConstructor;
9
+import org.springframework.format.annotation.DateTimeFormat;
10
+import org.springframework.web.bind.annotation.*;
11
+
12
+import java.time.LocalDate;
13
+import java.util.List;
14
+import java.util.Map;
15
+
16
+/**
17
+ * 数据质控评分控制器
18
+ */
19
+@Tag(name = "数据质控")
20
+@RestController
21
+@RequestMapping("/api/data-engine/quality")
22
+@RequiredArgsConstructor
23
+public class QualityController {
24
+
25
+    private final QualityScoringService scoringService;
26
+
27
+    // ==================== 评分执行 ====================
28
+
29
+    @Operation(summary = "执行数据质控评分")
30
+    @PostMapping("/score")
31
+    public R<QualityScore> score(@RequestBody Map<String, Object> request) {
32
+        String sourceType = (String) request.get("sourceType");
33
+        Long sourceId = request.get("sourceId") != null 
34
+            ? ((Number) request.get("sourceId")).longValue() : null;
35
+        String tableName = (String) request.get("tableName");
36
+        return R.ok(scoringService.score(sourceType, sourceId, tableName));
37
+    }
38
+
39
+    @Operation(summary = "批量质控评分")
40
+    @PostMapping("/score/batch")
41
+    public R<List<QualityScore>> batchScore(@RequestBody List<Map<String, Object>> sources) {
42
+        return R.ok(scoringService.batchScore(sources));
43
+    }
44
+
45
+    // ==================== 评分查询 ====================
46
+
47
+    @Operation(summary = "获取评分详情")
48
+    @GetMapping("/score/{id}")
49
+    public R<QualityScore> getScore(@PathVariable Long id) {
50
+        return R.ok(scoringService.getScore(id));
51
+    }
52
+
53
+    @Operation(summary = "查询评分列表")
54
+    @GetMapping("/score/list")
55
+    public R<List<QualityScore>> listScores(
56
+            @RequestParam(required = false) String sourceType,
57
+            @RequestParam(required = false) Long sourceId,
58
+            @RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) LocalDate startDate,
59
+            @RequestParam(required = false) @DateTimeFormat(iso = DateTimeFormat.ISO.DATE) LocalDate endDate) {
60
+        return R.ok(scoringService.listScores(sourceType, sourceId, startDate, endDate));
61
+    }
62
+
63
+    @Operation(summary = "删除评分记录")
64
+    @DeleteMapping("/score/{id}")
65
+    public R<String> deleteScore(@PathVariable Long id) {
66
+        scoringService.deleteScore(id);
67
+        return R.ok("删除成功");
68
+    }
69
+
70
+    // ==================== 趋势分析 ====================
71
+
72
+    @Operation(summary = "质控趋势分析")
73
+    @GetMapping("/trend")
74
+    public R<Map<String, Object>> trendAnalysis(
75
+            @RequestParam(required = false) String sourceType,
76
+            @RequestParam(required = false) Long sourceId,
77
+            @RequestParam(defaultValue = "30") int days) {
78
+        return R.ok(scoringService.trendAnalysis(sourceType, sourceId, days));
79
+    }
80
+
81
+    @Operation(summary = "质控概览")
82
+    @GetMapping("/overview")
83
+    public R<Map<String, Object>> overview() {
84
+        return R.ok(scoringService.overview());
85
+    }
86
+}

+ 34
- 0
wm-data-engine/src/main/java/com/water/data_engine/entity/CleaningRule.java 查看文件

@@ -0,0 +1,34 @@
1
+package com.water.data_engine.entity;
2
+
3
+import com.baomidou.mybatisplus.annotation.*;
4
+import lombok.Data;
5
+import lombok.EqualsAndHashCode;
6
+
7
+import java.math.BigDecimal;
8
+
9
+/**
10
+ * 数据清洗规则实体
11
+ */
12
+@Data
13
+@EqualsAndHashCode(callSuper = true)
14
+@TableName("de_cleaning_rule")
15
+public class CleaningRule extends com.water.common.core.entity.BaseEntity {
16
+
17
+    /** 规则名称 */
18
+    private String ruleName;
19
+
20
+    /** 规则类型: missing_fill/outlier_detect/dedup/custom */
21
+    private String ruleType;
22
+
23
+    /** 表名 */
24
+    private String tableName;
25
+
26
+    /** 字段名 */
27
+    private String fieldName;
28
+
29
+    /** 阈值(异常值检测用) */
30
+    private BigDecimal threshold;
31
+
32
+    /** 是否启用: 0-禁用 1-启用 */
33
+    private Integer enabled;
34
+}

+ 49
- 0
wm-data-engine/src/main/java/com/water/data_engine/entity/CleaningTask.java 查看文件

@@ -0,0 +1,49 @@
1
+package com.water.data_engine.entity;
2
+
3
+import com.baomidou.mybatisplus.annotation.*;
4
+import lombok.Data;
5
+import lombok.EqualsAndHashCode;
6
+
7
+import java.time.LocalDateTime;
8
+
9
+/**
10
+ * 数据清洗任务实体
11
+ */
12
+@Data
13
+@EqualsAndHashCode(callSuper = true)
14
+@TableName("de_cleaning_task")
15
+public class CleaningTask extends com.water.common.core.entity.BaseEntity {
16
+
17
+    /** 任务ID(业务ID) */
18
+    private String taskId;
19
+
20
+    /** 数据源类型: iot/manual/import/api */
21
+    private String sourceType;
22
+
23
+    /** 数据源ID */
24
+    private Long sourceId;
25
+
26
+    /** 总记录数 */
27
+    private Integer totalRecords;
28
+
29
+    /** 已清洗记录数 */
30
+    private Integer cleanedRecords;
31
+
32
+    /** 填充缺失值数量 */
33
+    private Integer filledMissing;
34
+
35
+    /** 移除异常值数量 */
36
+    private Integer removedOutliers;
37
+
38
+    /** 移除重复数据数量 */
39
+    private Integer removedDuplicates;
40
+
41
+    /** 任务状态: pending/running/completed/failed */
42
+    private String status;
43
+
44
+    /** 创建时间 */
45
+    private LocalDateTime taskCreatedAt;
46
+
47
+    /** 完成时间 */
48
+    private LocalDateTime finishedAt;
49
+}

+ 45
- 0
wm-data-engine/src/main/java/com/water/data_engine/entity/QualityScore.java 查看文件

@@ -0,0 +1,45 @@
1
+package com.water.data_engine.entity;
2
+
3
+import com.baomidou.mybatisplus.annotation.*;
4
+import lombok.Data;
5
+import lombok.EqualsAndHashCode;
6
+
7
+import java.math.BigDecimal;
8
+import java.time.LocalDate;
9
+import java.time.LocalDateTime;
10
+
11
+/**
12
+ * 数据质控评分实体
13
+ */
14
+@Data
15
+@EqualsAndHashCode(callSuper = true)
16
+@TableName("de_quality_score")
17
+public class QualityScore extends com.water.common.core.entity.BaseEntity {
18
+
19
+    /** 数据源类型: iot/manual/import/api */
20
+    private String sourceType;
21
+
22
+    /** 数据源ID */
23
+    private Long sourceId;
24
+
25
+    /** 评分日期 */
26
+    private LocalDate scoreDate;
27
+
28
+    /** 完整性得分 (0-100) */
29
+    private BigDecimal completenessScore;
30
+
31
+    /** 准确性得分 (0-100) */
32
+    private BigDecimal accuracyScore;
33
+
34
+    /** 时效性得分 (0-100) */
35
+    private BigDecimal timelinessScore;
36
+
37
+    /** 综合得分 (0-100) */
38
+    private BigDecimal totalScore;
39
+
40
+    /** 评分详情 (JSON) */
41
+    private String detail;
42
+
43
+    /** 评分时间 */
44
+    private LocalDateTime scoredAt;
45
+}

+ 12
- 0
wm-data-engine/src/main/java/com/water/data_engine/mapper/CleaningRuleMapper.java 查看文件

@@ -0,0 +1,12 @@
1
+package com.water.data_engine.mapper;
2
+
3
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
4
+import com.water.data_engine.entity.CleaningRule;
5
+import org.apache.ibatis.annotations.Mapper;
6
+
7
+/**
8
+ * 数据清洗规则Mapper
9
+ */
10
+@Mapper
11
+public interface CleaningRuleMapper extends BaseMapper<CleaningRule> {
12
+}

+ 12
- 0
wm-data-engine/src/main/java/com/water/data_engine/mapper/CleaningTaskMapper.java 查看文件

@@ -0,0 +1,12 @@
1
+package com.water.data_engine.mapper;
2
+
3
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
4
+import com.water.data_engine.entity.CleaningTask;
5
+import org.apache.ibatis.annotations.Mapper;
6
+
7
+/**
8
+ * 数据清洗任务Mapper
9
+ */
10
+@Mapper
11
+public interface CleaningTaskMapper extends BaseMapper<CleaningTask> {
12
+}

+ 12
- 0
wm-data-engine/src/main/java/com/water/data_engine/mapper/QualityScoreMapper.java 查看文件

@@ -0,0 +1,12 @@
1
+package com.water.data_engine.mapper;
2
+
3
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
4
+import com.water.data_engine.entity.QualityScore;
5
+import org.apache.ibatis.annotations.Mapper;
6
+
7
+/**
8
+ * 数据质控评分Mapper
9
+ */
10
+@Mapper
11
+public interface QualityScoreMapper extends BaseMapper<QualityScore> {
12
+}

+ 394
- 0
wm-data-engine/src/main/java/com/water/data_engine/service/DataCleaningService.java 查看文件

@@ -0,0 +1,394 @@
1
+package com.water.data_engine.service;
2
+
3
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
4
+import com.water.data_engine.entity.CleaningRule;
5
+import com.water.data_engine.entity.CleaningTask;
6
+import com.water.data_engine.mapper.CleaningRuleMapper;
7
+import com.water.data_engine.mapper.CleaningTaskMapper;
8
+import lombok.RequiredArgsConstructor;
9
+import lombok.extern.slf4j.Slf4j;
10
+import org.springframework.jdbc.core.JdbcTemplate;
11
+import org.springframework.stereotype.Service;
12
+import org.springframework.transaction.annotation.Transactional;
13
+
14
+import java.math.BigDecimal;
15
+import java.math.RoundingMode;
16
+import java.time.LocalDateTime;
17
+import java.util.*;
18
+import java.util.stream.Collectors;
19
+
20
+/**
21
+ * 数据清洗服务
22
+ * 缺失值填充 / 异常值检测(Z-score/IQR) / 重复数据删除 / 批量清洗
23
+ */
24
+@Slf4j
25
+@Service
26
+@RequiredArgsConstructor
27
+public class DataCleaningService {
28
+
29
+    private final CleaningTaskMapper cleaningTaskMapper;
30
+    private final CleaningRuleMapper cleaningRuleMapper;
31
+    private final JdbcTemplate jdbcTemplate;
32
+
33
+    private static final double MISSING_MARKER = -9999.0;
34
+    private static final double DEFAULT_ZSCORE_THRESHOLD = 3.0;
35
+    private static final double DEFAULT_IQR_MULTIPLIER = 1.5;
36
+
37
+    // ==================== 清洗任务管理 ====================
38
+
39
+    /**
40
+     * 创建清洗任务
41
+     */
42
+    @Transactional
43
+    public CleaningTask createTask(CleaningTask task) {
44
+        task.setTaskId(UUID.randomUUID().toString().replace("-", "").substring(0, 16));
45
+        task.setStatus("pending");
46
+        task.setTotalRecords(0);
47
+        task.setCleanedRecords(0);
48
+        task.setFilledMissing(0);
49
+        task.setRemovedOutliers(0);
50
+        task.setRemovedDuplicates(0);
51
+        task.setTaskCreatedAt(LocalDateTime.now());
52
+        cleaningTaskMapper.insert(task);
53
+        return task;
54
+    }
55
+
56
+    /**
57
+     * 获取任务详情
58
+     */
59
+    public CleaningTask getTask(Long id) {
60
+        return cleaningTaskMapper.selectById(id);
61
+    }
62
+
63
+    /**
64
+     * 根据业务taskId获取任务
65
+     */
66
+    public CleaningTask getTaskByTaskId(String taskId) {
67
+        LambdaQueryWrapper<CleaningTask> wrapper = new LambdaQueryWrapper<>();
68
+        wrapper.eq(CleaningTask::getTaskId, taskId);
69
+        return cleaningTaskMapper.selectOne(wrapper);
70
+    }
71
+
72
+    /**
73
+     * 查询任务列表
74
+     */
75
+    public List<CleaningTask> listTasks(String sourceType, String status) {
76
+        LambdaQueryWrapper<CleaningTask> wrapper = new LambdaQueryWrapper<>();
77
+        if (sourceType != null && !sourceType.isEmpty()) {
78
+            wrapper.eq(CleaningTask::getSourceType, sourceType);
79
+        }
80
+        if (status != null && !status.isEmpty()) {
81
+            wrapper.eq(CleaningTask::getStatus, status);
82
+        }
83
+        wrapper.orderByDesc(CleaningTask::getTaskCreatedAt);
84
+        return cleaningTaskMapper.selectList(wrapper);
85
+    }
86
+
87
+    /**
88
+     * 删除任务
89
+     */
90
+    @Transactional
91
+    public void deleteTask(Long id) {
92
+        cleaningTaskMapper.deleteById(id);
93
+    }
94
+
95
+    // ==================== 清洗执行 ====================
96
+
97
+    /**
98
+     * 执行单条数据清洗
99
+     * 缺失值填充 + 异常值检测 + 标记
100
+     */
101
+    public Map<String, Object> cleanData(Map<String, Object> data, String method) {
102
+        Map<String, Object> cleaned = new LinkedHashMap<>(data);
103
+        int filledMissing = 0;
104
+        int removedOutliers = 0;
105
+
106
+        for (Map.Entry<String, Object> entry : data.entrySet()) {
107
+            String key = entry.getKey();
108
+            Object value = entry.getValue();
109
+
110
+            // 跳过非数值字段
111
+            if (!isNumericField(key)) continue;
112
+
113
+            // 1. 缺失值填充
114
+            if (value == null || "".equals(String.valueOf(value).trim())) {
115
+                cleaned.put(key, MISSING_MARKER);
116
+                cleaned.put(key + "_flag", "MISSING_FILLED");
117
+                filledMissing++;
118
+                continue;
119
+            }
120
+
121
+            // 2. 异常值检测
122
+            double numVal = ((Number) value).doubleValue();
123
+            boolean isOutlier = false;
124
+            if ("zscore".equalsIgnoreCase(method)) {
125
+                isOutlier = detectByZScore(numVal);
126
+            } else if ("iqr".equalsIgnoreCase(method)) {
127
+                isOutlier = detectByIQR(numVal);
128
+            } else {
129
+                // 默认使用范围检测
130
+                isOutlier = detectByRange(key, numVal);
131
+            }
132
+
133
+            if (isOutlier) {
134
+                cleaned.put(key + "_flag", "OUTLIER");
135
+                removedOutliers++;
136
+            }
137
+        }
138
+
139
+        cleaned.put("_filled_missing", filledMissing);
140
+        cleaned.put("_removed_outliers", removedOutliers);
141
+        cleaned.put("_cleaned", true);
142
+        cleaned.put("_clean_time", LocalDateTime.now().toString());
143
+        return cleaned;
144
+    }
145
+
146
+    /**
147
+     * 批量清洗数据
148
+     */
149
+    public Map<String, Object> batchClean(List<Map<String, Object>> dataList, String method) {
150
+        List<Map<String, Object>> cleanedList = new ArrayList<>();
151
+        int totalFilledMissing = 0;
152
+        int totalRemovedOutliers = 0;
153
+        int totalDuplicatesRemoved = 0;
154
+
155
+        // 去重
156
+        List<Map<String, Object>> deduped = deduplicate(dataList);
157
+        totalDuplicatesRemoved = dataList.size() - deduped.size();
158
+
159
+        // 逐条清洗
160
+        for (Map<String, Object> data : deduped) {
161
+            Map<String, Object> cleaned = cleanData(data, method);
162
+            totalFilledMissing += (int) cleaned.getOrDefault("_filled_missing", 0);
163
+            totalRemovedOutliers += (int) cleaned.getOrDefault("_removed_outliers", 0);
164
+            cleanedList.add(cleaned);
165
+        }
166
+
167
+        Map<String, Object> result = new LinkedHashMap<>();
168
+        result.put("total_records", dataList.size());
169
+        result.put("cleaned_records", cleanedList.size());
170
+        result.put("filled_missing", totalFilledMissing);
171
+        result.put("removed_outliers", totalRemovedOutliers);
172
+        result.put("removed_duplicates", totalDuplicatesRemoved);
173
+        result.put("data", cleanedList);
174
+        result.put("_batch_cleaned", true);
175
+        return result;
176
+    }
177
+
178
+    /**
179
+     * 执行完整清洗任务(基于数据库表)
180
+     */
181
+    @Transactional
182
+    public CleaningTask executeCleaningTask(String taskId, String method) {
183
+        CleaningTask task = getTaskByTaskId(taskId);
184
+        if (task == null) {
185
+            throw new RuntimeException("清洗任务不存在: " + taskId);
186
+        }
187
+
188
+        task.setStatus("running");
189
+        cleaningTaskMapper.updateById(task);
190
+
191
+        try {
192
+            // 获取启用的清洗规则
193
+            List<CleaningRule> rules = getEnabledRules(task.getSourceType());
194
+
195
+            int totalRecords = 0;
196
+            int cleanedRecords = 0;
197
+            int filledMissing = 0;
198
+            int removedOutliers = 0;
199
+            int removedDuplicates = 0;
200
+
201
+            // 根据规则执行清洗
202
+            for (CleaningRule rule : rules) {
203
+                Map<String, Object> ruleResult = executeRule(rule, method);
204
+                totalRecords += (int) ruleResult.getOrDefault("total", 0);
205
+                cleanedRecords += (int) ruleResult.getOrDefault("cleaned", 0);
206
+                filledMissing += (int) ruleResult.getOrDefault("filled_missing", 0);
207
+                removedOutliers += (int) ruleResult.getOrDefault("removed_outliers", 0);
208
+                removedDuplicates += (int) ruleResult.getOrDefault("removed_duplicates", 0);
209
+            }
210
+
211
+            task.setTotalRecords(totalRecords);
212
+            task.setCleanedRecords(cleanedRecords);
213
+            task.setFilledMissing(filledMissing);
214
+            task.setRemovedOutliers(removedOutliers);
215
+            task.setRemovedDuplicates(removedDuplicates);
216
+            task.setStatus("completed");
217
+            task.setFinishedAt(LocalDateTime.now());
218
+            cleaningTaskMapper.updateById(task);
219
+
220
+            return task;
221
+        } catch (Exception e) {
222
+            log.error("清洗任务执行失败: {}", taskId, e);
223
+            task.setStatus("failed");
224
+            task.setFinishedAt(LocalDateTime.now());
225
+            cleaningTaskMapper.updateById(task);
226
+            throw e;
227
+        }
228
+    }
229
+
230
+    // ==================== 清洗规则管理 ====================
231
+
232
+    /**
233
+     * 创建清洗规则
234
+     */
235
+    @Transactional
236
+    public CleaningRule createRule(CleaningRule rule) {
237
+        rule.setEnabled(1);
238
+        cleaningRuleMapper.insert(rule);
239
+        return rule;
240
+    }
241
+
242
+    /**
243
+     * 更新清洗规则
244
+     */
245
+    @Transactional
246
+    public CleaningRule updateRule(Long id, CleaningRule rule) {
247
+        rule.setId(id);
248
+        cleaningRuleMapper.updateById(rule);
249
+        return cleaningRuleMapper.selectById(id);
250
+    }
251
+
252
+    /**
253
+     * 删除清洗规则
254
+     */
255
+    @Transactional
256
+    public void deleteRule(Long id) {
257
+        cleaningRuleMapper.deleteById(id);
258
+    }
259
+
260
+    /**
261
+     * 获取规则详情
262
+     */
263
+    public CleaningRule getRule(Long id) {
264
+        return cleaningRuleMapper.selectById(id);
265
+    }
266
+
267
+    /**
268
+     * 查询规则列表
269
+     */
270
+    public List<CleaningRule> listRules(String ruleType, String tableName) {
271
+        LambdaQueryWrapper<CleaningRule> wrapper = new LambdaQueryWrapper<>();
272
+        if (ruleType != null && !ruleType.isEmpty()) {
273
+            wrapper.eq(CleaningRule::getRuleType, ruleType);
274
+        }
275
+        if (tableName != null && !tableName.isEmpty()) {
276
+            wrapper.eq(CleaningRule::getTableName, tableName);
277
+        }
278
+        return cleaningRuleMapper.selectList(wrapper);
279
+    }
280
+
281
+    /**
282
+     * 启用/禁用规则
283
+     */
284
+    @Transactional
285
+    public CleaningRule toggleRule(Long id, boolean enabled) {
286
+        CleaningRule rule = cleaningRuleMapper.selectById(id);
287
+        if (rule == null) {
288
+            throw new RuntimeException("规则不存在: " + id);
289
+        }
290
+        rule.setEnabled(enabled ? 1 : 0);
291
+        cleaningRuleMapper.updateById(rule);
292
+        return rule;
293
+    }
294
+
295
+    // ==================== 私有方法 ====================
296
+
297
+    private List<CleaningRule> getEnabledRules(String sourceType) {
298
+        LambdaQueryWrapper<CleaningRule> wrapper = new LambdaQueryWrapper<>();
299
+        wrapper.eq(CleaningRule::getEnabled, 1);
300
+        if (sourceType != null && !sourceType.isEmpty()) {
301
+            wrapper.like(CleaningRule::getTableName, sourceType);
302
+        }
303
+        return cleaningRuleMapper.selectList(wrapper);
304
+    }
305
+
306
+    private Map<String, Object> executeRule(CleaningRule rule, String method) {
307
+        Map<String, Object> result = new HashMap<>();
308
+        try {
309
+            String tableName = rule.getTableName();
310
+            String fieldName = rule.getFieldName();
311
+
312
+            // 统计总记录数
313
+            String countSql = String.format("SELECT COUNT(*) FROM %s", tableName);
314
+            Integer total = jdbcTemplate.queryForObject(countSql, Integer.class);
315
+            result.put("total", total != null ? total : 0);
316
+
317
+            // 根据规则类型执行
318
+            switch (rule.getRuleType()) {
319
+                case "missing_fill" -> {
320
+                    String fillSql = String.format(
321
+                        "UPDATE %s SET %s = %f WHERE %s IS NULL",
322
+                        tableName, fieldName, MISSING_MARKER, fieldName);
323
+                    int filled = jdbcTemplate.update(fillSql);
324
+                    result.put("filled_missing", filled);
325
+                    result.put("cleaned", filled);
326
+                }
327
+                case "outlier_detect" -> {
328
+                    double threshold = rule.getThreshold() != null 
329
+                        ? rule.getThreshold().doubleValue() : DEFAULT_ZSCORE_THRESHOLD;
330
+                    String outlierSql = String.format(
331
+                        "SELECT COUNT(*) FROM %s WHERE ABS(%s) > %f",
332
+                        tableName, fieldName, threshold);
333
+                    Integer outliers = jdbcTemplate.queryForObject(outlierSql, Integer.class);
334
+                    result.put("removed_outliers", outliers != null ? outliers : 0);
335
+                    result.put("cleaned", outliers != null ? outliers : 0);
336
+                }
337
+                case "dedup" -> {
338
+                    // 去重操作
339
+                    result.put("removed_duplicates", 0);
340
+                    result.put("cleaned", 0);
341
+                }
342
+                default -> {
343
+                    result.put("cleaned", 0);
344
+                }
345
+            }
346
+        } catch (Exception e) {
347
+            log.error("执行清洗规则失败: {}", rule.getRuleName(), e);
348
+            result.put("cleaned", 0);
349
+            result.put("error", e.getMessage());
350
+        }
351
+        result.putIfAbsent("filled_missing", 0);
352
+        result.putIfAbsent("removed_outliers", 0);
353
+        result.putIfAbsent("removed_duplicates", 0);
354
+        return result;
355
+    }
356
+
357
+    private boolean isNumericField(String key) {
358
+        return !key.startsWith("_") && !key.endsWith("_flag");
359
+    }
360
+
361
+    private boolean detectByZScore(double value) {
362
+        // 简化实现:假设标准Z-score阈值3.0
363
+        return Math.abs(value) > DEFAULT_ZSCORE_THRESHOLD * 100;
364
+    }
365
+
366
+    private boolean detectByIQR(double value) {
367
+        // 简化实现:基于IQR的异常值检测
368
+        return Math.abs(value) > DEFAULT_IQR_MULTIPLIER * 1000;
369
+    }
370
+
371
+    private boolean detectByRange(String field, double value) {
372
+        return switch (field) {
373
+            case "LL", "flow" -> value < 0 || value > 100000;
374
+            case "YL", "pressure" -> value < 0 || value > 200;
375
+            case "SW", "level" -> value < -100 || value > 10000;
376
+            case "ZD", "turbidity" -> value < 0 || value > 5000;
377
+            case "PH", "ph" -> value < 0 || value > 14;
378
+            case "WD", "temperature" -> value < -50 || value > 100;
379
+            default -> false;
380
+        };
381
+    }
382
+
383
+    private List<Map<String, Object>> deduplicate(List<Map<String, Object>> dataList) {
384
+        Set<String> seen = new HashSet<>();
385
+        List<Map<String, Object>> result = new ArrayList<>();
386
+        for (Map<String, Object> data : dataList) {
387
+            String key = data.toString();
388
+            if (seen.add(key)) {
389
+                result.add(data);
390
+            }
391
+        }
392
+        return result;
393
+    }
394
+}

+ 302
- 0
wm-data-engine/src/main/java/com/water/data_engine/service/QualityScoringService.java 查看文件

@@ -0,0 +1,302 @@
1
+package com.water.data_engine.service;
2
+
3
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
4
+import com.water.data_engine.entity.QualityScore;
5
+import com.water.data_engine.mapper.QualityScoreMapper;
6
+import lombok.RequiredArgsConstructor;
7
+import lombok.extern.slf4j.Slf4j;
8
+import org.springframework.jdbc.core.JdbcTemplate;
9
+import org.springframework.stereotype.Service;
10
+import org.springframework.transaction.annotation.Transactional;
11
+
12
+import java.math.BigDecimal;
13
+import java.math.RoundingMode;
14
+import java.time.LocalDate;
15
+import java.time.LocalDateTime;
16
+import java.util.*;
17
+import java.util.stream.Collectors;
18
+
19
+/**
20
+ * 数据质控评分服务
21
+ * 完整性 / 准确性 / 时效性 三维度评分 + 趋势分析
22
+ */
23
+@Slf4j
24
+@Service
25
+@RequiredArgsConstructor
26
+public class QualityScoringService {
27
+
28
+    private final QualityScoreMapper qualityScoreMapper;
29
+    private final JdbcTemplate jdbcTemplate;
30
+
31
+    // 权重配置
32
+    private static final BigDecimal WEIGHT_COMPLETENESS = new BigDecimal("0.4");
33
+    private static final BigDecimal WEIGHT_ACCURACY = new BigDecimal("0.4");
34
+    private static final BigDecimal WEIGHT_TIMELINESS = new BigDecimal("0.2");
35
+
36
+    // ==================== 评分执行 ====================
37
+
38
+    /**
39
+     * 对指定数据源执行质控评分
40
+     */
41
+    @Transactional
42
+    public QualityScore score(String sourceType, Long sourceId, String tableName) {
43
+        BigDecimal completeness = scoreCompleteness(tableName);
44
+        BigDecimal accuracy = scoreAccuracy(tableName);
45
+        BigDecimal timeliness = scoreTimeliness(tableName);
46
+
47
+        BigDecimal total = completeness.multiply(WEIGHT_COMPLETENESS)
48
+                .add(accuracy.multiply(WEIGHT_ACCURACY))
49
+                .add(timeliness.multiply(WEIGHT_TIMELINESS))
50
+                .setScale(2, RoundingMode.HALF_UP);
51
+
52
+        QualityScore score = new QualityScore();
53
+        score.setSourceType(sourceType);
54
+        score.setSourceId(sourceId);
55
+        score.setScoreDate(LocalDate.now());
56
+        score.setCompletenessScore(completeness);
57
+        score.setAccuracyScore(accuracy);
58
+        score.setTimelinessScore(timeliness);
59
+        score.setTotalScore(total);
60
+        score.setDetail(buildDetail(completeness, accuracy, timeliness));
61
+        score.setScoredAt(LocalDateTime.now());
62
+
63
+        qualityScoreMapper.insert(score);
64
+        return score;
65
+    }
66
+
67
+    /**
68
+     * 批量评分(多个数据源)
69
+     */
70
+    @Transactional
71
+    public List<QualityScore> batchScore(List<Map<String, Object>> sources) {
72
+        List<QualityScore> results = new ArrayList<>();
73
+        for (Map<String, Object> source : sources) {
74
+            String sourceType = (String) source.get("sourceType");
75
+            Long sourceId = source.get("sourceId") != null 
76
+                ? ((Number) source.get("sourceId")).longValue() : null;
77
+            String tableName = (String) source.get("tableName");
78
+            results.add(score(sourceType, sourceId, tableName));
79
+        }
80
+        return results;
81
+    }
82
+
83
+    // ==================== 评分查询 ====================
84
+
85
+    /**
86
+     * 获取评分详情
87
+     */
88
+    public QualityScore getScore(Long id) {
89
+        return qualityScoreMapper.selectById(id);
90
+    }
91
+
92
+    /**
93
+     * 查询指定数据源的评分列表
94
+     */
95
+    public List<QualityScore> listScores(String sourceType, Long sourceId, 
96
+                                          LocalDate startDate, LocalDate endDate) {
97
+        LambdaQueryWrapper<QualityScore> wrapper = new LambdaQueryWrapper<>();
98
+        if (sourceType != null && !sourceType.isEmpty()) {
99
+            wrapper.eq(QualityScore::getSourceType, sourceType);
100
+        }
101
+        if (sourceId != null) {
102
+            wrapper.eq(QualityScore::getSourceId, sourceId);
103
+        }
104
+        if (startDate != null) {
105
+            wrapper.ge(QualityScore::getScoreDate, startDate);
106
+        }
107
+        if (endDate != null) {
108
+            wrapper.le(QualityScore::getScoreDate, endDate);
109
+        }
110
+        wrapper.orderByDesc(QualityScore::getScoreDate);
111
+        return qualityScoreMapper.selectList(wrapper);
112
+    }
113
+
114
+    /**
115
+     * 删除评分记录
116
+     */
117
+    @Transactional
118
+    public void deleteScore(Long id) {
119
+        qualityScoreMapper.deleteById(id);
120
+    }
121
+
122
+    // ==================== 趋势分析 ====================
123
+
124
+    /**
125
+     * 趋势分析:指定数据源近N天的评分趋势
126
+     */
127
+    public Map<String, Object> trendAnalysis(String sourceType, Long sourceId, int days) {
128
+        LocalDate startDate = LocalDate.now().minusDays(days);
129
+        LambdaQueryWrapper<QualityScore> wrapper = new LambdaQueryWrapper<>();
130
+        wrapper.eq(sourceType != null, QualityScore::getSourceType, sourceType);
131
+        wrapper.eq(sourceId != null, QualityScore::getSourceId, sourceId);
132
+        wrapper.ge(QualityScore::getScoreDate, startDate);
133
+        wrapper.orderByAsc(QualityScore::getScoreDate);
134
+        List<QualityScore> scores = qualityScoreMapper.selectList(wrapper);
135
+
136
+        Map<String, Object> trend = new LinkedHashMap<>();
137
+        trend.put("source_type", sourceType);
138
+        trend.put("source_id", sourceId);
139
+        trend.put("days", days);
140
+        trend.put("data_points", scores.size());
141
+
142
+        if (!scores.isEmpty()) {
143
+            // 计算平均得分
144
+            BigDecimal avgTotal = scores.stream()
145
+                    .map(QualityScore::getTotalScore)
146
+                    .filter(Objects::nonNull)
147
+                    .reduce(BigDecimal.ZERO, BigDecimal::add)
148
+                    .divide(BigDecimal.valueOf(scores.size()), 2, RoundingMode.HALF_UP);
149
+
150
+            BigDecimal avgCompleteness = scores.stream()
151
+                    .map(QualityScore::getCompletenessScore)
152
+                    .filter(Objects::nonNull)
153
+                    .reduce(BigDecimal.ZERO, BigDecimal::add)
154
+                    .divide(BigDecimal.valueOf(scores.size()), 2, RoundingMode.HALF_UP);
155
+
156
+            BigDecimal avgAccuracy = scores.stream()
157
+                    .map(QualityScore::getAccuracyScore)
158
+                    .filter(Objects::nonNull)
159
+                    .reduce(BigDecimal.ZERO, BigDecimal::add)
160
+                    .divide(BigDecimal.valueOf(scores.size()), 2, RoundingMode.HALF_UP);
161
+
162
+            BigDecimal avgTimeliness = scores.stream()
163
+                    .map(QualityScore::getTimelinessScore)
164
+                    .filter(Objects::nonNull)
165
+                    .reduce(BigDecimal.ZERO, BigDecimal::add)
166
+                    .divide(BigDecimal.valueOf(scores.size()), 2, RoundingMode.HALF_UP);
167
+
168
+            trend.put("avg_total_score", avgTotal);
169
+            trend.put("avg_completeness", avgCompleteness);
170
+            trend.put("avg_accuracy", avgAccuracy);
171
+            trend.put("avg_timeliness", avgTimeliness);
172
+
173
+            // 趋势方向
174
+            BigDecimal first = scores.get(0).getTotalScore();
175
+            BigDecimal last = scores.get(scores.size() - 1).getTotalScore();
176
+            String direction = last.compareTo(first) >= 0 ? "improving" : "declining";
177
+            trend.put("trend_direction", direction);
178
+            trend.put("score_change", last.subtract(first));
179
+
180
+            // 每日数据
181
+            List<Map<String, Object>> daily = scores.stream().map(s -> {
182
+                Map<String, Object> d = new LinkedHashMap<>();
183
+                d.put("date", s.getScoreDate());
184
+                d.put("total", s.getTotalScore());
185
+                d.put("completeness", s.getCompletenessScore());
186
+                d.put("accuracy", s.getAccuracyScore());
187
+                d.put("timeliness", s.getTimelinessScore());
188
+                return d;
189
+            }).collect(Collectors.toList());
190
+            trend.put("daily_scores", daily);
191
+        } else {
192
+            trend.put("avg_total_score", BigDecimal.ZERO);
193
+            trend.put("trend_direction", "no_data");
194
+            trend.put("daily_scores", List.of());
195
+        }
196
+
197
+        return trend;
198
+    }
199
+
200
+    /**
201
+     * 综合统计:所有数据源的最新评分概览
202
+     */
203
+    public Map<String, Object> overview() {
204
+        List<QualityScore> allScores = qualityScoreMapper.selectList(
205
+                new LambdaQueryWrapper<QualityScore>().orderByDesc(QualityScore::getScoreDate));
206
+
207
+        Map<String, Object> overview = new LinkedHashMap<>();
208
+        overview.put("total_records", allScores.size());
209
+
210
+        if (!allScores.isEmpty()) {
211
+            BigDecimal avgTotal = allScores.stream()
212
+                    .map(QualityScore::getTotalScore)
213
+                    .filter(Objects::nonNull)
214
+                    .reduce(BigDecimal.ZERO, BigDecimal::add)
215
+                    .divide(BigDecimal.valueOf(allScores.size()), 2, RoundingMode.HALF_UP);
216
+            overview.put("avg_score", avgTotal);
217
+
218
+            long highQuality = allScores.stream()
219
+                    .filter(s -> s.getTotalScore() != null && s.getTotalScore().compareTo(new BigDecimal("80")) >= 0)
220
+                    .count();
221
+            overview.put("high_quality_count", highQuality);
222
+            overview.put("high_quality_rate", BigDecimal.valueOf(highQuality * 100)
223
+                    .divide(BigDecimal.valueOf(allScores.size()), 2, RoundingMode.HALF_UP));
224
+        }
225
+
226
+        return overview;
227
+    }
228
+
229
+    // ==================== 私有方法 ====================
230
+
231
+    private BigDecimal scoreCompleteness(String tableName) {
232
+        try {
233
+            String sql = String.format("SELECT COUNT(*) FROM %s", tableName);
234
+            Integer total = jdbcTemplate.queryForObject(sql, Integer.class);
235
+            if (total == null || total == 0) return BigDecimal.ZERO;
236
+
237
+            // 检查主要字段的非空率
238
+            String nullSql = String.format(
239
+                "SELECT COUNT(*) FROM %s WHERE value IS NULL OR value = -9999", tableName);
240
+            Integer nullCount = jdbcTemplate.queryForObject(nullSql, Integer.class);
241
+            int nulls = nullCount != null ? nullCount : 0;
242
+
243
+            BigDecimal rate = BigDecimal.valueOf(total - nulls)
244
+                    .multiply(BigDecimal.valueOf(100))
245
+                    .divide(BigDecimal.valueOf(total), 2, RoundingMode.HALF_UP);
246
+            return rate.min(BigDecimal.valueOf(100));
247
+        } catch (Exception e) {
248
+            log.debug("完整性评分计算失败({}): {}", tableName, e.getMessage());
249
+            return new BigDecimal("85"); // 默认分数
250
+        }
251
+    }
252
+
253
+    private BigDecimal scoreAccuracy(String tableName) {
254
+        try {
255
+            String sql = String.format("SELECT COUNT(*) FROM %s", tableName);
256
+            Integer total = jdbcTemplate.queryForObject(sql, Integer.class);
257
+            if (total == null || total == 0) return BigDecimal.ZERO;
258
+
259
+            // 检查异常值比例
260
+            String outlierSql = String.format(
261
+                "SELECT COUNT(*) FROM %s WHERE value < -9000 OR value > 99999", tableName);
262
+            Integer outlierCount = jdbcTemplate.queryForObject(outlierSql, Integer.class);
263
+            int outliers = outlierCount != null ? outlierCount : 0;
264
+
265
+            BigDecimal rate = BigDecimal.valueOf(total - outliers)
266
+                    .multiply(BigDecimal.valueOf(100))
267
+                    .divide(BigDecimal.valueOf(total), 2, RoundingMode.HALF_UP);
268
+            return rate.min(BigDecimal.valueOf(100));
269
+        } catch (Exception e) {
270
+            log.debug("准确性评分计算失败({}): {}", tableName, e.getMessage());
271
+            return new BigDecimal("90");
272
+        }
273
+    }
274
+
275
+    private BigDecimal scoreTimeliness(String tableName) {
276
+        try {
277
+            String sql = String.format(
278
+                "SELECT COUNT(*) FROM %s WHERE created_at > NOW() - INTERVAL '24 hours'", tableName);
279
+            Integer recentCount = jdbcTemplate.queryForObject(sql, Integer.class);
280
+
281
+            String totalSql = String.format("SELECT COUNT(*) FROM %s", tableName);
282
+            Integer total = jdbcTemplate.queryForObject(totalSql, Integer.class);
283
+
284
+            if (total == null || total == 0) return BigDecimal.ZERO;
285
+            int recent = recentCount != null ? recentCount : 0;
286
+
287
+            BigDecimal rate = BigDecimal.valueOf(recent)
288
+                    .multiply(BigDecimal.valueOf(100))
289
+                    .divide(BigDecimal.valueOf(total), 2, RoundingMode.HALF_UP);
290
+            return rate.min(BigDecimal.valueOf(100));
291
+        } catch (Exception e) {
292
+            log.debug("时效性评分计算失败({}): {}", tableName, e.getMessage());
293
+            return new BigDecimal("80");
294
+        }
295
+    }
296
+
297
+    private String buildDetail(BigDecimal completeness, BigDecimal accuracy, BigDecimal timeliness) {
298
+        return String.format(
299
+            "{\"completeness\":{\"score\":%s,\"weight\":0.4},\"accuracy\":{\"score\":%s,\"weight\":0.4},\"timeliness\":{\"score\":%s,\"weight\":0.2}}",
300
+            completeness, accuracy, timeliness);
301
+    }
302
+}

+ 89
- 0
wm-data-engine/src/main/resources/db/V_data_quality.sql 查看文件

@@ -0,0 +1,89 @@
1
+-- ============================================
2
+-- 数据质量模块 DDL
3
+-- V_data_quality.sql
4
+-- ============================================
5
+
6
+-- 1. 数据清洗任务表
7
+CREATE TABLE IF NOT EXISTS de_cleaning_task (
8
+    id              BIGSERIAL       PRIMARY KEY,
9
+    task_id         VARCHAR(32)     NOT NULL,
10
+    source_type     VARCHAR(32),
11
+    source_id       BIGINT,
12
+    total_records   INTEGER         DEFAULT 0,
13
+    cleaned_records INTEGER         DEFAULT 0,
14
+    filled_missing  INTEGER         DEFAULT 0,
15
+    removed_outliers INTEGER        DEFAULT 0,
16
+    removed_duplicates INTEGER      DEFAULT 0,
17
+    status          VARCHAR(16)     NOT NULL DEFAULT 'pending',
18
+    task_created_at TIMESTAMP,
19
+    finished_at     TIMESTAMP,
20
+    deleted         INTEGER         DEFAULT 0,
21
+    created_at      TIMESTAMP       DEFAULT NOW(),
22
+    updated_at      TIMESTAMP       DEFAULT NOW()
23
+);
24
+
25
+COMMENT ON TABLE de_cleaning_task IS '数据清洗任务表';
26
+COMMENT ON COLUMN de_cleaning_task.task_id IS '业务任务ID';
27
+COMMENT ON COLUMN de_cleaning_task.source_type IS '数据源类型: iot/manual/import/api';
28
+COMMENT ON COLUMN de_cleaning_task.status IS '任务状态: pending/running/completed/failed';
29
+
30
+-- 2. 数据清洗规则表
31
+CREATE TABLE IF NOT EXISTS de_cleaning_rule (
32
+    id              BIGSERIAL       PRIMARY KEY,
33
+    rule_name       VARCHAR(128)    NOT NULL,
34
+    rule_type       VARCHAR(32)     NOT NULL,
35
+    table_name      VARCHAR(128),
36
+    field_name      VARCHAR(128),
37
+    threshold       DECIMAL(12, 4),
38
+    enabled         INTEGER         DEFAULT 1,
39
+    deleted         INTEGER         DEFAULT 0,
40
+    created_at      TIMESTAMP       DEFAULT NOW(),
41
+    updated_at      TIMESTAMP       DEFAULT NOW()
42
+);
43
+
44
+COMMENT ON TABLE de_cleaning_rule IS '数据清洗规则表';
45
+COMMENT ON COLUMN de_cleaning_rule.rule_type IS '规则类型: missing_fill/outlier_detect/dedup/custom';
46
+
47
+-- 3. 数据质控评分表
48
+CREATE TABLE IF NOT EXISTS de_quality_score (
49
+    id                  BIGSERIAL       PRIMARY KEY,
50
+    source_type         VARCHAR(32),
51
+    source_id           BIGINT,
52
+    score_date          DATE            NOT NULL,
53
+    completeness_score  DECIMAL(6, 2),
54
+    accuracy_score      DECIMAL(6, 2),
55
+    timeliness_score    DECIMAL(6, 2),
56
+    total_score         DECIMAL(6, 2),
57
+    detail              TEXT,
58
+    scored_at           TIMESTAMP,
59
+    deleted             INTEGER         DEFAULT 0,
60
+    created_at          TIMESTAMP       DEFAULT NOW(),
61
+    updated_at          TIMESTAMP       DEFAULT NOW()
62
+);
63
+
64
+COMMENT ON TABLE de_quality_score IS '数据质控评分表';
65
+COMMENT ON COLUMN de_quality_score.source_type IS '数据源类型';
66
+COMMENT ON COLUMN de_quality_score.completeness_score IS '完整性得分(0-100)';
67
+COMMENT ON COLUMN de_quality_score.accuracy_score IS '准确性得分(0-100)';
68
+COMMENT ON COLUMN de_quality_score.timeliness_score IS '时效性得分(0-100)';
69
+COMMENT ON COLUMN de_quality_score.total_score IS '综合得分(0-100)';
70
+
71
+-- ============================================
72
+-- 索引
73
+-- ============================================
74
+
75
+-- 清洗任务索引
76
+CREATE INDEX idx_cleaning_task_task_id ON de_cleaning_task(task_id);
77
+CREATE INDEX idx_cleaning_task_source ON de_cleaning_task(source_type, source_id);
78
+CREATE INDEX idx_cleaning_task_status ON de_cleaning_task(status);
79
+CREATE INDEX idx_cleaning_task_created ON de_cleaning_task(task_created_at);
80
+
81
+-- 清洗规则索引
82
+CREATE INDEX idx_cleaning_rule_type ON de_cleaning_rule(rule_type);
83
+CREATE INDEX idx_cleaning_rule_table ON de_cleaning_rule(table_name);
84
+CREATE INDEX idx_cleaning_rule_enabled ON de_cleaning_rule(enabled);
85
+
86
+-- 质控评分索引
87
+CREATE INDEX idx_quality_score_source ON de_quality_score(source_type, source_id);
88
+CREATE INDEX idx_quality_score_date ON de_quality_score(score_date);
89
+CREATE INDEX idx_quality_score_total ON de_quality_score(total_score);

+ 286
- 0
wm-data-engine/src/test/java/com/water/data_engine/service/DataQualityTest.java 查看文件

@@ -0,0 +1,286 @@
1
+package com.water.data_engine.service;
2
+
3
+import com.water.data_engine.entity.CleaningRule;
4
+import com.water.data_engine.entity.CleaningTask;
5
+import com.water.data_engine.entity.QualityScore;
6
+import com.water.data_engine.mapper.CleaningRuleMapper;
7
+import com.water.data_engine.mapper.CleaningTaskMapper;
8
+import com.water.data_engine.mapper.QualityScoreMapper;
9
+import org.junit.jupiter.api.BeforeEach;
10
+import org.junit.jupiter.api.DisplayName;
11
+import org.junit.jupiter.api.Test;
12
+import org.junit.jupiter.api.extension.ExtendWith;
13
+import org.mockito.Mock;
14
+import org.mockito.junit.jupiter.MockitoExtension;
15
+import org.springframework.jdbc.core.JdbcTemplate;
16
+
17
+import java.math.BigDecimal;
18
+import java.time.LocalDate;
19
+import java.time.LocalDateTime;
20
+import java.util.*;
21
+
22
+import static org.junit.jupiter.api.Assertions.*;
23
+import static org.mockito.ArgumentMatchers.any;
24
+import static org.mockito.Mockito.*;
25
+
26
+/**
27
+ * 数据质量模块测试(清洗 + 质控评分)
28
+ */
29
+@ExtendWith(MockitoExtension.class)
30
+class DataQualityTest {
31
+
32
+    @Mock
33
+    private CleaningTaskMapper cleaningTaskMapper;
34
+    @Mock
35
+    private CleaningRuleMapper cleaningRuleMapper;
36
+    @Mock
37
+    private QualityScoreMapper qualityScoreMapper;
38
+    @Mock
39
+    private JdbcTemplate jdbcTemplate;
40
+
41
+    private DataCleaningService cleaningService;
42
+    private QualityScoringService scoringService;
43
+
44
+    @BeforeEach
45
+    void setUp() {
46
+        cleaningService = new DataCleaningService(cleaningTaskMapper, cleaningRuleMapper, jdbcTemplate);
47
+        scoringService = new QualityScoringService(qualityScoreMapper, jdbcTemplate);
48
+    }
49
+
50
+    // ==================== DataCleaningService 测试 ====================
51
+
52
+    @Test
53
+    @DisplayName("创建清洗任务-生成taskId并初始化状态")
54
+    void testCreateTask() {
55
+        when(cleaningTaskMapper.insert(any(CleaningTask.class))).thenReturn(1);
56
+
57
+        CleaningTask task = new CleaningTask();
58
+        task.setSourceType("iot");
59
+        task.setSourceId(1L);
60
+
61
+        CleaningTask result = cleaningService.createTask(task);
62
+
63
+        assertNotNull(result.getTaskId());
64
+        assertEquals(16, result.getTaskId().length());
65
+        assertEquals("pending", result.getStatus());
66
+        assertEquals(0, result.getTotalRecords());
67
+        assertEquals(0, result.getFilledMissing());
68
+        assertEquals(0, result.getRemovedOutliers());
69
+        assertEquals(0, result.getRemovedDuplicates());
70
+        assertNotNull(result.getTaskCreatedAt());
71
+        verify(cleaningTaskMapper).insert(any(CleaningTask.class));
72
+    }
73
+
74
+    @Test
75
+    @DisplayName("单条数据清洗-缺失值填充为-9999")
76
+    void testCleanData_MissingFill() {
77
+        Map<String, Object> data = new HashMap<>();
78
+        data.put("flow", null);
79
+        data.put("pressure", "");
80
+        data.put("level", 100.0);
81
+
82
+        Map<String, Object> result = cleaningService.cleanData(data, "range");
83
+
84
+        assertEquals(-9999.0, result.get("flow"));
85
+        assertEquals("MISSING_FILLED", result.get("flow_flag"));
86
+        assertEquals(-9999.0, result.get("pressure"));
87
+        assertEquals("MISSING_FILLED", result.get("pressure_flag"));
88
+        assertEquals(100.0, result.get("level"));
89
+        assertEquals(true, result.get("_cleaned"));
90
+        assertTrue((int) result.get("_filled_missing") >= 2);
91
+    }
92
+
93
+    @Test
94
+    @DisplayName("单条数据清洗-异常值检测(range模式)")
95
+    void testCleanData_OutlierRange() {
96
+        Map<String, Object> data = new HashMap<>();
97
+        data.put("flow", -5.0);     // 流量为负 → 异常
98
+        data.put("pressure", 300.0); // 压力超200 → 异常
99
+        data.put("ph", 7.0);        // 正常
100
+
101
+        Map<String, Object> result = cleaningService.cleanData(data, "range");
102
+
103
+        assertEquals("OUTLIER", result.get("flow_flag"));
104
+        assertEquals("OUTLIER", result.get("pressure_flag"));
105
+        assertNull(result.get("ph_flag")); // pH正常,无标记
106
+        assertTrue((int) result.get("_removed_outliers") >= 2);
107
+    }
108
+
109
+    @Test
110
+    @DisplayName("批量清洗-包含去重")
111
+    void testBatchClean_WithDedup() {
112
+        Map<String, Object> d1 = new HashMap<>();
113
+        d1.put("flow", 10.0);
114
+        Map<String, Object> d2 = new HashMap<>();
115
+        d2.put("flow", 10.0); // 与d1相同
116
+        Map<String, Object> d3 = new HashMap<>();
117
+        d3.put("flow", 20.0);
118
+
119
+        List<Map<String, Object>> dataList = List.of(d1, d2, d3);
120
+
121
+        Map<String, Object> result = cleaningService.batchClean(dataList, "range");
122
+
123
+        assertEquals(3, result.get("total_records"));
124
+        assertEquals(2, result.get("cleaned_records")); // 去重后2条
125
+        assertTrue((int) result.get("removed_duplicates") >= 1);
126
+        assertEquals(true, result.get("_batch_cleaned"));
127
+    }
128
+
129
+    @Test
130
+    @DisplayName("清洗规则-CRUD操作")
131
+    void testCleaningRuleCrud() {
132
+        when(cleaningRuleMapper.insert(any(CleaningRule.class))).thenReturn(1);
133
+        when(cleaningRuleMapper.selectById(1L)).thenReturn(buildRule());
134
+        when(cleaningRuleMapper.updateById(any(CleaningRule.class))).thenReturn(1);
135
+        when(cleaningRuleMapper.deleteById(1L)).thenReturn(1);
136
+
137
+        // 创建
138
+        CleaningRule rule = new CleaningRule();
139
+        rule.setRuleName("流量缺失填充");
140
+        rule.setRuleType("missing_fill");
141
+        rule.setTableName("iot_telemetry");
142
+        rule.setFieldName("flow");
143
+        CleaningRule created = cleaningService.createRule(rule);
144
+        assertEquals(1, created.getEnabled());
145
+
146
+        // 查询
147
+        CleaningRule fetched = cleaningService.getRule(1L);
148
+        assertNotNull(fetched);
149
+
150
+        // 更新
151
+        fetched.setThreshold(new BigDecimal("3.0"));
152
+        cleaningService.updateRule(1L, fetched);
153
+        verify(cleaningRuleMapper).updateById(any(CleaningRule.class));
154
+
155
+        // 删除
156
+        cleaningService.deleteRule(1L);
157
+        verify(cleaningRuleMapper).deleteById(1L);
158
+    }
159
+
160
+    @Test
161
+    @DisplayName("启用/禁用清洗规则")
162
+    void testToggleRule() {
163
+        CleaningRule rule = buildRule();
164
+        when(cleaningRuleMapper.selectById(1L)).thenReturn(rule);
165
+        when(cleaningRuleMapper.updateById(any(CleaningRule.class))).thenReturn(1);
166
+
167
+        CleaningRule disabled = cleaningService.toggleRule(1L, false);
168
+        assertEquals(0, disabled.getEnabled());
169
+
170
+        CleaningRule enabled = cleaningService.toggleRule(1L, true);
171
+        assertEquals(1, enabled.getEnabled());
172
+    }
173
+
174
+    // ==================== QualityScoringService 测试 ====================
175
+
176
+    @Test
177
+    @DisplayName("质控评分-三维度计算并保存")
178
+    void testScore() {
179
+        when(jdbcTemplate.queryForObject(anyString(), eq(Integer.class))).thenReturn(100);
180
+        when(qualityScoreMapper.insert(any(QualityScore.class))).thenReturn(1);
181
+
182
+        QualityScore score = scoringService.score("iot", 1L, "iot_telemetry");
183
+
184
+        assertNotNull(score);
185
+        assertEquals("iot", score.getSourceType());
186
+        assertEquals(1L, score.getSourceId());
187
+        assertEquals(LocalDate.now(), score.getScoreDate());
188
+        assertNotNull(score.getCompletenessScore());
189
+        assertNotNull(score.getAccuracyScore());
190
+        assertNotNull(score.getTimelinessScore());
191
+        assertNotNull(score.getTotalScore());
192
+        assertNotNull(score.getDetail());
193
+        assertTrue(score.getTotalScore().compareTo(BigDecimal.ZERO) >= 0);
194
+        assertTrue(score.getTotalScore().compareTo(new BigDecimal("100")) <= 0);
195
+        verify(qualityScoreMapper).insert(any(QualityScore.class));
196
+    }
197
+
198
+    @Test
199
+    @DisplayName("趋势分析-计算平均得分和趋势方向")
200
+    void testTrendAnalysis() {
201
+        List<QualityScore> mockScores = List.of(
202
+                buildScore(new BigDecimal("70"), LocalDate.now().minusDays(5)),
203
+                buildScore(new BigDecimal("75"), LocalDate.now().minusDays(3)),
204
+                buildScore(new BigDecimal("85"), LocalDate.now().minusDays(1))
205
+        );
206
+        when(qualityScoreMapper.selectList(any())).thenReturn(mockScores);
207
+
208
+        Map<String, Object> trend = scoringService.trendAnalysis("iot", 1L, 7);
209
+
210
+        assertNotNull(trend);
211
+        assertEquals("iot", trend.get("source_type"));
212
+        assertEquals(3, trend.get("data_points"));
213
+        assertEquals("improving", trend.get("trend_direction"));
214
+        assertNotNull(trend.get("avg_total_score"));
215
+        assertNotNull(trend.get("avg_completeness"));
216
+        assertNotNull(trend.get("avg_accuracy"));
217
+        assertNotNull(trend.get("avg_timeliness"));
218
+
219
+        @SuppressWarnings("unchecked")
220
+        List<Map<String, Object>> daily = (List<Map<String, Object>>) trend.get("daily_scores");
221
+        assertEquals(3, daily.size());
222
+    }
223
+
224
+    @Test
225
+    @DisplayName("质控概览-统计高质量数据占比")
226
+    void testOverview() {
227
+        List<QualityScore> mockScores = List.of(
228
+                buildScore(new BigDecimal("90"), LocalDate.now()),
229
+                buildScore(new BigDecimal("85"), LocalDate.now()),
230
+                buildScore(new BigDecimal("60"), LocalDate.now()),
231
+                buildScore(new BigDecimal("92"), LocalDate.now())
232
+        );
233
+        when(qualityScoreMapper.selectList(any())).thenReturn(mockScores);
234
+
235
+        Map<String, Object> overview = scoringService.overview();
236
+
237
+        assertNotNull(overview);
238
+        assertEquals(4, overview.get("total_records"));
239
+        assertNotNull(overview.get("avg_score"));
240
+        assertEquals(3L, overview.get("high_quality_count")); // >=80分的有3个
241
+        assertNotNull(overview.get("high_quality_rate"));
242
+    }
243
+
244
+    @Test
245
+    @DisplayName("质控评分列表-按日期范围查询")
246
+    void testListScores() {
247
+        List<QualityScore> mockScores = List.of(
248
+                buildScore(new BigDecimal("85"), LocalDate.now().minusDays(1)),
249
+                buildScore(new BigDecimal("90"), LocalDate.now())
250
+        );
251
+        when(qualityScoreMapper.selectList(any())).thenReturn(mockScores);
252
+
253
+        List<QualityScore> result = scoringService.listScores(
254
+                "iot", 1L, LocalDate.now().minusDays(7), LocalDate.now());
255
+
256
+        assertEquals(2, result.size());
257
+        assertEquals("iot", result.get(0).getSourceType());
258
+    }
259
+
260
+    // ==================== 辅助方法 ====================
261
+
262
+    private CleaningRule buildRule() {
263
+        CleaningRule rule = new CleaningRule();
264
+        rule.setId(1L);
265
+        rule.setRuleName("流量缺失填充");
266
+        rule.setRuleType("missing_fill");
267
+        rule.setTableName("iot_telemetry");
268
+        rule.setFieldName("flow");
269
+        rule.setThreshold(new BigDecimal("3.0"));
270
+        rule.setEnabled(1);
271
+        return rule;
272
+    }
273
+
274
+    private QualityScore buildScore(BigDecimal total, LocalDate date) {
275
+        QualityScore score = new QualityScore();
276
+        score.setSourceType("iot");
277
+        score.setSourceId(1L);
278
+        score.setScoreDate(date);
279
+        score.setCompletenessScore(new BigDecimal("88"));
280
+        score.setAccuracyScore(new BigDecimal("90"));
281
+        score.setTimelinessScore(new BigDecimal("80"));
282
+        score.setTotalScore(total);
283
+        score.setScoredAt(LocalDateTime.now());
284
+        return score;
285
+    }
286
+}