ソースを参照

Merge branch 'feature/issue-30'

bot_dev2 5 日 前
コミット
24f719858a

+ 5
- 0
wm-iot/pom.xml ファイルの表示

@@ -13,5 +13,10 @@
13 13
         <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
14 14
         <dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId></dependency>
15 15
         <dependency><groupId>net.postgis</groupId><artifactId>postgis-jdbc</artifactId></dependency>
16
+        <dependency>
17
+            <groupId>org.springframework.boot</groupId>
18
+            <artifactId>spring-boot-starter-test</artifactId>
19
+            <scope>test</scope>
20
+        </dependency>
16 21
     </dependencies>
17 22
 </project>

+ 59
- 0
wm-iot/src/main/java/com/water/iot/config/RedisConfig.java ファイルの表示

@@ -0,0 +1,59 @@
1
+package com.water.iot.config;
2
+
3
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
4
+import com.fasterxml.jackson.annotation.PropertyAccessor;
5
+import com.fasterxml.jackson.databind.ObjectMapper;
6
+import com.fasterxml.jackson.databind.SerializationFeature;
7
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
8
+import org.springframework.context.annotation.Bean;
9
+import org.springframework.context.annotation.Configuration;
10
+import org.springframework.data.redis.connection.RedisConnectionFactory;
11
+import org.springframework.data.redis.core.RedisTemplate;
12
+import org.springframework.data.redis.core.StringRedisTemplate;
13
+import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
14
+import org.springframework.data.redis.serializer.StringRedisSerializer;
15
+
16
+/**
17
+ * Redis配置
18
+ * - StringRedisTemplate: key/value均为String序列化(用于设备影子Hash存储)
19
+ * - RedisTemplate<String, Object>: key为String,value为JSON序列化(通用)
20
+ */
21
+@Configuration
22
+public class RedisConfig {
23
+
24
+    @Bean
25
+    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
26
+        StringRedisTemplate template = new StringRedisTemplate();
27
+        template.setConnectionFactory(factory);
28
+        return template;
29
+    }
30
+
31
+    @Bean
32
+    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
33
+        RedisTemplate<String, Object> template = new RedisTemplate<>();
34
+        template.setConnectionFactory(factory);
35
+
36
+        // JSON序列化器
37
+        ObjectMapper mapper = new ObjectMapper();
38
+        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
39
+        mapper.activateDefaultTyping(
40
+            mapper.getPolymorphicTypeValidator(),
41
+            ObjectMapper.DefaultTyping.NON_FINAL
42
+        );
43
+        mapper.registerModule(new JavaTimeModule());
44
+        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
45
+
46
+        GenericJackson2JsonRedisSerializer jsonSerializer = new GenericJackson2JsonRedisSerializer(mapper);
47
+        StringRedisSerializer stringSerializer = new StringRedisSerializer();
48
+
49
+        // key序列化
50
+        template.setKeySerializer(stringSerializer);
51
+        template.setHashKeySerializer(stringSerializer);
52
+        // value序列化
53
+        template.setValueSerializer(jsonSerializer);
54
+        template.setHashValueSerializer(jsonSerializer);
55
+
56
+        template.afterPropertiesSet();
57
+        return template;
58
+    }
59
+}

+ 107
- 0
wm-iot/src/main/java/com/water/iot/controller/DeviceShadowController.java ファイルの表示

@@ -0,0 +1,107 @@
1
+package com.water.iot.controller;
2
+
3
+import com.water.common.core.result.R;
4
+import com.water.iot.entity.DeviceShadow;
5
+import com.water.iot.service.DeviceShadowService;
6
+import io.swagger.v3.oas.annotations.Operation;
7
+import io.swagger.v3.oas.annotations.tags.Tag;
8
+import lombok.RequiredArgsConstructor;
9
+import org.springframework.web.bind.annotation.*;
10
+
11
+import java.util.List;
12
+import java.util.Map;
13
+
14
+@Tag(name = "设备影子")
15
+@RestController
16
+@RequestMapping("/api/iot/shadow")
17
+@RequiredArgsConstructor
18
+public class DeviceShadowController {
19
+
20
+    private final DeviceShadowService shadowService;
21
+
22
+    @Operation(summary = "获取设备影子")
23
+    @GetMapping("/{deviceSn}")
24
+    public R<DeviceShadow> getShadow(@PathVariable String deviceSn) {
25
+        DeviceShadow shadow = shadowService.getShadow(deviceSn);
26
+        if (shadow == null) {
27
+            return R.fail(404, "设备影子不存在: " + deviceSn);
28
+        }
29
+        return R.ok(shadow);
30
+    }
31
+
32
+    @Operation(summary = "更新设备上报状态")
33
+    @PostMapping("/{deviceSn}/reported")
34
+    public R<String> updateReported(@PathVariable String deviceSn,
35
+                                     @RequestBody Map<String, Object> state) {
36
+        shadowService.updateReported(deviceSn, state);
37
+        return R.ok("上报状态已更新");
38
+    }
39
+
40
+    @Operation(summary = "更新期望状态(云端下发)")
41
+    @PostMapping("/{deviceSn}/desired")
42
+    public R<String> updateDesired(@PathVariable String deviceSn,
43
+                                    @RequestBody Map<String, Object> desired) {
44
+        try {
45
+            com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper();
46
+            String json = mapper.writeValueAsString(desired);
47
+            shadowService.updateDesired(deviceSn, json);
48
+            return R.ok("期望状态已下发");
49
+        } catch (Exception e) {
50
+            return R.fail("序列化失败: " + e.getMessage());
51
+        }
52
+    }
53
+
54
+    @Operation(summary = "获取上报状态")
55
+    @GetMapping("/{deviceSn}/reported")
56
+    public R<String> getReportedState(@PathVariable String deviceSn) {
57
+        String state = shadowService.getReportedState(deviceSn);
58
+        return R.ok(state);
59
+    }
60
+
61
+    @Operation(summary = "获取期望状态")
62
+    @GetMapping("/{deviceSn}/desired")
63
+    public R<String> getDesiredState(@PathVariable String deviceSn) {
64
+        String state = shadowService.getDesiredState(deviceSn);
65
+        return R.ok(state);
66
+    }
67
+
68
+    @Operation(summary = "获取差异状态(delta)")
69
+    @GetMapping("/{deviceSn}/delta")
70
+    public R<String> getDeltaState(@PathVariable String deviceSn) {
71
+        String state = shadowService.getDeltaState(deviceSn);
72
+        return R.ok(state);
73
+    }
74
+
75
+    @Operation(summary = "批量查询设备影子")
76
+    @PostMapping("/batch")
77
+    public R<List<DeviceShadow>> batchGetShadows(@RequestBody List<String> deviceSns) {
78
+        return R.ok(shadowService.batchGetShadows(deviceSns));
79
+    }
80
+
81
+    @Operation(summary = "检查设备在线状态")
82
+    @GetMapping("/{deviceSn}/online")
83
+    public R<Boolean> checkOnline(@PathVariable String deviceSn,
84
+                                   @RequestParam(defaultValue = "30") int thresholdMinutes) {
85
+        return R.ok(shadowService.checkOnline(deviceSn, thresholdMinutes));
86
+    }
87
+
88
+    @Operation(summary = "批量检测在线状态")
89
+    @PostMapping("/online/batch")
90
+    public R<Map<String, Boolean>> batchCheckOnline(@RequestBody List<String> deviceSns,
91
+                                                     @RequestParam(defaultValue = "30") int thresholdMinutes) {
92
+        return R.ok(shadowService.batchCheckOnline(deviceSns, thresholdMinutes));
93
+    }
94
+
95
+    @Operation(summary = "在线设备数量")
96
+    @GetMapping("/online/count")
97
+    public R<Long> countOnline() {
98
+        return R.ok(shadowService.countOnlineDevices());
99
+    }
100
+
101
+    @Operation(summary = "删除设备影子")
102
+    @DeleteMapping("/{deviceSn}")
103
+    public R<String> deleteShadow(@PathVariable String deviceSn) {
104
+        shadowService.deleteShadow(deviceSn);
105
+        return R.ok("影子已删除");
106
+    }
107
+}

+ 179
- 0
wm-iot/src/main/java/com/water/iot/controller/OtaController.java ファイルの表示

@@ -0,0 +1,179 @@
1
+package com.water.iot.controller;
2
+
3
+import com.water.common.core.result.R;
4
+import com.water.iot.entity.OtaFirmware;
5
+import com.water.iot.entity.OtaTask;
6
+import com.water.iot.entity.OtaUpgradeRecord;
7
+import com.water.iot.service.OtaService;
8
+import io.swagger.v3.oas.annotations.Operation;
9
+import io.swagger.v3.oas.annotations.tags.Tag;
10
+import lombok.RequiredArgsConstructor;
11
+import org.springframework.web.bind.annotation.*;
12
+
13
+import java.util.List;
14
+import java.util.Map;
15
+
16
+@Tag(name = "OTA固件升级")
17
+@RestController
18
+@RequestMapping("/api/iot/ota")
19
+@RequiredArgsConstructor
20
+public class OtaController {
21
+
22
+    private final OtaService otaService;
23
+
24
+    // ========== 固件管理 ==========
25
+
26
+    @Operation(summary = "创建固件版本")
27
+    @PostMapping("/firmware")
28
+    public R<OtaFirmware> createFirmware(@RequestBody OtaFirmware firmware) {
29
+        return R.ok(otaService.createFirmware(firmware));
30
+    }
31
+
32
+    @Operation(summary = "发布固件")
33
+    @PostMapping("/firmware/{id}/publish")
34
+    public R<String> publishFirmware(@PathVariable Long id,
35
+                                      @RequestParam String publishedBy) {
36
+        otaService.publishFirmware(id, publishedBy);
37
+        return R.ok("固件已发布");
38
+    }
39
+
40
+    @Operation(summary = "废弃固件")
41
+    @PostMapping("/firmware/{id}/deprecate")
42
+    public R<String> deprecateFirmware(@PathVariable Long id) {
43
+        otaService.deprecateFirmware(id);
44
+        return R.ok("固件已废弃");
45
+    }
46
+
47
+    @Operation(summary = "查询固件详情")
48
+    @GetMapping("/firmware/{id}")
49
+    public R<OtaFirmware> getFirmware(@PathVariable Long id) {
50
+        OtaFirmware firmware = otaService.getFirmware(id);
51
+        if (firmware == null) {
52
+            return R.fail(404, "固件不存在: " + id);
53
+        }
54
+        return R.ok(firmware);
55
+    }
56
+
57
+    @Operation(summary = "按模型查询固件列表")
58
+    @GetMapping("/firmware/model/{modelId}")
59
+    public R<List<OtaFirmware>> listByModel(@PathVariable Long modelId) {
60
+        return R.ok(otaService.listFirmwareByModel(modelId));
61
+    }
62
+
63
+    @Operation(summary = "查询所有固件(分页)")
64
+    @GetMapping("/firmware")
65
+    public R<List<OtaFirmware>> listAllFirmware(@RequestParam(defaultValue = "1") int page,
66
+                                                 @RequestParam(defaultValue = "10") int size) {
67
+        return R.ok(otaService.listAllFirmware(page, size));
68
+    }
69
+
70
+    @Operation(summary = "设备查询可用固件")
71
+    @GetMapping("/firmware/available/{deviceSn}")
72
+    public R<OtaFirmware> getAvailableFirmware(@PathVariable String deviceSn) {
73
+        OtaFirmware firmware = otaService.getAvailableFirmware(deviceSn);
74
+        return R.ok(firmware);
75
+    }
76
+
77
+    // ========== 升级任务管理 ==========
78
+
79
+    @Operation(summary = "创建升级任务(指定设备列表)")
80
+    @PostMapping("/task")
81
+    public R<OtaTask> createUpgradeTask(@RequestBody Map<String, Object> body) {
82
+        Long firmwareId = ((Number) body.get("firmwareId")).longValue();
83
+        @SuppressWarnings("unchecked")
84
+        List<Number> deviceIdNums = (List<Number>) body.get("deviceIds");
85
+        List<Long> deviceIds = deviceIdNums.stream().map(Number::longValue).toList();
86
+        int batchSize = body.containsKey("batchSize") ? ((Number) body.get("batchSize")).intValue() : 10;
87
+        String createdBy = (String) body.getOrDefault("createdBy", "system");
88
+
89
+        return R.ok(otaService.createUpgradeTask(firmwareId, deviceIds, batchSize, createdBy));
90
+    }
91
+
92
+    @Operation(summary = "按条件创建升级任务")
93
+    @PostMapping("/task/filter")
94
+    public R<OtaTask> createTaskByFilter(@RequestBody Map<String, Object> body) {
95
+        Long firmwareId = ((Number) body.get("firmwareId")).longValue();
96
+        String deviceType = (String) body.get("deviceType");
97
+        String area = (String) body.get("area");
98
+        int batchSize = body.containsKey("batchSize") ? ((Number) body.get("batchSize")).intValue() : 10;
99
+        String createdBy = (String) body.getOrDefault("createdBy", "system");
100
+
101
+        return R.ok(otaService.createUpgradeTaskByFilter(firmwareId, deviceType, area, batchSize, createdBy));
102
+    }
103
+
104
+    @Operation(summary = "启动升级任务")
105
+    @PostMapping("/task/{id}/start")
106
+    public R<String> startTask(@PathVariable Long id) {
107
+        otaService.startTask(id);
108
+        return R.ok("任务已启动");
109
+    }
110
+
111
+    @Operation(summary = "取消升级任务")
112
+    @PostMapping("/task/{id}/cancel")
113
+    public R<String> cancelTask(@PathVariable Long id) {
114
+        otaService.cancelTask(id);
115
+        return R.ok("任务已取消");
116
+    }
117
+
118
+    @Operation(summary = "查询升级任务详情")
119
+    @GetMapping("/task/{id}")
120
+    public R<OtaTask> getTask(@PathVariable Long id) {
121
+        OtaTask task = otaService.getTask(id);
122
+        if (task == null) {
123
+            return R.fail(404, "任务不存在: " + id);
124
+        }
125
+        return R.ok(task);
126
+    }
127
+
128
+    @Operation(summary = "查询升级任务列表")
129
+    @GetMapping("/task")
130
+    public R<List<OtaTask>> listTasks(@RequestParam(defaultValue = "1") int page,
131
+                                       @RequestParam(defaultValue = "10") int size) {
132
+        return R.ok(otaService.listTasks(page, size));
133
+    }
134
+
135
+    // ========== 进度追踪 ==========
136
+
137
+    @Operation(summary = "更新升级进度")
138
+    @PostMapping("/progress/{recordId}")
139
+    public R<String> updateProgress(@PathVariable Long recordId,
140
+                                     @RequestParam int progress) {
141
+        otaService.updateProgress(recordId, progress);
142
+        return R.ok("进度已更新");
143
+    }
144
+
145
+    @Operation(summary = "标记升级成功")
146
+    @PostMapping("/record/{recordId}/success")
147
+    public R<String> markSuccess(@PathVariable Long recordId,
148
+                                  @RequestParam Long deviceId,
149
+                                  @RequestParam String toVersion) {
150
+        otaService.markSuccess(recordId, deviceId, toVersion);
151
+        return R.ok("已标记成功");
152
+    }
153
+
154
+    @Operation(summary = "标记升级失败")
155
+    @PostMapping("/record/{recordId}/fail")
156
+    public R<String> markFailed(@PathVariable Long recordId,
157
+                                 @RequestParam String reason) {
158
+        otaService.markFailed(recordId, reason);
159
+        return R.ok("已标记失败");
160
+    }
161
+
162
+    @Operation(summary = "查询任务统计")
163
+    @GetMapping("/task/{id}/statistics")
164
+    public R<Map<String, Object>> getTaskStatistics(@PathVariable Long id) {
165
+        return R.ok(otaService.getTaskStatistics(id));
166
+    }
167
+
168
+    @Operation(summary = "查询任务升级记录")
169
+    @GetMapping("/task/{id}/records")
170
+    public R<List<OtaUpgradeRecord>> getTaskRecords(@PathVariable Long id) {
171
+        return R.ok(otaService.getTaskRecords(id));
172
+    }
173
+
174
+    @Operation(summary = "查询设备升级历史")
175
+    @GetMapping("/device/{deviceSn}/history")
176
+    public R<List<OtaUpgradeRecord>> getDeviceHistory(@PathVariable String deviceSn) {
177
+        return R.ok(otaService.getDeviceUpgradeHistory(deviceSn));
178
+    }
179
+}

+ 40
- 0
wm-iot/src/main/java/com/water/iot/entity/DeviceShadow.java ファイルの表示

@@ -0,0 +1,40 @@
1
+package com.water.iot.entity;
2
+
3
+import lombok.Data;
4
+import java.time.LocalDateTime;
5
+
6
+/**
7
+ * 设备影子实体
8
+ * Redis Hash存储:key=shadow:{deviceSn}
9
+ * fields: reported(JSON), desired(JSON), lastReportTime, online
10
+ */
11
+@Data
12
+public class DeviceShadow {
13
+
14
+    /** 设备SN(主键) */
15
+    private String deviceSn;
16
+
17
+    /** 设备ID */
18
+    private Long deviceId;
19
+
20
+    /** 设备名称 */
21
+    private String deviceName;
22
+
23
+    /** 上报状态JSON */
24
+    private String reportedState;
25
+
26
+    /** 期望状态JSON(云端下发) */
27
+    private String desiredState;
28
+
29
+    /** 差异状态JSON */
30
+    private String deltaState;
31
+
32
+    /** 最后上报时间 */
33
+    private LocalDateTime lastReportTime;
34
+
35
+    /** 在线状态 */
36
+    private Boolean online;
37
+
38
+    /** 影子版本号 */
39
+    private Long version;
40
+}

+ 44
- 0
wm-iot/src/main/java/com/water/iot/entity/OtaFirmware.java ファイルの表示

@@ -0,0 +1,44 @@
1
+package com.water.iot.entity;
2
+
3
+import lombok.Data;
4
+import java.time.LocalDateTime;
5
+
6
+/**
7
+ * OTA固件版本实体
8
+ */
9
+@Data
10
+public class OtaFirmware {
11
+
12
+    /** 固件ID */
13
+    private Long id;
14
+
15
+    /** 设备模型ID */
16
+    private Long modelId;
17
+
18
+    /** 固件版本号 */
19
+    private String firmwareVersion;
20
+
21
+    /** 固件文件URL */
22
+    private String fileUrl;
23
+
24
+    /** 固件描述 */
25
+    private String description;
26
+
27
+    /** 状态: draft/published/deprecated */
28
+    private String status;
29
+
30
+    /** 文件MD5 */
31
+    private String md5;
32
+
33
+    /** 文件大小(bytes) */
34
+    private Long fileSize;
35
+
36
+    /** 发布人 */
37
+    private String publishedBy;
38
+
39
+    /** 发布时间 */
40
+    private LocalDateTime publishedAt;
41
+
42
+    private LocalDateTime createdAt;
43
+    private LocalDateTime updatedAt;
44
+}

+ 59
- 0
wm-iot/src/main/java/com/water/iot/entity/OtaTask.java ファイルの表示

@@ -0,0 +1,59 @@
1
+package com.water.iot.entity;
2
+
3
+import lombok.Data;
4
+import java.time.LocalDateTime;
5
+
6
+/**
7
+ * OTA升级任务实体
8
+ */
9
+@Data
10
+public class OtaTask {
11
+
12
+    /** 任务ID */
13
+    private Long id;
14
+
15
+    /** 固件ID */
16
+    private Long firmwareId;
17
+
18
+    /** 固件版本号 */
19
+    private String firmwareVersion;
20
+
21
+    /** 目标设备ID列表(JSON) */
22
+    private String targetDeviceIds;
23
+
24
+    /** 任务状态: pending/executing/completed/cancelled */
25
+    private String taskStatus;
26
+
27
+    /** 每批大小 */
28
+    private Integer batchSize;
29
+
30
+    /** 目标设备总数 */
31
+    private Integer totalDevices;
32
+
33
+    /** 成功数 */
34
+    private Integer successCount;
35
+
36
+    /** 失败数 */
37
+    private Integer failedCount;
38
+
39
+    /** 执行中数量 */
40
+    private Integer executingCount;
41
+
42
+    /** 目标设备类型(可选) */
43
+    private String targetType;
44
+
45
+    /** 目标区域(可选) */
46
+    private String targetArea;
47
+
48
+    /** 创建人 */
49
+    private String createdBy;
50
+
51
+    /** 执行开始时间 */
52
+    private LocalDateTime executedAt;
53
+
54
+    /** 完成时间 */
55
+    private LocalDateTime completedAt;
56
+
57
+    private LocalDateTime createdAt;
58
+    private LocalDateTime updatedAt;
59
+}

+ 46
- 0
wm-iot/src/main/java/com/water/iot/entity/OtaUpgradeRecord.java ファイルの表示

@@ -0,0 +1,46 @@
1
+package com.water.iot.entity;
2
+
3
+import lombok.Data;
4
+import java.time.LocalDateTime;
5
+
6
+/**
7
+ * OTA升级记录实体(每台设备一条)
8
+ */
9
+@Data
10
+public class OtaUpgradeRecord {
11
+
12
+    /** 记录ID */
13
+    private Long id;
14
+
15
+    /** 任务ID */
16
+    private Long taskId;
17
+
18
+    /** 设备ID */
19
+    private Long deviceId;
20
+
21
+    /** 设备SN */
22
+    private String deviceSn;
23
+
24
+    /** 升级前版本 */
25
+    private String fromVersion;
26
+
27
+    /** 升级目标版本 */
28
+    private String toVersion;
29
+
30
+    /** 状态: pending/executing/success/failed */
31
+    private String status;
32
+
33
+    /** 进度(0-100) */
34
+    private Integer progress;
35
+
36
+    /** 失败原因 */
37
+    private String failReason;
38
+
39
+    /** 开始时间 */
40
+    private LocalDateTime startedAt;
41
+
42
+    /** 完成时间 */
43
+    private LocalDateTime completedAt;
44
+
45
+    private LocalDateTime createdAt;
46
+}

+ 244
- 15
wm-iot/src/main/java/com/water/iot/service/DeviceShadowService.java ファイルの表示

@@ -2,15 +2,25 @@ package com.water.iot.service;
2 2
 
3 3
 import com.fasterxml.jackson.core.JsonProcessingException;
4 4
 import com.fasterxml.jackson.databind.ObjectMapper;
5
+import com.water.iot.entity.DeviceShadow;
5 6
 import lombok.RequiredArgsConstructor;
6 7
 import lombok.extern.slf4j.Slf4j;
7 8
 import org.springframework.data.redis.core.StringRedisTemplate;
9
+import org.springframework.jdbc.core.BeanPropertyRowMapper;
8 10
 import org.springframework.jdbc.core.JdbcTemplate;
9 11
 import org.springframework.stereotype.Service;
10 12
 
11
-import java.util.Map;
13
+import java.time.LocalDateTime;
14
+import java.time.temporal.ChronoUnit;
15
+import java.util.*;
12 16
 import java.util.concurrent.TimeUnit;
17
+import java.util.stream.Collectors;
13 18
 
19
+/**
20
+ * 设备影子服务
21
+ * Redis Hash存储:key=shadow:{deviceSn},TTL 24h
22
+ * fields: reported(JSON), desired(JSON), lastReportTime, online, version
23
+ */
14 24
 @Slf4j
15 25
 @Service
16 26
 @RequiredArgsConstructor
@@ -18,32 +28,251 @@ public class DeviceShadowService {
18 28
 
19 29
     private final StringRedisTemplate redisTemplate;
20 30
     private final JdbcTemplate jdbcTemplate;
21
-    private final ObjectMapper mapper = new ObjectMapper();
31
+    private final ObjectMapper objectMapper;
22 32
 
23
-    private static final String SHADOW_PREFIX = "iot:shadow:";
33
+    private static final String SHADOW_PREFIX = "shadow:";
24 34
     private static final long SHADOW_TTL_HOURS = 24;
25 35
 
26
-    /** 更新设备上报状态 */
36
+    /**
37
+     * 更新设备上报状态
38
+     * @param deviceSn 设备SN
39
+     * @param state    上报的状态数据(Map)
40
+     */
27 41
     public void updateReported(String deviceSn, Map<String, Object> state) {
28 42
         try {
29 43
             String key = SHADOW_PREFIX + deviceSn;
30
-            String json = mapper.writeValueAsString(state);
31
-            redisTemplate.opsForHash().put(key, "reported", json);
44
+            String json = objectMapper.writeValueAsString(state);
45
+
46
+            Map<String, String> fields = new HashMap<>();
47
+            fields.put("reported", json);
48
+            fields.put("lastReportTime", LocalDateTime.now().toString());
49
+            fields.put("online", "true");
50
+
51
+            // 递增版本号
52
+            Long version = redisTemplate.opsForHash().increment(key, "version", 1);
53
+            fields.put("version", String.valueOf(version));
54
+
55
+            redisTemplate.opsForHash().putAll(key, fields);
32 56
             redisTemplate.expire(key, SHADOW_TTL_HOURS, TimeUnit.HOURS);
33
-            // 同步更新数据库设备最后上报时间
34
-            jdbcTemplate.update("UPDATE iot_device SET last_report_time = NOW() WHERE device_sn = ?", deviceSn);
57
+
58
+            // 计算delta(期望与上报的差异)
59
+            computeDelta(deviceSn);
60
+
61
+            // 同步更新数据库设备最后上报时间和在线状态
62
+            jdbcTemplate.update(
63
+                "UPDATE iot_device SET last_report_time = NOW(), status = 'online' WHERE device_sn = ?",
64
+                deviceSn
65
+            );
66
+
67
+            log.debug("Shadow reported updated: deviceSn={}, version={}", deviceSn, version);
35 68
         } catch (JsonProcessingException e) {
36
-            log.error("Shadow update error: {}", e.getMessage());
69
+            log.error("Shadow update error for device {}: {}", deviceSn, e.getMessage());
37 70
         }
38 71
     }
39 72
 
40
-    /** 获取设备影子 */
41
-    public Map<Object, Object> getShadow(String deviceSn) {
42
-        return redisTemplate.opsForHash().entries(SHADOW_PREFIX + deviceSn);
73
+    /**
74
+     * 更新期望状态(云端→设备)
75
+     * @param deviceSn    设备SN
76
+     * @param desiredJson 期望状态JSON字符串
77
+     */
78
+    public void updateDesired(String deviceSn, String desiredJson) {
79
+        String key = SHADOW_PREFIX + deviceSn;
80
+        Map<String, String> fields = new HashMap<>();
81
+        fields.put("desired", desiredJson);
82
+        Long version = redisTemplate.opsForHash().increment(key, "version", 1);
83
+        fields.put("version", String.valueOf(version));
84
+
85
+        redisTemplate.opsForHash().putAll(key, fields);
86
+        redisTemplate.expire(key, SHADOW_TTL_HOURS, TimeUnit.HOURS);
87
+
88
+        computeDelta(deviceSn);
89
+        log.debug("Shadow desired updated: deviceSn={}, version={}", deviceSn, version);
90
+    }
91
+
92
+    /**
93
+     * 获取设备影子完整数据
94
+     */
95
+    public DeviceShadow getShadow(String deviceSn) {
96
+        String key = SHADOW_PREFIX + deviceSn;
97
+        Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
98
+        if (entries.isEmpty()) {
99
+            return null;
100
+        }
101
+        return mapToShadow(deviceSn, entries);
43 102
     }
44 103
 
45
-    /** 更新期望状态(云端→设备) */
46
-    public void updateDesired(String deviceSn, String desiredJson) {
47
-        redisTemplate.opsForHash().put(SHADOW_PREFIX + deviceSn, "desired", desiredJson);
104
+    /**
105
+     * 获取设备上报状态
106
+     */
107
+    public String getReportedState(String deviceSn) {
108
+        Object reported = redisTemplate.opsForHash().get(SHADOW_PREFIX + deviceSn, "reported");
109
+        return reported != null ? reported.toString() : null;
110
+    }
111
+
112
+    /**
113
+     * 获取设备期望状态
114
+     */
115
+    public String getDesiredState(String deviceSn) {
116
+        Object desired = redisTemplate.opsForHash().get(SHADOW_PREFIX + deviceSn, "desired");
117
+        return desired != null ? desired.toString() : null;
118
+    }
119
+
120
+    /**
121
+     * 获取差异状态(delta = desired - reported)
122
+     */
123
+    public String getDeltaState(String deviceSn) {
124
+        Object delta = redisTemplate.opsForHash().get(SHADOW_PREFIX + deviceSn, "delta");
125
+        return delta != null ? delta.toString() : null;
126
+    }
127
+
128
+    /**
129
+     * 批量查询设备影子
130
+     * @param deviceSns 设备SN列表
131
+     */
132
+    public List<DeviceShadow> batchGetShadows(List<String> deviceSns) {
133
+        if (deviceSns == null || deviceSns.isEmpty()) {
134
+            return Collections.emptyList();
135
+        }
136
+        return deviceSns.stream()
137
+            .map(this::getShadow)
138
+            .filter(Objects::nonNull)
139
+            .collect(Collectors.toList());
140
+    }
141
+
142
+    /**
143
+     * 离线检测:检查指定设备是否在线(基于Redis TTL)
144
+     * @param deviceSn          设备SN
145
+     * @param offlineThresholdMinutes 离线阈值(分钟),超过该时间无上报视为离线
146
+     * @return true=在线, false=离线
147
+     */
148
+    public boolean checkOnline(String deviceSn, int offlineThresholdMinutes) {
149
+        String key = SHADOW_PREFIX + deviceSn;
150
+        Object lastReportTimeStr = redisTemplate.opsForHash().get(key, "lastReportTime");
151
+
152
+        if (lastReportTimeStr == null) {
153
+            // 无上报记录,查DB
154
+            List<LocalDateTime> dbTimes = jdbcTemplate.queryForList(
155
+                "SELECT last_report_time FROM iot_device WHERE device_sn = ?",
156
+                LocalDateTime.class, deviceSn
157
+            );
158
+            if (dbTimes.isEmpty() || dbTimes.get(0) == null) return false;
159
+            lastReportTimeStr = dbTimes.get(0).toString();
160
+        }
161
+
162
+        LocalDateTime lastReport = LocalDateTime.parse(lastReportTimeStr.toString());
163
+        long minutesSinceReport = ChronoUnit.MINUTES.between(lastReport, LocalDateTime.now());
164
+
165
+        boolean isOnline = minutesSinceReport <= offlineThresholdMinutes;
166
+
167
+        // 更新Redis在线状态
168
+        redisTemplate.opsForHash().put(key, "online", String.valueOf(isOnline));
169
+
170
+        // 同步到DB
171
+        String dbStatus = isOnline ? "online" : "offline";
172
+        jdbcTemplate.update("UPDATE iot_device SET status = ? WHERE device_sn = ?", dbStatus, deviceSn);
173
+
174
+        return isOnline;
175
+    }
176
+
177
+    /**
178
+     * 批量离线检测(使用默认阈值30分钟)
179
+     */
180
+    public Map<String, Boolean> batchCheckOnline(List<String> deviceSns) {
181
+        return batchCheckOnline(deviceSns, 30);
182
+    }
183
+
184
+    /**
185
+     * 批量离线检测
186
+     */
187
+    public Map<String, Boolean> batchCheckOnline(List<String> deviceSns, int offlineThresholdMinutes) {
188
+        Map<String, Boolean> result = new LinkedHashMap<>();
189
+        for (String sn : deviceSns) {
190
+            result.put(sn, checkOnline(sn, offlineThresholdMinutes));
191
+        }
192
+        return result;
193
+    }
194
+
195
+    /**
196
+     * 删除设备影子(设备注销时)
197
+     */
198
+    public void deleteShadow(String deviceSn) {
199
+        redisTemplate.delete(SHADOW_PREFIX + deviceSn);
200
+    }
201
+
202
+    /**
203
+     * 查询在线设备数量(Redis中有记录且online=true)
204
+     */
205
+    public long countOnlineDevices() {
206
+        // 使用DB统计更可靠,这里提供Redis快速统计
207
+        List<Map<String, Object>> result = jdbcTemplate.queryForList(
208
+            "SELECT COUNT(*) as cnt FROM iot_device WHERE status = 'online'"
209
+        );
210
+        return result.isEmpty() ? 0 : ((Number) result.get(0).get("cnt")).longValue();
211
+    }
212
+
213
+    // ===== 私有方法 =====
214
+
215
+    /**
216
+     * 计算delta状态(期望与上报的差异)
217
+     */
218
+    @SuppressWarnings("unchecked")
219
+    private void computeDelta(String deviceSn) {
220
+        String key = SHADOW_PREFIX + deviceSn;
221
+        try {
222
+            Object reportedObj = redisTemplate.opsForHash().get(key, "reported");
223
+            Object desiredObj = redisTemplate.opsForHash().get(key, "desired");
224
+
225
+            if (desiredObj == null) {
226
+                // 无期望状态,delta为空
227
+                redisTemplate.opsForHash().put(key, "delta", "{}");
228
+                return;
229
+            }
230
+
231
+            Map<String, Object> reported = reportedObj != null ?
232
+                objectMapper.readValue(reportedObj.toString(), Map.class) : Collections.emptyMap();
233
+            Map<String, Object> desired = objectMapper.readValue(desiredObj.toString(), Map.class);
234
+
235
+            // delta = desired中不同于reported的字段
236
+            Map<String, Object> delta = new LinkedHashMap<>();
237
+            for (Map.Entry<String, Object> entry : desired.entrySet()) {
238
+                Object reportedVal = reported.get(entry.getKey());
239
+                if (!Objects.equals(entry.getValue(), reportedVal)) {
240
+                    delta.put(entry.getKey(), entry.getValue());
241
+                }
242
+            }
243
+
244
+            String deltaJson = objectMapper.writeValueAsString(delta);
245
+            redisTemplate.opsForHash().put(key, "delta", deltaJson);
246
+        } catch (Exception e) {
247
+            log.warn("Delta compute error for device {}: {}", deviceSn, e.getMessage());
248
+        }
249
+    }
250
+
251
+    /**
252
+     * Redis Hash → DeviceShadow
253
+     */
254
+    private DeviceShadow mapToShadow(String deviceSn, Map<Object, Object> entries) {
255
+        DeviceShadow shadow = new DeviceShadow();
256
+        shadow.setDeviceSn(deviceSn);
257
+        shadow.setReportedState(entries.get("reported") != null ? entries.get("reported").toString() : null);
258
+        shadow.setDesiredState(entries.get("desired") != null ? entries.get("desired").toString() : null);
259
+        shadow.setDeltaState(entries.get("delta") != null ? entries.get("delta").toString() : null);
260
+        shadow.setOnline(entries.get("online") != null && "true".equals(entries.get("online").toString()));
261
+        shadow.setVersion(entries.get("version") != null ? Long.parseLong(entries.get("version").toString()) : 0L);
262
+
263
+        if (entries.get("lastReportTime") != null) {
264
+            shadow.setLastReportTime(LocalDateTime.parse(entries.get("lastReportTime").toString()));
265
+        }
266
+
267
+        // 从DB补充设备信息
268
+        List<Map<String, Object>> deviceInfo = jdbcTemplate.queryForList(
269
+            "SELECT id, device_name FROM iot_device WHERE device_sn = ?", deviceSn
270
+        );
271
+        if (!deviceInfo.isEmpty()) {
272
+            shadow.setDeviceId(((Number) deviceInfo.get(0).get("id")).longValue());
273
+            shadow.setDeviceName((String) deviceInfo.get(0).get("device_name"));
274
+        }
275
+
276
+        return shadow;
48 277
     }
49 278
 }

+ 485
- 13
wm-iot/src/main/java/com/water/iot/service/OtaService.java ファイルの表示

@@ -1,12 +1,30 @@
1 1
 package com.water.iot.service;
2 2
 
3
+import com.fasterxml.jackson.core.JsonProcessingException;
4
+import com.fasterxml.jackson.core.type.TypeReference;
5
+import com.fasterxml.jackson.databind.ObjectMapper;
6
+import com.water.iot.entity.OtaFirmware;
7
+import com.water.iot.entity.OtaTask;
8
+import com.water.iot.entity.OtaUpgradeRecord;
3 9
 import lombok.RequiredArgsConstructor;
4 10
 import lombok.extern.slf4j.Slf4j;
11
+import org.springframework.jdbc.core.BeanPropertyRowMapper;
5 12
 import org.springframework.jdbc.core.JdbcTemplate;
13
+import org.springframework.jdbc.support.GeneratedKeyHolder;
14
+import org.springframework.jdbc.support.KeyHolder;
6 15
 import org.springframework.stereotype.Service;
16
+import org.springframework.transaction.annotation.Transactional;
7 17
 
8
-import java.util.Map;
18
+import java.sql.PreparedStatement;
19
+import java.sql.Statement;
20
+import java.time.LocalDateTime;
21
+import java.util.*;
22
+import java.util.stream.Collectors;
9 23
 
24
+/**
25
+ * OTA固件升级服务
26
+ * 支持固件上传/版本管理/升级任务创建(按批次)/进度追踪/结果统计/设备查询可用固件
27
+ */
10 28
 @Slf4j
11 29
 @Service
12 30
 @RequiredArgsConstructor
@@ -14,21 +32,475 @@ public class OtaService {
14 32
 
15 33
     private final JdbcTemplate jdbcTemplate;
16 34
     private final DeviceShadowService shadowService;
35
+    private final ObjectMapper objectMapper;
17 36
 
18
-    /** 创建 OTA 升级任务 */
19
-    public void createUpgrade(Long modelId, String firmwareVersion, String firmwareUrl, String checkMd5) {
37
+    // ========== 固件管理 ==========
38
+
39
+    /**
40
+     * 创建固件版本
41
+     */
42
+    @Transactional
43
+    public OtaFirmware createFirmware(OtaFirmware firmware) {
44
+        firmware.setStatus("draft");
45
+        firmware.setCreatedAt(LocalDateTime.now());
46
+        firmware.setUpdatedAt(LocalDateTime.now());
47
+
48
+        KeyHolder keyHolder = new GeneratedKeyHolder();
49
+        jdbcTemplate.update(connection -> {
50
+            var ps = connection.prepareStatement(
51
+                "INSERT INTO iot_ota_firmware (model_id, firmware_version, file_url, description, status, md5, file_size, created_at, updated_at) " +
52
+                "VALUES (?, ?, ?, ?, ?, ?, ?, NOW(), NOW())",
53
+                Statement.RETURN_GENERATED_KEYS
54
+            );
55
+            ps.setLong(1, firmware.getModelId());
56
+            ps.setString(2, firmware.getFirmwareVersion());
57
+            ps.setString(3, firmware.getFileUrl());
58
+            ps.setString(4, firmware.getDescription());
59
+            ps.setString(5, firmware.getStatus());
60
+            ps.setString(6, firmware.getMd5());
61
+            ps.setLong(7, firmware.getFileSize() != null ? firmware.getFileSize() : 0);
62
+            return ps;
63
+        }, keyHolder);
64
+
65
+        firmware.setId(keyHolder.getKey().longValue());
66
+        log.info("Firmware created: id={}, version={}", firmware.getId(), firmware.getFirmwareVersion());
67
+        return firmware;
68
+    }
69
+
70
+    /**
71
+     * 发布固件
72
+     */
73
+    @Transactional
74
+    public void publishFirmware(Long firmwareId, String publishedBy) {
75
+        int updated = jdbcTemplate.update(
76
+            "UPDATE iot_ota_firmware SET status = 'published', published_by = ?, published_at = NOW(), updated_at = NOW() WHERE id = ?",
77
+            publishedBy, firmwareId
78
+        );
79
+        if (updated == 0) throw new RuntimeException("固件不存在: " + firmwareId);
80
+        log.info("Firmware {} published by {}", firmwareId, publishedBy);
81
+    }
82
+
83
+    /**
84
+     * 废弃固件
85
+     */
86
+    @Transactional
87
+    public void deprecateFirmware(Long firmwareId) {
88
+        jdbcTemplate.update(
89
+            "UPDATE iot_ota_firmware SET status = 'deprecated', updated_at = NOW() WHERE id = ?",
90
+            firmwareId
91
+        );
92
+    }
93
+
94
+    /**
95
+     * 查询固件详情
96
+     */
97
+    public OtaFirmware getFirmware(Long firmwareId) {
98
+        List<OtaFirmware> list = jdbcTemplate.query(
99
+            "SELECT * FROM iot_ota_firmware WHERE id = ?",
100
+            new BeanPropertyRowMapper<>(OtaFirmware.class),
101
+            firmwareId
102
+        );
103
+        return list.isEmpty() ? null : list.get(0);
104
+    }
105
+
106
+    /**
107
+     * 按模型查询固件版本列表
108
+     */
109
+    public List<OtaFirmware> listFirmwareByModel(Long modelId) {
110
+        return jdbcTemplate.query(
111
+            "SELECT * FROM iot_ota_firmware WHERE model_id = ? ORDER BY created_at DESC",
112
+            new BeanPropertyRowMapper<>(OtaFirmware.class),
113
+            modelId
114
+        );
115
+    }
116
+
117
+    /**
118
+     * 查询所有固件(分页)
119
+     */
120
+    public List<OtaFirmware> listAllFirmware(int page, int size) {
121
+        int offset = (page - 1) * size;
122
+        return jdbcTemplate.query(
123
+            "SELECT * FROM iot_ota_firmware ORDER BY created_at DESC LIMIT ? OFFSET ?",
124
+            new BeanPropertyRowMapper<>(OtaFirmware.class),
125
+            size, offset
126
+        );
127
+    }
128
+
129
+    /**
130
+     * 设备查询可用固件(基于设备SN查找最新发布的固件)
131
+     */
132
+    public OtaFirmware getAvailableFirmware(String deviceSn) {
133
+        // 查找设备对应的模型
134
+        List<Long> modelIds = jdbcTemplate.queryForList(
135
+            "SELECT model_id FROM iot_device WHERE device_sn = ? AND model_id IS NOT NULL",
136
+            Long.class, deviceSn
137
+        );
138
+        if (modelIds.isEmpty()) return null;
139
+
140
+        Long modelId = modelIds.get(0);
141
+        List<OtaFirmware> list = jdbcTemplate.query(
142
+            "SELECT * FROM iot_ota_firmware WHERE model_id = ? AND status = 'published' ORDER BY created_at DESC LIMIT 1",
143
+            new BeanPropertyRowMapper<>(OtaFirmware.class),
144
+            modelId
145
+        );
146
+        return list.isEmpty() ? null : list.get(0);
147
+    }
148
+
149
+    // ========== 升级任务管理 ==========
150
+
151
+    /**
152
+     * 创建OTA升级任务(按批次)
153
+     * @param firmwareId   固件ID
154
+     * @param deviceIds    目标设备ID列表
155
+     * @param batchSize    每批大小
156
+     * @param createdBy    创建人
157
+     */
158
+    @Transactional
159
+    public OtaTask createUpgradeTask(Long firmwareId, List<Long> deviceIds, int batchSize, String createdBy) {
160
+        OtaFirmware firmware = getFirmware(firmwareId);
161
+        if (firmware == null) throw new RuntimeException("固件不存在: " + firmwareId);
162
+        if (!"published".equals(firmware.getStatus())) throw new RuntimeException("固件未发布: " + firmwareId);
163
+
164
+        // 序列化设备ID列表
165
+        String deviceIdsJson;
166
+        try {
167
+            deviceIdsJson = objectMapper.writeValueAsString(deviceIds);
168
+        } catch (JsonProcessingException e) {
169
+            throw new RuntimeException("设备ID序列化失败", e);
170
+        }
171
+
172
+        OtaTask task = new OtaTask();
173
+        task.setFirmwareId(firmwareId);
174
+        task.setFirmwareVersion(firmware.getFirmwareVersion());
175
+        task.setTargetDeviceIds(deviceIdsJson);
176
+        task.setTaskStatus("pending");
177
+        task.setBatchSize(batchSize > 0 ? batchSize : 10);
178
+        task.setTotalDevices(deviceIds.size());
179
+        task.setSuccessCount(0);
180
+        task.setFailedCount(0);
181
+        task.setExecutingCount(0);
182
+        task.setCreatedBy(createdBy);
183
+        task.setCreatedAt(LocalDateTime.now());
184
+        task.setUpdatedAt(LocalDateTime.now());
185
+
186
+        KeyHolder keyHolder = new GeneratedKeyHolder();
187
+        jdbcTemplate.update(connection -> {
188
+            var ps = connection.prepareStatement(
189
+                "INSERT INTO iot_ota_task (firmware_id, firmware_version, target_device_ids, task_status, " +
190
+                "batch_size, total_devices, success_count, failed_count, executing_count, created_by, created_at, updated_at) " +
191
+                "VALUES (?, ?, ?::jsonb, ?, ?, ?, 0, 0, 0, ?, NOW(), NOW())",
192
+                Statement.RETURN_GENERATED_KEYS
193
+            );
194
+            ps.setLong(1, firmwareId);
195
+            ps.setString(2, firmware.getFirmwareVersion());
196
+            ps.setString(3, deviceIdsJson);
197
+            ps.setString(4, "pending");
198
+            ps.setInt(5, task.getBatchSize());
199
+            ps.setInt(6, deviceIds.size());
200
+            ps.setString(7, createdBy);
201
+            return ps;
202
+        }, keyHolder);
203
+
204
+        task.setId(keyHolder.getKey().longValue());
205
+
206
+        // 创建升级记录(每台设备一条)
207
+        createUpgradeRecords(task.getId(), deviceIds, firmware.getFirmwareVersion());
208
+
209
+        log.info("Upgrade task created: id={}, firmware={}, devices={}, batchSize={}",
210
+            task.getId(), firmware.getFirmwareVersion(), deviceIds.size(), task.getBatchSize());
211
+
212
+        return task;
213
+    }
214
+
215
+    /**
216
+     * 按设备类型/区域创建升级任务
217
+     */
218
+    @Transactional
219
+    public OtaTask createUpgradeTaskByFilter(Long firmwareId, String deviceType, String area,
220
+                                              int batchSize, String createdBy) {
221
+        // 查询符合条件的在线设备
222
+        String sql = "SELECT id FROM iot_device WHERE model_id = (SELECT model_id FROM iot_ota_firmware WHERE id = ?) " +
223
+                      "AND status = 'online'";
224
+        List<Object> params = new ArrayList<>();
225
+        params.add(firmwareId);
226
+
227
+        if (deviceType != null && !deviceType.isEmpty()) {
228
+            sql += " AND device_type = ?";
229
+            params.add(deviceType);
230
+        }
231
+        if (area != null && !area.isEmpty()) {
232
+            sql += " AND area = ?";
233
+            params.add(area);
234
+        }
235
+
236
+        List<Long> deviceIds = jdbcTemplate.queryForList(sql, Long.class, params.toArray());
237
+        if (deviceIds.isEmpty()) throw new RuntimeException("没有符合条件的在线设备");
238
+
239
+        OtaTask task = createUpgradeTask(firmwareId, deviceIds, batchSize, createdBy);
240
+        task.setTargetType(deviceType);
241
+        task.setTargetArea(area);
242
+
243
+        jdbcTemplate.update(
244
+            "UPDATE iot_ota_task SET target_type = ?, target_area = ? WHERE id = ?",
245
+            deviceType, area, task.getId()
246
+        );
247
+
248
+        return task;
249
+    }
250
+
251
+    /**
252
+     * 启动升级任务(开始执行)
253
+     */
254
+    @Transactional
255
+    public void startTask(Long taskId) {
256
+        OtaTask task = getTask(taskId);
257
+        if (task == null) throw new RuntimeException("任务不存在: " + taskId);
258
+        if (!"pending".equals(task.getTaskStatus())) throw new RuntimeException("任务状态不允许启动: " + task.getTaskStatus());
259
+
260
+        jdbcTemplate.update(
261
+            "UPDATE iot_ota_task SET task_status = 'executing', executed_at = NOW(), updated_at = NOW() WHERE id = ?",
262
+            taskId
263
+        );
264
+
265
+        // 将第一批次的记录设为executing
266
+        int batchSize = task.getBatchSize();
20 267
         jdbcTemplate.update(
21
-            "INSERT INTO iot_device_event (device_id, device_sn, event_type, event_data) " +
22
-            "SELECT id, device_sn, 'ota', json_build_object('version',?, 'url',?, 'md5',?) " +
23
-            "FROM iot_device WHERE model_id = ? AND status = 'online'",
24
-            firmwareVersion, firmwareUrl, checkMd5, modelId);
25
-        log.info("OTA task created for model {}: version={}", modelId, firmwareVersion);
268
+            "UPDATE iot_ota_upgrade_record SET status = 'executing', started_at = NOW() " +
269
+            "WHERE task_id = ? AND id IN (SELECT id FROM iot_ota_upgrade_record WHERE task_id = ? AND status = 'pending' LIMIT ?)",
270
+            taskId, taskId, batchSize
271
+        );
272
+
273
+        log.info("Task {} started, first batch size: {}", taskId, batchSize);
26 274
     }
27 275
 
28
-    /** 设备查询是否有待升级固件 */
29
-    public Map<String, Object> checkUpgrade(String deviceSn, String currentVersion) {
30
-        return jdbcTemplate.queryForMap(
31
-            "SELECT * FROM iot_device_event WHERE device_sn = ? AND event_type = 'ota' ORDER BY created_at DESC LIMIT 1",
32
-            deviceSn);
276
+    /**
277
+     * 取消升级任务
278
+     */
279
+    @Transactional
280
+    public void cancelTask(Long taskId) {
281
+        jdbcTemplate.update(
282
+            "UPDATE iot_ota_task SET task_status = 'cancelled', updated_at = NOW() WHERE id = ? AND task_status IN ('pending', 'executing')",
283
+            taskId
284
+        );
285
+        // 取消未完成的记录
286
+        jdbcTemplate.update(
287
+            "UPDATE iot_ota_upgrade_record SET status = 'failed', fail_reason = '任务已取消' " +
288
+            "WHERE task_id = ? AND status IN ('pending', 'executing')",
289
+            taskId
290
+        );
291
+    }
292
+
293
+    /**
294
+     * 查询升级任务详情
295
+     */
296
+    public OtaTask getTask(Long taskId) {
297
+        List<OtaTask> list = jdbcTemplate.query(
298
+            "SELECT * FROM iot_ota_task WHERE id = ?",
299
+            new BeanPropertyRowMapper<>(OtaTask.class),
300
+            taskId
301
+        );
302
+        return list.isEmpty() ? null : list.get(0);
303
+    }
304
+
305
+    /**
306
+     * 查询升级任务列表
307
+     */
308
+    public List<OtaTask> listTasks(int page, int size) {
309
+        int offset = (page - 1) * size;
310
+        return jdbcTemplate.query(
311
+            "SELECT * FROM iot_ota_task ORDER BY created_at DESC LIMIT ? OFFSET ?",
312
+            new BeanPropertyRowMapper<>(OtaTask.class),
313
+            size, offset
314
+        );
315
+    }
316
+
317
+    // ========== 进度追踪 ==========
318
+
319
+    /**
320
+     * 更新单台设备升级进度
321
+     */
322
+    @Transactional
323
+    public void updateProgress(Long recordId, int progress) {
324
+        jdbcTemplate.update(
325
+            "UPDATE iot_ota_upgrade_record SET progress = ? WHERE id = ?",
326
+            progress, recordId
327
+        );
328
+    }
329
+
330
+    /**
331
+     * 标记设备升级成功
332
+     */
333
+    @Transactional
334
+    public void markSuccess(Long recordId, Long deviceId, String toVersion) {
335
+        jdbcTemplate.update(
336
+            "UPDATE iot_ota_upgrade_record SET status = 'success', progress = 100, completed_at = NOW() WHERE id = ?",
337
+            recordId
338
+        );
339
+        jdbcTemplate.update(
340
+            "UPDATE iot_ota_task SET success_count = success_count + 1, executing_count = executing_count - 1, updated_at = NOW() WHERE id = (SELECT task_id FROM iot_ota_upgrade_record WHERE id = ?)",
341
+            recordId
342
+        );
343
+        // 更新设备固件版本
344
+        jdbcTemplate.update(
345
+            "UPDATE iot_device SET firmware_version = ? WHERE id = ?",
346
+            toVersion, deviceId
347
+        );
348
+
349
+        // 检查是否触发下一批次
350
+        triggerNextBatch(recordId);
351
+    }
352
+
353
+    /**
354
+     * 标记设备升级失败
355
+     */
356
+    @Transactional
357
+    public void markFailed(Long recordId, String failReason) {
358
+        jdbcTemplate.update(
359
+            "UPDATE iot_ota_upgrade_record SET status = 'failed', fail_reason = ?, completed_at = NOW() WHERE id = ?",
360
+            failReason, recordId
361
+        );
362
+        jdbcTemplate.update(
363
+            "UPDATE iot_ota_task SET failed_count = failed_count + 1, executing_count = executing_count - 1, updated_at = NOW() WHERE id = (SELECT task_id FROM iot_ota_upgrade_record WHERE id = ?)",
364
+            recordId
365
+        );
366
+
367
+        triggerNextBatch(recordId);
368
+    }
369
+
370
+    /**
371
+     * 查询升级任务统计
372
+     */
373
+    public Map<String, Object> getTaskStatistics(Long taskId) {
374
+        OtaTask task = getTask(taskId);
375
+        if (task == null) return Collections.emptyMap();
376
+
377
+        Map<String, Object> stats = new LinkedHashMap<>();
378
+        stats.put("taskId", taskId);
379
+        stats.put("taskStatus", task.getTaskStatus());
380
+        stats.put("totalDevices", task.getTotalDevices());
381
+        stats.put("successCount", task.getSuccessCount());
382
+        stats.put("failedCount", task.getFailedCount());
383
+        stats.put("executingCount", task.getExecutingCount());
384
+        stats.put("pendingCount", task.getTotalDevices() - task.getSuccessCount() - task.getFailedCount() - task.getExecutingCount());
385
+
386
+        double progress = task.getTotalDevices() > 0 ?
387
+            ((task.getSuccessCount() + task.getFailedCount()) * 100.0 / task.getTotalDevices()) : 0;
388
+        stats.put("progressPercent", Math.round(progress * 100) / 100.0);
389
+
390
+        // 按状态分组统计
391
+        List<Map<String, Object>> byStatus = jdbcTemplate.queryForList(
392
+            "SELECT status, COUNT(*) as count FROM iot_ota_upgrade_record WHERE task_id = ? GROUP BY status",
393
+            taskId
394
+        );
395
+        stats.put("statusBreakdown", byStatus);
396
+
397
+        return stats;
398
+    }
399
+
400
+    /**
401
+     * 查询升级记录列表
402
+     */
403
+    public List<OtaUpgradeRecord> getTaskRecords(Long taskId) {
404
+        return jdbcTemplate.query(
405
+            "SELECT * FROM iot_ota_upgrade_record WHERE task_id = ? ORDER BY id",
406
+            new BeanPropertyRowMapper<>(OtaUpgradeRecord.class),
407
+            taskId
408
+        );
409
+    }
410
+
411
+    /**
412
+     * 查询单台设备的升级历史
413
+     */
414
+    public List<OtaUpgradeRecord> getDeviceUpgradeHistory(String deviceSn) {
415
+        return jdbcTemplate.query(
416
+            "SELECT r.* FROM iot_ota_upgrade_record r JOIN iot_device d ON r.device_id = d.id " +
417
+            "WHERE d.device_sn = ? ORDER BY r.created_at DESC",
418
+            new BeanPropertyRowMapper<>(OtaUpgradeRecord.class),
419
+            deviceSn
420
+        );
421
+    }
422
+
423
+    // ===== 私有辅助方法 =====
424
+
425
+    /**
426
+     * 创建升级记录
427
+     */
428
+    private void createUpgradeRecords(Long taskId, List<Long> deviceIds, String toVersion) {
429
+        String sql = "INSERT INTO iot_ota_upgrade_record (task_id, device_id, device_sn, from_version, to_version, status, progress, created_at) " +
430
+                     "VALUES (?, ?, ?, ?, ?, 'pending', 0, NOW())";
431
+
432
+        List<Object[]> batchArgs = new ArrayList<>();
433
+        for (Long deviceId : deviceIds) {
434
+            // 查询设备SN和当前版本
435
+            List<Map<String, Object>> deviceInfo = jdbcTemplate.queryForList(
436
+                "SELECT device_sn, firmware_version FROM iot_device WHERE id = ?",
437
+                deviceId
438
+            );
439
+            if (!deviceInfo.isEmpty()) {
440
+                Map<String, Object> info = deviceInfo.get(0);
441
+                String deviceSn = (String) info.get("device_sn");
442
+                String fromVersion = info.get("firmware_version") != null ?
443
+                    (String) info.get("firmware_version") : "unknown";
444
+                batchArgs.add(new Object[]{taskId, deviceId, deviceSn, fromVersion, toVersion});
445
+            }
446
+        }
447
+
448
+        if (!batchArgs.isEmpty()) {
449
+            jdbcTemplate.batchUpdate(sql, batchArgs);
450
+        }
451
+    }
452
+
453
+    /**
454
+     * 触发下一批次
455
+     * 当一个批次的设备完成(成功或失败)后,自动启动下一批
456
+     */
457
+    private void triggerNextBatch(Long recordId) {
458
+        Long taskId = jdbcTemplate.queryForObject(
459
+            "SELECT task_id FROM iot_ota_upgrade_record WHERE id = ?", Long.class, recordId
460
+        );
461
+
462
+        OtaTask task = getTask(taskId);
463
+        if (task == null || !"executing".equals(task.getTaskStatus())) return;
464
+
465
+        // 检查是否还有执行中的记录
466
+        int executing = jdbcTemplate.queryForObject(
467
+            "SELECT COUNT(*) FROM iot_ota_upgrade_record WHERE task_id = ? AND status = 'executing'",
468
+            Integer.class, taskId
469
+        );
470
+
471
+        if (executing == 0) {
472
+            // 启动下一批
473
+            int pending = jdbcTemplate.queryForObject(
474
+                "SELECT COUNT(*) FROM iot_ota_upgrade_record WHERE task_id = ? AND status = 'pending'",
475
+                Integer.class, taskId
476
+            );
477
+
478
+            if (pending > 0) {
479
+                int nextBatch = Math.min(pending, task.getBatchSize());
480
+                jdbcTemplate.update(
481
+                    "UPDATE iot_ota_upgrade_record SET status = 'executing', started_at = NOW() " +
482
+                    "WHERE task_id = ? AND id IN (SELECT id FROM iot_ota_upgrade_record WHERE task_id = ? AND status = 'pending' LIMIT ?)",
483
+                    taskId, taskId, nextBatch
484
+                );
485
+                jdbcTemplate.update(
486
+                    "UPDATE iot_ota_task SET executing_count = executing_count + ? WHERE id = ?",
487
+                    nextBatch, taskId
488
+                );
489
+                log.info("Task {} next batch started: {} devices", taskId, nextBatch);
490
+            } else {
491
+                // 所有设备处理完毕,标记任务完成
492
+                int failedCount = jdbcTemplate.queryForObject(
493
+                    "SELECT COUNT(*) FROM iot_ota_upgrade_record WHERE task_id = ? AND status = 'failed'",
494
+                    Integer.class, taskId
495
+                );
496
+                String finalStatus = failedCount > 0 ? "completed" : "completed";
497
+                jdbcTemplate.update(
498
+                    "UPDATE iot_ota_task SET task_status = ?, completed_at = NOW(), updated_at = NOW() WHERE id = ?",
499
+                    finalStatus, taskId
500
+                );
501
+                log.info("Task {} completed: total={}, success={}, failed={}",
502
+                    taskId, task.getTotalDevices(), task.getSuccessCount(), failedCount);
503
+            }
504
+        }
33 505
     }
34 506
 }

+ 84
- 0
wm-iot/src/main/resources/db/V_shadow_ota.sql ファイルの表示

@@ -0,0 +1,84 @@
1
+-- =============================================
2
+-- 智慧水务管理系统 - 设备影子 + OTA固件升级 DDL
3
+-- 版本: V_shadow_ota
4
+-- =============================================
5
+
6
+-- OTA固件版本表
7
+CREATE TABLE IF NOT EXISTS iot_ota_firmware (
8
+    id              BIGSERIAL PRIMARY KEY,
9
+    model_id        BIGINT NOT NULL REFERENCES iot_device_model(id),
10
+    firmware_version VARCHAR(20) NOT NULL,
11
+    file_url        VARCHAR(500) NOT NULL,
12
+    description     TEXT,
13
+    status          VARCHAR(20) NOT NULL DEFAULT 'draft',  -- draft/published/deprecated
14
+    md5             VARCHAR(64),
15
+    file_size       BIGINT DEFAULT 0,
16
+    published_by    VARCHAR(50),
17
+    published_at    TIMESTAMP,
18
+    created_at      TIMESTAMP DEFAULT NOW(),
19
+    updated_at      TIMESTAMP DEFAULT NOW(),
20
+    UNIQUE(model_id, firmware_version)
21
+);
22
+
23
+COMMENT ON TABLE iot_ota_firmware IS 'OTA固件版本表';
24
+COMMENT ON COLUMN iot_ota_firmware.model_id IS '关联设备模型ID';
25
+COMMENT ON COLUMN iot_ota_firmware.firmware_version IS '固件版本号';
26
+COMMENT ON COLUMN iot_ota_firmware.file_url IS '固件文件下载地址';
27
+COMMENT ON COLUMN iot_ota_firmware.status IS '状态: draft-草稿/published-已发布/deprecated-已废弃';
28
+COMMENT ON COLUMN iot_ota_firmware.md5 IS 'MD5校验值';
29
+
30
+CREATE INDEX IF NOT EXISTS idx_ota_firmware_model ON iot_ota_firmware(model_id);
31
+CREATE INDEX IF NOT EXISTS idx_ota_firmware_status ON iot_ota_firmware(status);
32
+
33
+-- OTA升级任务表
34
+CREATE TABLE IF NOT EXISTS iot_ota_task (
35
+    id                  BIGSERIAL PRIMARY KEY,
36
+    firmware_id         BIGINT NOT NULL REFERENCES iot_ota_firmware(id),
37
+    firmware_version    VARCHAR(20) NOT NULL,
38
+    target_type         VARCHAR(30),               -- 目标设备类型
39
+    target_area         VARCHAR(50),               -- 目标区域
40
+    target_device_ids   JSONB,                     -- 目标设备ID列表
41
+    task_status         VARCHAR(20) NOT NULL DEFAULT 'pending',  -- pending/executing/completed/failed/cancelled
42
+    batch_size          INT DEFAULT 10,            -- 每批升级数量
43
+    total_devices       INT DEFAULT 0,             -- 总设备数
44
+    success_count       INT DEFAULT 0,
45
+    failed_count        INT DEFAULT 0,
46
+    executing_count     INT DEFAULT 0,
47
+    created_by          VARCHAR(50),
48
+    executed_at         TIMESTAMP,
49
+    completed_at        TIMESTAMP,
50
+    created_at          TIMESTAMP DEFAULT NOW(),
51
+    updated_at          TIMESTAMP DEFAULT NOW()
52
+);
53
+
54
+COMMENT ON TABLE iot_ota_task IS 'OTA升级任务表';
55
+COMMENT ON COLUMN iot_ota_task.task_status IS '任务状态: pending-待执行/executing-执行中/completed-已完成/failed-失败/cancelled-已取消';
56
+COMMENT ON COLUMN iot_ota_task.batch_size IS '每批升级设备数量';
57
+
58
+CREATE INDEX IF NOT EXISTS idx_ota_task_firmware ON iot_ota_task(firmware_id);
59
+CREATE INDEX IF NOT EXISTS idx_ota_task_status ON iot_ota_task(task_status);
60
+CREATE INDEX IF NOT EXISTS idx_ota_task_created ON iot_ota_task(created_at DESC);
61
+
62
+-- OTA升级记录表(每台设备一条记录)
63
+CREATE TABLE IF NOT EXISTS iot_ota_upgrade_record (
64
+    id              BIGSERIAL PRIMARY KEY,
65
+    task_id         BIGINT NOT NULL REFERENCES iot_ota_task(id),
66
+    device_id       BIGINT NOT NULL REFERENCES iot_device(id),
67
+    device_sn       VARCHAR(100) NOT NULL,
68
+    from_version    VARCHAR(20),                   -- 升级前版本
69
+    to_version      VARCHAR(20),                   -- 目标版本
70
+    status          VARCHAR(20) NOT NULL DEFAULT 'pending',  -- pending/executing/success/failed/timeout
71
+    fail_reason     VARCHAR(500),
72
+    progress        INT DEFAULT 0,                 -- 进度百分比 0-100
73
+    started_at      TIMESTAMP,
74
+    completed_at    TIMESTAMP,
75
+    created_at      TIMESTAMP DEFAULT NOW()
76
+);
77
+
78
+COMMENT ON TABLE iot_ota_upgrade_record IS 'OTA升级记录表(单台设备)';
79
+COMMENT ON COLUMN iot_ota_upgrade_record.status IS '升级状态: pending-待升级/executing-升级中/success-成功/failed-失败/timeout-超时';
80
+
81
+CREATE INDEX IF NOT EXISTS idx_ota_record_task ON iot_ota_upgrade_record(task_id);
82
+CREATE INDEX IF NOT EXISTS idx_ota_record_device ON iot_ota_upgrade_record(device_id);
83
+CREATE INDEX IF NOT EXISTS idx_ota_record_sn ON iot_ota_upgrade_record(device_sn);
84
+CREATE INDEX IF NOT EXISTS idx_ota_record_status ON iot_ota_upgrade_record(status);

+ 195
- 0
wm-iot/src/test/java/com/water/iot/service/DeviceShadowServiceTest.java ファイルの表示

@@ -0,0 +1,195 @@
1
+package com.water.iot.service;
2
+
3
+import com.fasterxml.jackson.databind.ObjectMapper;
4
+import com.water.iot.entity.DeviceShadow;
5
+import org.junit.jupiter.api.BeforeEach;
6
+import org.junit.jupiter.api.Test;
7
+import org.junit.jupiter.api.extension.ExtendWith;
8
+import org.mockito.Mock;
9
+import org.mockito.junit.jupiter.MockitoExtension;
10
+import org.springframework.data.redis.core.HashOperations;
11
+import org.springframework.data.redis.core.StringRedisTemplate;
12
+import org.springframework.jdbc.core.JdbcTemplate;
13
+
14
+import java.util.*;
15
+
16
+import static org.junit.jupiter.api.Assertions.*;
17
+import static org.mockito.ArgumentMatchers.*;
18
+import static org.mockito.Mockito.*;
19
+
20
+/**
21
+ * DeviceShadowService 单元测试
22
+ * Mock Redis + Mock JdbcTemplate
23
+ */
24
+@ExtendWith(MockitoExtension.class)
25
+class DeviceShadowServiceTest {
26
+
27
+    @Mock
28
+    private StringRedisTemplate redisTemplate;
29
+
30
+    @Mock
31
+    private JdbcTemplate jdbcTemplate;
32
+
33
+    @Mock
34
+    private HashOperations<String, Object, Object> hashOperations;
35
+
36
+    private DeviceShadowService shadowService;
37
+
38
+    @BeforeEach
39
+    void setUp() {
40
+        when(redisTemplate.opsForHash()).thenReturn(hashOperations);
41
+        shadowService = new DeviceShadowService(redisTemplate, jdbcTemplate, new ObjectMapper());
42
+    }
43
+
44
+    @Test
45
+    void updateReported_shouldUpdateRedisAndDb() {
46
+        // Given
47
+        String deviceSn = "FM-001";
48
+        Map<String, Object> state = Map.of("flow", 12.5, "pressure", 0.35);
49
+
50
+        when(hashOperations.get(eq("shadow:" + deviceSn), eq("version"))).thenReturn("1");
51
+        when(hashOperations.get(eq("shadow:" + deviceSn), eq("desired"))).thenReturn(null);
52
+        when(jdbcTemplate.update(anyString(), any())).thenReturn(1);
53
+        when(jdbcTemplate.queryForList(anyString(), eq(Long.class), anyString()))
54
+            .thenReturn(List.of(1L));
55
+
56
+        // When
57
+        shadowService.updateReported(deviceSn, state);
58
+
59
+        // Then
60
+        verify(hashOperations).putAll(eq("shadow:" + deviceSn), anyMap());
61
+        verify(redisTemplate).expire(eq("shadow:" + deviceSn), eq(24L), any());
62
+        verify(jdbcTemplate).update(contains("UPDATE iot_device"), eq(deviceSn));
63
+    }
64
+
65
+    @Test
66
+    void updateDesired_shouldComputeDelta() {
67
+        // Given
68
+        String deviceSn = "FM-001";
69
+        Map<String, Object> desired = Map.of("valveOpen", 80, "alarm", true);
70
+
71
+        when(hashOperations.get(eq("shadow:" + deviceSn), eq("version"))).thenReturn("2");
72
+        when(hashOperations.get(eq("shadow:" + deviceSn), eq("reported")))
73
+            .thenReturn("{\"valveOpen\":50,\"alarm\":true}");
74
+        when(jdbcTemplate.update(anyString(), any())).thenReturn(1);
75
+        when(jdbcTemplate.queryForList(anyString(), eq(Long.class), anyString()))
76
+            .thenReturn(List.of(1L));
77
+
78
+        // When
79
+        shadowService.updateDesired(deviceSn, desired);
80
+
81
+        // Then
82
+        verify(hashOperations).putAll(eq("shadow:" + deviceSn), argThat(map -> {
83
+            String delta = (String) map.get("delta");
84
+            // delta should contain valveOpen:80 (different from reported 50)
85
+            // but NOT alarm (same as reported)
86
+            return delta != null && delta.contains("valveOpen") && delta.contains("80");
87
+        }));
88
+    }
89
+
90
+    @Test
91
+    void getShadow_fromRedis_shouldReturnShadow() {
92
+        // Given
93
+        String deviceSn = "FM-001";
94
+        Map<Object, Object> entries = new HashMap<>();
95
+        entries.put("reported", "{\"flow\":12.5}");
96
+        entries.put("desired", "{\"flow\":15.0}");
97
+        entries.put("delta", "{\"flow\":15.0}");
98
+        entries.put("version", "3");
99
+        entries.put("online", "true");
100
+        entries.put("lastReport", "2026-06-14T10:00:00");
101
+
102
+        when(hashOperations.entries("shadow:" + deviceSn)).thenReturn(entries);
103
+
104
+        // When
105
+        DeviceShadow shadow = shadowService.getShadow(deviceSn);
106
+
107
+        // Then
108
+        assertNotNull(shadow);
109
+        assertEquals(deviceSn, shadow.getDeviceSn());
110
+        assertEquals("{\"flow\":12.5}", shadow.getReportedState());
111
+        assertEquals("{\"flow\":15.0}", shadow.getDesiredState());
112
+        assertEquals(3L, shadow.getVersion());
113
+        assertEquals("online", shadow.getOnlineStatus());
114
+    }
115
+
116
+    @Test
117
+    void getDesired_emptyRedis_shouldReturnEmpty() {
118
+        when(hashOperations.get(anyString(), eq("desired"))).thenReturn(null);
119
+        Map<String, Object> result = shadowService.getDesired("FM-001");
120
+        assertTrue(result.isEmpty());
121
+    }
122
+
123
+    @Test
124
+    void isOnline_true() {
125
+        when(hashOperations.get("shadow:FM-001", "online")).thenReturn("true");
126
+        assertTrue(shadowService.isOnline("FM-001"));
127
+    }
128
+
129
+    @Test
130
+    void isOnline_false() {
131
+        when(hashOperations.get("shadow:FM-001", "online")).thenReturn("false");
132
+        assertFalse(shadowService.isOnline("FM-001"));
133
+    }
134
+
135
+    @Test
136
+    void batchGetShadows_shouldReturnMultiple() {
137
+        // Given
138
+        List<String> sns = List.of("FM-001", "FM-002");
139
+        Map<Object, Object> entries1 = new HashMap<>();
140
+        entries1.put("reported", "{\"flow\":12}");
141
+        entries1.put("version", "1");
142
+        entries1.put("online", "true");
143
+
144
+        Map<Object, Object> entries2 = new HashMap<>();
145
+        entries2.put("reported", "{\"pressure\":0.4}");
146
+        entries2.put("version", "2");
147
+        entries2.put("online", "false");
148
+
149
+        when(hashOperations.entries("shadow:FM-001")).thenReturn(entries1);
150
+        when(hashOperations.entries("shadow:FM-002")).thenReturn(entries2);
151
+
152
+        // When
153
+        List<DeviceShadow> shadows = shadowService.batchGetShadows(sns);
154
+
155
+        // Then
156
+        assertEquals(2, shadows.size());
157
+    }
158
+
159
+    @Test
160
+    void detectOfflineDevices_shouldMarkOffline() {
161
+        // Given
162
+        Set<String> keys = Set.of("shadow:FM-001", "shadow:FM-002");
163
+        when(redisTemplate.keys("shadow:*")).thenReturn(keys);
164
+
165
+        // FM-001: last report 60 minutes ago (offline)
166
+        when(hashOperations.get(eq("shadow:FM-001"), eq("lastReport")))
167
+            .thenReturn(java.time.LocalDateTime.now().minusMinutes(60).toString());
168
+        // FM-002: last report 5 minutes ago (online)
169
+        when(hashOperations.get(eq("shadow:FM-002"), eq("lastReport")))
170
+            .thenReturn(java.time.LocalDateTime.now().minusMinutes(5).toString());
171
+
172
+        when(jdbcTemplate.update(anyString(), anyString())).thenReturn(1);
173
+
174
+        // When
175
+        List<String> offline = shadowService.detectOfflineDevices(30);
176
+
177
+        // Then
178
+        assertEquals(1, offline.size());
179
+        assertEquals("FM-001", offline.get(0));
180
+        verify(hashOperations).put("shadow:FM-001", "online", "false");
181
+    }
182
+
183
+    @Test
184
+    void deleteShadow_shouldRemoveFromRedisAndDb() {
185
+        when(redisTemplate.delete("shadow:FM-001")).thenReturn(true);
186
+        when(jdbcTemplate.queryForList(anyString(), eq(Long.class), anyString()))
187
+            .thenReturn(List.of(1L));
188
+        when(jdbcTemplate.update(anyString(), anyLong())).thenReturn(1);
189
+
190
+        shadowService.deleteShadow("FM-001");
191
+
192
+        verify(redisTemplate).delete("shadow:FM-001");
193
+        verify(jdbcTemplate).update(contains("DELETE FROM iot_device_shadow"), eq(1L));
194
+    }
195
+}

+ 310
- 0
wm-iot/src/test/java/com/water/iot/service/OtaServiceTest.java ファイルの表示

@@ -0,0 +1,310 @@
1
+package com.water.iot.service;
2
+
3
+import com.fasterxml.jackson.databind.ObjectMapper;
4
+import com.water.iot.entity.OtaFirmware;
5
+import com.water.iot.entity.OtaTask;
6
+import com.water.iot.entity.OtaUpgradeRecord;
7
+import org.junit.jupiter.api.BeforeEach;
8
+import org.junit.jupiter.api.Test;
9
+import org.junit.jupiter.api.extension.ExtendWith;
10
+import org.mockito.Mock;
11
+import org.mockito.junit.jupiter.MockitoExtension;
12
+import org.springframework.jdbc.core.BeanPropertyRowMapper;
13
+import org.springframework.jdbc.core.JdbcTemplate;
14
+import org.springframework.jdbc.support.GeneratedKeyHolder;
15
+
16
+import java.util.*;
17
+
18
+import static org.junit.jupiter.api.Assertions.*;
19
+import static org.mockito.ArgumentMatchers.*;
20
+import static org.mockito.Mockito.*;
21
+
22
+/**
23
+ * OtaService 单元测试
24
+ * Mock JdbcTemplate
25
+ */
26
+@ExtendWith(MockitoExtension.class)
27
+class OtaServiceTest {
28
+
29
+    @Mock
30
+    private JdbcTemplate jdbcTemplate;
31
+
32
+    @Mock
33
+    private DeviceShadowService shadowService;
34
+
35
+    private OtaService otaService;
36
+    private final ObjectMapper objectMapper = new ObjectMapper();
37
+
38
+    @BeforeEach
39
+    void setUp() {
40
+        otaService = new OtaService(jdbcTemplate, shadowService, objectMapper);
41
+    }
42
+
43
+    @Test
44
+    void createFirmware_shouldSetDraftStatusAndReturn() {
45
+        OtaFirmware firmware = new OtaFirmware();
46
+        firmware.setModelId(1L);
47
+        firmware.setFirmwareVersion("v2.0.0");
48
+        firmware.setFileUrl("http://oss.example.com/firmware/v2.0.0.bin");
49
+        firmware.setDescription("压力传感器固件升级");
50
+        firmware.setMd5("d41d8cd98f00b204e9800998ecf8427e");
51
+        firmware.setFileSize(1024000L);
52
+
53
+        when(jdbcTemplate.update(any(org.springframework.jdbc.core.PreparedStatementCreator.class),
54
+                any(GeneratedKeyHolder.class))).thenReturn(1);
55
+
56
+        OtaFirmware result = otaService.createFirmware(firmware);
57
+
58
+        assertNotNull(result);
59
+        assertEquals("draft", result.getStatus());
60
+        assertEquals("v2.0.0", result.getFirmwareVersion());
61
+        verify(jdbcTemplate).update(any(org.springframework.jdbc.core.PreparedStatementCreator.class),
62
+                any(GeneratedKeyHolder.class));
63
+    }
64
+
65
+    @Test
66
+    void publishFirmware_shouldUpdateStatus() {
67
+        when(jdbcTemplate.update(anyString(), anyString(), anyLong())).thenReturn(1);
68
+        otaService.publishFirmware(1L, "admin");
69
+        verify(jdbcTemplate).update(contains("UPDATE iot_ota_firmware"), eq("admin"), eq(1L));
70
+    }
71
+
72
+    @Test
73
+    void publishFirmware_notFound_shouldThrow() {
74
+        when(jdbcTemplate.update(anyString(), anyString(), anyLong())).thenReturn(0);
75
+        assertThrows(RuntimeException.class, () -> otaService.publishFirmware(999L, "admin"));
76
+    }
77
+
78
+    @Test
79
+    void getFirmware_shouldReturnFirmware() {
80
+        OtaFirmware fw = new OtaFirmware();
81
+        fw.setId(1L);
82
+        fw.setFirmwareVersion("v2.0.0");
83
+        when(jdbcTemplate.query(anyString(), any(BeanPropertyRowMapper.class), anyLong()))
84
+            .thenReturn(List.of(fw));
85
+
86
+        OtaFirmware result = otaService.getFirmware(1L);
87
+        assertNotNull(result);
88
+        assertEquals("v2.0.0", result.getFirmwareVersion());
89
+    }
90
+
91
+    @Test
92
+    void getFirmware_notFound_shouldReturnNull() {
93
+        when(jdbcTemplate.query(anyString(), any(BeanPropertyRowMapper.class), anyLong()))
94
+            .thenReturn(Collections.emptyList());
95
+        assertNull(otaService.getFirmware(999L));
96
+    }
97
+
98
+    @Test
99
+    void listAllFirmware_shouldReturnPaged() {
100
+        OtaFirmware fw1 = new OtaFirmware();
101
+        fw1.setId(1L);
102
+        OtaFirmware fw2 = new OtaFirmware();
103
+        fw2.setId(2L);
104
+        when(jdbcTemplate.query(anyString(), any(BeanPropertyRowMapper.class), anyInt(), anyInt()))
105
+            .thenReturn(List.of(fw1, fw2));
106
+
107
+        List<OtaFirmware> result = otaService.listAllFirmware(1, 10);
108
+        assertEquals(2, result.size());
109
+    }
110
+
111
+    @Test
112
+    void getAvailableFirmware_shouldReturnLatestPublished() {
113
+        // Device has model_id = 1
114
+        when(jdbcTemplate.queryForList(anyString(), eq(Long.class), anyString()))
115
+            .thenReturn(List.of(1L));
116
+
117
+        OtaFirmware fw = new OtaFirmware();
118
+        fw.setFirmwareVersion("v2.0.0");
119
+        fw.setStatus("published");
120
+        when(jdbcTemplate.query(contains("published"), any(BeanPropertyRowMapper.class), anyLong()))
121
+            .thenReturn(List.of(fw));
122
+
123
+        OtaFirmware result = otaService.getAvailableFirmware("FM-001");
124
+        assertNotNull(result);
125
+        assertEquals("v2.0.0", result.getFirmwareVersion());
126
+    }
127
+
128
+    @Test
129
+    void getAvailableFirmware_noModel_shouldReturnNull() {
130
+        when(jdbcTemplate.queryForList(anyString(), eq(Long.class), anyString()))
131
+            .thenReturn(Collections.emptyList());
132
+        assertNull(otaService.getAvailableFirmware("FM-001"));
133
+    }
134
+
135
+    @Test
136
+    void createUpgradeTask_shouldCreateTaskAndRecords() {
137
+        // Mock firmware exists and is published
138
+        OtaFirmware fw = new OtaFirmware();
139
+        fw.setId(1L);
140
+        fw.setFirmwareVersion("v2.0.0");
141
+        fw.setStatus("published");
142
+        when(jdbcTemplate.query(anyString(), any(BeanPropertyRowMapper.class), anyLong()))
143
+            .thenReturn(List.of(fw));
144
+
145
+        // Mock insert task
146
+        when(jdbcTemplate.update(any(org.springframework.jdbc.core.PreparedStatementCreator.class),
147
+                any(GeneratedKeyHolder.class))).thenReturn(1);
148
+
149
+        // Mock device info query
150
+        Map<String, Object> deviceInfo = new HashMap<>();
151
+        deviceInfo.put("device_sn", "FM-001");
152
+        deviceInfo.put("firmware_version", "v1.0.0");
153
+        when(jdbcTemplate.queryForList(contains("SELECT device_sn"), anyLong()))
154
+            .thenReturn(List.of(deviceInfo));
155
+
156
+        // Mock batch insert
157
+        when(jdbcTemplate.batchUpdate(anyString(), anyList())).thenReturn(new int[]{1});
158
+
159
+        OtaTask task = otaService.createUpgradeTask(1L, List.of(1L, 2L), 5, "admin");
160
+
161
+        assertNotNull(task);
162
+        assertEquals("v2.0.0", task.getFirmwareVersion());
163
+        assertEquals("pending", task.getTaskStatus());
164
+        assertEquals(2, task.getTotalDevices());
165
+        assertEquals(5, task.getBatchSize());
166
+    }
167
+
168
+    @Test
169
+    void createUpgradeTask_unpublishedFirmware_shouldThrow() {
170
+        OtaFirmware fw = new OtaFirmware();
171
+        fw.setId(1L);
172
+        fw.setStatus("draft");
173
+        when(jdbcTemplate.query(anyString(), any(BeanPropertyRowMapper.class), anyLong()))
174
+            .thenReturn(List.of(fw));
175
+
176
+        assertThrows(RuntimeException.class,
177
+            () -> otaService.createUpgradeTask(1L, List.of(1L), 5, "admin"));
178
+    }
179
+
180
+    @Test
181
+    void startTask_shouldUpdateStatusAndFirstBatch() {
182
+        // Mock task exists
183
+        OtaTask task = new OtaTask();
184
+        task.setId(1L);
185
+        task.setTaskStatus("pending");
186
+        task.setBatchSize(5);
187
+        when(jdbcTemplate.query(anyString(), any(BeanPropertyRowMapper.class), anyLong()))
188
+            .thenReturn(List.of(task));
189
+        when(jdbcTemplate.update(anyString(), any())).thenReturn(1);
190
+
191
+        otaService.startTask(1L);
192
+
193
+        verify(jdbcTemplate, atLeast(2)).update(anyString(), any());
194
+    }
195
+
196
+    @Test
197
+    void cancelTask_shouldUpdateStatusAndRecords() {
198
+        when(jdbcTemplate.update(anyString(), anyLong())).thenReturn(1);
199
+        otaService.cancelTask(1L);
200
+        verify(jdbcTemplate, times(2)).update(anyString(), anyLong());
201
+    }
202
+
203
+    @Test
204
+    void getTaskStatistics_shouldReturnStats() {
205
+        OtaTask task = new OtaTask();
206
+        task.setId(1L);
207
+        task.setTaskStatus("executing");
208
+        task.setTotalDevices(10);
209
+        task.setSuccessCount(5);
210
+        task.setFailedCount(1);
211
+        task.setExecutingCount(2);
212
+        when(jdbcTemplate.query(anyString(), any(BeanPropertyRowMapper.class), anyLong()))
213
+            .thenReturn(List.of(task));
214
+
215
+        List<Map<String, Object>> statusBreakdown = List.of(
216
+            Map.of("status", "success", "count", 5L),
217
+            Map.of("status", "failed", "count", 1L),
218
+            Map.of("status", "executing", "count", 2L)
219
+        );
220
+        when(jdbcTemplate.queryForList(anyString(), anyLong())).thenReturn(statusBreakdown);
221
+
222
+        Map<String, Object> stats = otaService.getTaskStatistics(1L);
223
+
224
+        assertNotNull(stats);
225
+        assertEquals(10, stats.get("totalDevices"));
226
+        assertEquals(5, stats.get("successCount"));
227
+        assertEquals(60.0, stats.get("progressPercent"));
228
+        assertNotNull(stats.get("statusBreakdown"));
229
+    }
230
+
231
+    @Test
232
+    void getTaskRecords_shouldReturnRecords() {
233
+        OtaUpgradeRecord record = new OtaUpgradeRecord();
234
+        record.setId(1L);
235
+        record.setTaskId(1L);
236
+        record.setStatus("success");
237
+        when(jdbcTemplate.query(anyString(), any(BeanPropertyRowMapper.class), anyLong()))
238
+            .thenReturn(List.of(record));
239
+
240
+        List<OtaUpgradeRecord> records = otaService.getTaskRecords(1L);
241
+        assertEquals(1, records.size());
242
+        assertEquals("success", records.get(0).getStatus());
243
+    }
244
+
245
+    @Test
246
+    void markSuccess_shouldUpdateRecordAndTask() {
247
+        when(jdbcTemplate.update(anyString(), any())).thenReturn(1);
248
+        when(jdbcTemplate.queryForObject(anyString(), eq(Long.class), anyLong())).thenReturn(1L);
249
+        
250
+        // Mock task for triggerNextBatch
251
+        OtaTask task = new OtaTask();
252
+        task.setId(1L);
253
+        task.setTaskStatus("executing");
254
+        task.setBatchSize(5);
255
+        when(jdbcTemplate.query(anyString(), any(BeanPropertyRowMapper.class), anyLong()))
256
+            .thenReturn(List.of(task));
257
+        when(jdbcTemplate.queryForObject(contains("COUNT"), eq(Integer.class), anyLong()))
258
+            .thenReturn(0, 3); // 0 executing, 3 pending
259
+
260
+        otaService.markSuccess(1L, 1L, "v2.0.0");
261
+
262
+        verify(jdbcTemplate, atLeast(3)).update(anyString(), any());
263
+    }
264
+
265
+    @Test
266
+    void markFailed_shouldUpdateWithReason() {
267
+        when(jdbcTemplate.update(anyString(), any())).thenReturn(1);
268
+        when(jdbcTemplate.queryForObject(anyString(), eq(Long.class), anyLong())).thenReturn(1L);
269
+
270
+        OtaTask task = new OtaTask();
271
+        task.setId(1L);
272
+        task.setTaskStatus("executing");
273
+        task.setBatchSize(5);
274
+        when(jdbcTemplate.query(anyString(), any(BeanPropertyRowMapper.class), anyLong()))
275
+            .thenReturn(List.of(task));
276
+        when(jdbcTemplate.queryForObject(contains("COUNT"), eq(Integer.class), anyLong()))
277
+            .thenReturn(0, 0); // 0 executing, 0 pending -> task completed
278
+
279
+        otaService.markFailed(1L, "设备连接超时");
280
+
281
+        verify(jdbcTemplate).update(contains("fail_reason"), eq("设备连接超时"), eq(1L));
282
+    }
283
+
284
+    @Test
285
+    void updateProgress_shouldUpdateRecord() {
286
+        when(jdbcTemplate.update(anyString(), anyInt(), anyLong())).thenReturn(1);
287
+        otaService.updateProgress(1L, 50);
288
+        verify(jdbcTemplate).update(contains("progress"), eq(50), eq(1L));
289
+    }
290
+
291
+    @Test
292
+    void deprecateFirmware_shouldUpdateStatus() {
293
+        when(jdbcTemplate.update(anyString(), anyLong())).thenReturn(1);
294
+        otaService.deprecateFirmware(1L);
295
+        verify(jdbcTemplate).update(contains("deprecated"), eq(1L));
296
+    }
297
+
298
+    @Test
299
+    void listTasks_shouldReturnPaged() {
300
+        OtaTask t1 = new OtaTask();
301
+        t1.setId(1L);
302
+        OtaTask t2 = new OtaTask();
303
+        t2.setId(2L);
304
+        when(jdbcTemplate.query(anyString(), any(BeanPropertyRowMapper.class), anyInt(), anyInt()))
305
+            .thenReturn(List.of(t1, t2));
306
+
307
+        List<OtaTask> result = otaService.listTasks(1, 10);
308
+        assertEquals(2, result.size());
309
+    }
310
+}