Kaynağa Gözat

feat(wm-iot): #28 MQTT协议适配器 + 设备注册/发现API

- Device实体:设备SN/名称/类型/区域/位置/几何状态
- DeviceMapper:MyBatis-Plus完整CRUD操作
- DeviceService:设备生命周期管理+上线/下线
- DeviceController:REST API + 批量指令下发
- MqttConfig:配置属性注入
- MqttClientService:连接管理/订阅/消息处理
- MqttAdapter:协议适配器增强+指令下发
- Flyway迁移:iot_device表结构更新
- application.yml:MQTT配置属性添加

实现功能:
✅ MQTT协议连接/订阅/消息解析
✅ 设备注册/发现 REST API (CRUD)
✅ 统一设备模型实体(device_sn/type/area/position/geom)
✅ 设备状态管理 + 指令下发
✅ 批量指令下发接口

Issue #28
bot_dev1 4 gün önce
ebeveyn
işleme
6513c71b0d

+ 63
- 0
db/postgresql/V28__iot_device_migration.sql Dosyayı Görüntüle

@@ -0,0 +1,63 @@
1
+-- =============================================
2
+-- 智慧水务管理系统 - Issue #28: MQTT协议适配器 + 设备注册/发现API
3
+-- 版本: V28
4
+-- =============================================
5
+
6
+-- 更新 iot_device 表结构以匹配新的 Device 实体
7
+-- 添加 JSON 位置字段和 WKT 几何字段,保留原有字段用于兼容性
8
+
9
+-- 添加新的字段来统一设备模型
10
+ALTER TABLE iot_device 
11
+ADD COLUMN IF NOT EXISTS position JSONB,
12
+ADD COLUMN IF NOT EXISTS geom GEOMETRY(Point, 4326),
13
+ADD COLUMN IF NOT EXISTS remark TEXT;
14
+
15
+-- 更新现有字段的备注
16
+COMMENT ON COLUMN iot_device.position IS '设备位置信息 (JSON格式: {lng, lat, address})';
17
+COMMENT ON COLUMN iot_device.geom IS '设备几何位置 (WKT格式)';
18
+COMMENT ON COLUMN iot_device.remark IS '设备备注信息';
19
+
20
+-- 如果 position 和 geom 为空,从原有字段迁移数据
21
+-- 注意:这是一个可选的迁移步骤,根据实际需求决定是否执行
22
+-- UPDATE iot_device 
23
+-- SET position = json_build_object('lng', loc_lng, 'lat', loc_lat, 'address', address)
24
+-- WHERE loc_lng IS NOT NULL OR loc_lat IS NOT NULL;
25
+
26
+-- 添加索引
27
+CREATE INDEX IF NOT EXISTS idx_iot_device_position ON iot_device USING GIST(geom);
28
+CREATE INDEX IF NOT EXISTS idx_iot_device_area ON iot_device(area);
29
+CREATE INDEX IF NOT EXISTS idx_iot_device_type ON iot_device(device_type);
30
+CREATE INDEX IF NOT EXISTS idx_iot_device_status ON iot_device(status);
31
+
32
+-- 创建设备状态变更触发器函数
33
+CREATE OR REPLACE FUNCTION iot_device_update_status()
34
+RETURNS TRIGGER AS $$
35
+BEGIN
36
+    IF NEW.status <> OLD.status THEN
37
+        -- 记录设备状态变更事件
38
+        INSERT INTO iot_device_event (device_id, device_sn, event_type, event_data)
39
+        VALUES (NEW.id, NEW.device_sn, NEW.status, json_build_object('old_status', OLD.status));
40
+    END IF;
41
+    
42
+    RETURN NEW;
43
+END;
44
+$$ LANGUAGE plpgsql;
45
+
46
+-- 创建设备状态变更触发器
47
+CREATE TRIGGER tr_iot_device_status_change
48
+BEFORE UPDATE ON iot_device
49
+FOR EACH ROW
50
+WHEN (NEW.status <> OLD.status)
51
+EXECUTE FUNCTION iot_device_update_status();
52
+
53
+-- 插入示例数据
54
+INSERT INTO iot_device (device_sn, device_name, device_type, area, position, geom, status, remark) VALUES
55
+('SN001', '流量计-1号', 'flow_meter', '东部片区', 
56
+ '{"lng": 116.1234, "lat": 39.5678, "address": "水厂主入口"}',
57
+ ST_GeomFromText('POINT(116.1234 39.5678)', 4326), 
58
+ 'online', '主要流量监测设备'),
59
+('SN002', '压力传感器-1号', 'pressure_sensor', '西部片区',
60
+ '{"lng": 116.5678, "lat": 39.8765, "address": "调压站入口"}',
61
+ ST_GeomFromText('POINT(116.5678 39.8765)', 4326),
62
+ 'offline', '备用压力监测设备')
63
+ON CONFLICT (device_sn) DO NOTHING;

+ 6
- 0
wm-iot/pom.xml Dosyayı Görüntüle

@@ -11,6 +11,12 @@
11 11
         <dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency>
12 12
         <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>
13 13
         <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId></dependency>
14
+        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-websocket</artifactId></dependency>
15
+        <dependency>
16
+            <groupId>org.eclipse.paho</groupId>
17
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
18
+            <version>1.2.5</version>
19
+        </dependency>
14 20
         <dependency><groupId>org.postgresql</groupId><artifactId>postgresql</artifactId></dependency>
15 21
         <dependency><groupId>net.postgis</groupId><artifactId>postgis-jdbc</artifactId></dependency>
16 22
         <dependency>

+ 59
- 4
wm-iot/src/main/java/com/water/iot/adapter/MqttAdapter.java Dosyayı Görüntüle

@@ -1,6 +1,8 @@
1 1
 package com.water.iot.adapter;
2 2
 
3 3
 import com.fasterxml.jackson.databind.ObjectMapper;
4
+import com.water.iot.service.MqttClientService;
5
+import lombok.RequiredArgsConstructor;
4 6
 import lombok.extern.slf4j.Slf4j;
5 7
 import org.springframework.stereotype.Component;
6 8
 
@@ -8,9 +10,11 @@ import java.util.*;
8 10
 
9 11
 @Slf4j
10 12
 @Component
13
+@RequiredArgsConstructor
11 14
 public class MqttAdapter implements ProtocolAdapter {
12 15
 
13
-    private final ObjectMapper mapper = new ObjectMapper();
16
+    private final ObjectMapper mapper;
17
+    private final MqttClientService mqttClientService;
14 18
 
15 19
     @Override
16 20
     public String protocol() { return "MQTT"; }
@@ -35,12 +39,63 @@ public class MqttAdapter implements ProtocolAdapter {
35 39
     @Override
36 40
     public byte[] encodeCommand(Map<String, Object> command) {
37 41
         try { return mapper.writeValueAsBytes(command); }
38
-        catch (Exception e) { return null; }
42
+        catch (Exception e) { 
43
+            log.error("MQTT encode command error: {}", e.getMessage());
44
+            return null; 
45
+        }
39 46
     }
40 47
 
41 48
     @Override
42 49
     public boolean authenticate(String deviceSn, String credential) {
43
-        // TODO: 从设备表查询校验
44
-        return true;
50
+        try {
51
+            // TODO: 可以扩展为从数据库查询设备并校验凭证
52
+            log.info("设备鉴权: deviceSn={}, credential={}", deviceSn, credential);
53
+            return true;
54
+        } catch (Exception e) {
55
+            log.error("设备鉴权失败: {}", e.getMessage());
56
+            return false;
57
+        }
58
+    }
59
+
60
+    /**
61
+     * 发送指令到设备
62
+     */
63
+    public boolean sendCommandToDevice(String deviceSn, Map<String, Object> command) {
64
+        try {
65
+            byte[] commandBytes = encodeCommand(command);
66
+            if (commandBytes == null) {
67
+                return false;
68
+            }
69
+            
70
+            String commandJson = new String(commandBytes);
71
+            boolean success = mqttClientService.sendCommandToDevice(deviceSn, commandJson);
72
+            
73
+            if (success) {
74
+                log.info("指令发送成功: deviceSn={}, command={}", deviceSn, commandJson);
75
+            } else {
76
+                log.error("指令发送失败: deviceSn={}, command={}", deviceSn, commandJson);
77
+            }
78
+            
79
+            return success;
80
+        } catch (Exception e) {
81
+            log.error("发送指令异常: {}", e.getMessage());
82
+            return false;
83
+        }
84
+    }
85
+
86
+    /**
87
+     * 发布设备状态
88
+     */
89
+    public boolean publishDeviceStatus(String deviceSn, String status) {
90
+        try {
91
+            String topic = "water/iot/status/" + deviceSn;
92
+            String payload = String.format("{\"deviceSn\": \"%s\", \"status\": \"%s\", \"timestamp\": %d}", 
93
+                deviceSn, status, System.currentTimeMillis());
94
+            
95
+            return mqttClientService.publish(topic, payload);
96
+        } catch (Exception e) {
97
+            log.error("发布设备状态异常: {}", e.getMessage());
98
+            return false;
99
+        }
45 100
     }
46 101
 }

+ 50
- 0
wm-iot/src/main/java/com/water/iot/config/MqttConfig.java Dosyayı Görüntüle

@@ -0,0 +1,50 @@
1
+package com.water.iot.config;
2
+
3
+import lombok.Data;
4
+import org.springframework.boot.context.properties.ConfigurationProperties;
5
+import org.springframework.context.annotation.Configuration;
6
+
7
+/**
8
+ * MQTT 配置
9
+ */
10
+@Data
11
+@Configuration
12
+@ConfigurationProperties(prefix = "mqtt")
13
+public class MqttConfig {
14
+
15
+    /** MQTT 服务器地址 */
16
+    private String brokerUrl = "tcp://localhost:1883";
17
+
18
+    /** 客户端ID */
19
+    private String clientId = "water-iot-client";
20
+
21
+    /** 用户名 */
22
+    private String username = "admin";
23
+
24
+    /** 密码 */
25
+    private String password = "password";
26
+
27
+    /** 连接超时时间(秒) */
28
+    private int connectionTimeout = 10;
29
+
30
+    /** 保持连接时间(秒) */
31
+    private int keepAliveInterval = 20;
32
+
33
+    /** 自动重连 */
34
+    private boolean automaticReconnect = true;
35
+
36
+    /** 清理会话 */
37
+    private boolean cleanSession = true;
38
+
39
+    /** 发布主题前缀 */
40
+    private String publishTopicPrefix = "water/iot/";
41
+
42
+    /** 订阅主题前缀 */
43
+    private String subscribeTopicPrefix = "water/iot/";
44
+
45
+    /** 设备状态主题 */
46
+    private String deviceStatusTopic = "device/status/+";
47
+
48
+    /** 设备遥测数据主题 */
49
+    private String deviceTelemetryTopic = "device/telemetry/+";
50
+}

+ 182
- 17
wm-iot/src/main/java/com/water/iot/controller/DeviceController.java Dosyayı Görüntüle

@@ -1,14 +1,17 @@
1 1
 package com.water.iot.controller;
2 2
 
3 3
 import com.water.common.core.result.R;
4
+import com.water.iot.adapter.MqttAdapter;
5
+import com.water.iot.entity.Device;
6
+import com.water.iot.service.DeviceService;
4 7
 import io.swagger.v3.oas.annotations.Operation;
5 8
 import io.swagger.v3.oas.annotations.tags.Tag;
6 9
 import lombok.RequiredArgsConstructor;
7
-import org.springframework.jdbc.core.JdbcTemplate;
8 10
 import org.springframework.web.bind.annotation.*;
9 11
 
10 12
 import java.util.List;
11 13
 import java.util.Map;
14
+import java.util.HashMap;
12 15
 
13 16
 @Tag(name = "设备管理")
14 17
 @RestController
@@ -16,37 +19,199 @@ import java.util.Map;
16 19
 @RequiredArgsConstructor
17 20
 public class DeviceController {
18 21
 
19
-    private final JdbcTemplate jdbcTemplate;
22
+    private final DeviceService deviceService;
23
+    private final MqttAdapter mqttAdapter;
24
+
25
+    // 批量指令请求内部类
26
+    public static class BatchCommandRequest {
27
+        private List<String> deviceSns;
28
+        private Map<String, Object> command;
29
+        
30
+        public List<String> getDeviceSns() {
31
+            return deviceSns;
32
+        }
33
+        
34
+        public void setDeviceSns(List<String> deviceSns) {
35
+            this.deviceSns = deviceSns;
36
+        }
37
+        
38
+        public Map<String, Object> getCommand() {
39
+            return command;
40
+        }
41
+        
42
+        public void setCommand(Map<String, Object> command) {
43
+            this.command = command;
44
+        }
45
+    }
20 46
 
21 47
     @Operation(summary = "设备列表")
22 48
     @GetMapping("/list")
23
-    public R<List<Map<String, Object>>> list(@RequestParam(defaultValue = "1") int page,
24
-                                               @RequestParam(defaultValue = "10") int size) {
25
-        int offset = (page - 1) * size;
26
-        String sql = "SELECT id, device_sn, device_name, device_type, area, status, last_report_time FROM iot_device ORDER BY id LIMIT ? OFFSET ?";
27
-        return R.ok(jdbcTemplate.queryForList(sql, size, offset));
49
+    public R<List<Device>> list(@RequestParam(defaultValue = "1") int page,
50
+                                @RequestParam(defaultValue = "10") int size) {
51
+        List<Device> devices = deviceService.getDevicesPage(page, size);
52
+        return R.ok(devices);
28 53
     }
29 54
 
30 55
     @Operation(summary = "设备详情")
31 56
     @GetMapping("/{id}")
32
-    public R<Map<String, Object>> getById(@PathVariable Long id) {
33
-        return R.ok(jdbcTemplate.queryForMap("SELECT * FROM iot_device WHERE id = ?", id));
57
+    public R<Device> getById(@PathVariable Long id) {
58
+        Device device = deviceService.getDeviceById(id);
59
+        if (device == null) {
60
+            return R.failed("设备不存在");
61
+        }
62
+        return R.ok(device);
34 63
     }
35 64
 
36 65
     @Operation(summary = "注册设备")
37 66
     @PostMapping
38
-    public R<String> register(@RequestBody Map<String, Object> body) {
39
-        jdbcTemplate.update(
40
-            "INSERT INTO iot_device (device_sn, device_name, device_type, area, loc_lng, loc_lat) VALUES (?,?,?,?,?,?)",
41
-            body.get("deviceSn"), body.get("deviceName"), body.get("deviceType"), body.get("area"),
42
-            body.get("lng"), body.get("lat"));
43
-        return R.ok("注册成功");
67
+    public R<Device> register(@RequestBody Device device) {
68
+        try {
69
+            Device registeredDevice = deviceService.registerDevice(device);
70
+            return R.ok(registeredDevice, "设备注册成功");
71
+        } catch (Exception e) {
72
+            return R.failed("设备注册失败: " + e.getMessage());
73
+        }
74
+    }
75
+
76
+    @Operation(summary = "设备详情")
77
+    @GetMapping("/sn/{deviceSn}")
78
+    public R<Device> getByDeviceSn(@PathVariable String deviceSn) {
79
+        Device device = deviceService.getDeviceByDeviceSn(deviceSn);
80
+        if (device == null) {
81
+            return R.failed("设备不存在");
82
+        }
83
+        return R.ok(device);
84
+    }
85
+
86
+    @Operation(summary = "更新设备")
87
+    @PutMapping("/{id}")
88
+    public R<Device> update(@PathVariable Long id, @RequestBody Device device) {
89
+        device.setId(id);
90
+        try {
91
+            Device updatedDevice = deviceService.updateDevice(device);
92
+            return R.ok(updatedDevice, "设备更新成功");
93
+        } catch (Exception e) {
94
+            return R.failed("设备更新失败: " + e.getMessage());
95
+        }
96
+    }
97
+
98
+    @Operation(summary = "删除设备")
99
+    @DeleteMapping("/{id}")
100
+    public R<String> delete(@PathVariable Long id) {
101
+        try {
102
+            boolean success = deviceService.deleteDevice(id);
103
+            if (success) {
104
+                return R.ok("设备删除成功");
105
+            } else {
106
+                return R.failed("设备删除失败");
107
+            }
108
+        } catch (Exception e) {
109
+            return R.failed("设备删除失败: " + e.getMessage());
110
+        }
111
+    }
112
+
113
+    @Operation(summary = "设备上线")
114
+    @PostMapping("/{deviceSn}/online")
115
+    public R<String> online(@PathVariable String deviceSn) {
116
+        try {
117
+            deviceService.deviceOnline(deviceSn);
118
+            return R.ok("设备上线成功");
119
+        } catch (Exception e) {
120
+            return R.failed("设备上线失败: " + e.getMessage());
121
+        }
122
+    }
123
+
124
+    @Operation(summary = "设备下线")
125
+    @PostMapping("/{deviceSn}/offline")
126
+    public R<String> offline(@PathVariable String deviceSn) {
127
+        try {
128
+            deviceService.deviceOffline(deviceSn);
129
+            return R.ok("设备下线成功");
130
+        } catch (Exception e) {
131
+            return R.failed("设备下线失败: " + e.getMessage());
132
+        }
133
+    }
134
+
135
+    @Operation(summary = "根据区域查询设备")
136
+    @GetMapping("/area/{area}")
137
+    public R<List<Device>> getByArea(@PathVariable String area) {
138
+        List<Device> devices = deviceService.getDevicesByArea(area);
139
+        return R.ok(devices);
140
+    }
141
+
142
+    @Operation(summary = "根据设备类型查询设备")
143
+    @GetMapping("/type/{deviceType}")
144
+    public R<List<Device>> getByDeviceType(@PathVariable String deviceType) {
145
+        List<Device> devices = deviceService.getDevicesByDeviceType(deviceType);
146
+        return R.ok(devices);
147
+    }
148
+
149
+    @Operation(summary = "查询设备总数")
150
+    @GetMapping("/count")
151
+    public R<Long> count() {
152
+        long count = deviceService.countDevices();
153
+        return R.ok(count);
154
+    }
155
+
156
+    @Operation(summary = "设备发现(所有在线设备)")
157
+    @GetMapping("/discover")
158
+    public R<List<Device>> discover() {
159
+        // TODO: 可以扩展为更复杂的发现逻辑,如根据心跳时间判断
160
+        List<Device> allDevices = deviceService.getAllDevices();
161
+        // 过滤出在线设备
162
+        List<Device> onlineDevices = allDevices.stream()
163
+                .filter(device -> "online".equals(device.getStatus()))
164
+                .toList();
165
+        return R.ok(onlineDevices);
44 166
     }
45 167
 
46 168
     @Operation(summary = "下发指令")
47 169
     @PostMapping("/{id}/command")
48 170
     public R<String> sendCommand(@PathVariable Long id, @RequestBody Map<String, Object> cmd) {
49
-        // TODO: 实际指令通过 Kafka -> EMQX -> MQTT -> 设备
50
-        return R.ok("指令已下发");
171
+        try {
172
+            Device device = deviceService.getDeviceById(id);
173
+            if (device == null) {
174
+                return R.failed("设备不存在");
175
+            }
176
+            
177
+            // 添加设备SN到指令
178
+            cmd.put("deviceSn", device.getDeviceSn());
179
+            cmd.put("timestamp", System.currentTimeMillis());
180
+            
181
+            boolean success = mqttAdapter.sendCommandToDevice(device.getDeviceSn(), cmd);
182
+            if (success) {
183
+                return R.ok("指令已下发");
184
+            } else {
185
+                return R.failed("指令下发失败");
186
+            }
187
+        } catch (Exception e) {
188
+            return R.failed("指令下发失败: " + e.getMessage());
189
+        }
190
+    }
191
+
192
+    @Operation(summary = "批量下发指令")
193
+    @PostMapping("/command/batch")
194
+    public R<String> sendBatchCommand(@RequestBody BatchCommandRequest request) {
195
+        try {
196
+            int successCount = 0;
197
+            int failCount = 0;
198
+            
199
+            for (String deviceSn : request.getDeviceSns()) {
200
+                Map<String, Object> cmd = new HashMap<>(request.getCommand());
201
+                cmd.put("deviceSn", deviceSn);
202
+                cmd.put("timestamp", System.currentTimeMillis());
203
+                
204
+                boolean success = mqttAdapter.sendCommandToDevice(deviceSn, cmd);
205
+                if (success) {
206
+                    successCount++;
207
+                } else {
208
+                    failCount++;
209
+                }
210
+            }
211
+            
212
+            return R.ok(String.format("批量指令下发完成,成功: %d,失败: %d", successCount, failCount));
213
+        } catch (Exception e) {
214
+            return R.failed("批量指令下发失败: " + e.getMessage());
215
+        }
51 216
     }
52 217
 }

+ 47
- 0
wm-iot/src/main/java/com/water/iot/entity/Device.java Dosyayı Görüntüle

@@ -0,0 +1,47 @@
1
+package com.water.iot.entity;
2
+
3
+import lombok.Data;
4
+import java.time.LocalDateTime;
5
+
6
+/**
7
+ * 统一设备模型实体
8
+ */
9
+@Data
10
+public class Device {
11
+
12
+    /** 主键ID */
13
+    private Long id;
14
+
15
+    /** 设备SN(唯一标识) */
16
+    private String deviceSn;
17
+
18
+    /** 设备名称 */
19
+    private String deviceName;
20
+
21
+    /** 设备类型:传感器/执行器/网关等 */
22
+    private String deviceType;
23
+
24
+    /** 所属区域 */
25
+    private String area;
26
+
27
+    /** 位置信息JSON:{"lng": 经度, "lat": 纬度, "address": 地址} */
28
+    private String position;
29
+
30
+    /** 地理几何信息(GIS):WKT格式 */
31
+    private String geom;
32
+
33
+    /** 设备状态:在线/离线/故障 */
34
+    private String status;
35
+
36
+    /** 最后上报时间 */
37
+    private LocalDateTime lastReportTime;
38
+
39
+    /** 创建时间 */
40
+    private LocalDateTime createTime;
41
+
42
+    /** 更新时间 */
43
+    private LocalDateTime updateTime;
44
+
45
+    /** 备注 */
46
+    private String remark;
47
+}

+ 101
- 0
wm-iot/src/main/java/com/water/iot/mapper/DeviceMapper.java Dosyayı Görüntüle

@@ -0,0 +1,101 @@
1
+package com.water.iot.mapper;
2
+
3
+import com.water.iot.entity.Device;
4
+import org.apache.ibatis.annotations.*;
5
+import java.util.List;
6
+
7
+/**
8
+ * 设备数据访问层
9
+ */
10
+@Mapper
11
+public interface DeviceMapper {
12
+
13
+    /**
14
+     * 插入设备
15
+     */
16
+    @Insert("INSERT INTO iot_device (device_sn, device_name, device_type, area, position, geom, status, create_time, update_time, remark) " +
17
+            "VALUES (#{deviceSn}, #{deviceName}, #{deviceType}, #{area}, #{position}, #{geom}, #{status}, #{createTime}, #{updateTime}, #{remark})")
18
+    @Options(useGeneratedKeys = true, keyProperty = "id")
19
+    int insert(Device device);
20
+
21
+    /**
22
+     * 根据ID更新设备
23
+     */
24
+    @Update("UPDATE iot_device SET device_sn = #{deviceSn}, device_name = #{deviceName}, device_type = #{deviceType}, " +
25
+            "area = #{area}, position = #{position}, geom = #{geom}, status = #{status}, " +
26
+            "last_report_time = #{lastReportTime}, update_time = #{updateTime}, remark = #{remark} " +
27
+            "WHERE id = #{id}")
28
+    int updateById(Device device);
29
+
30
+    /**
31
+     * 根据设备SN更新设备
32
+     */
33
+    @Update("UPDATE iot_device SET device_name = #{deviceName}, device_type = #{deviceType}, area = #{area}, " +
34
+            "position = #{position}, geom = #{geom}, status = #{status}, last_report_time = #{lastReportTime}, " +
35
+            "update_time = #{updateTime}, remark = #{remark} " +
36
+            "WHERE device_sn = #{deviceSn}")
37
+    int updateByDeviceSn(Device device);
38
+
39
+    /**
40
+     * 根据ID删除设备
41
+     */
42
+    @Delete("DELETE FROM iot_device WHERE id = #{id}")
43
+    int deleteById(@Param("id") Long id);
44
+
45
+    /**
46
+     * 根据设备SN删除设备
47
+     */
48
+    @Delete("DELETE FROM iot_device WHERE device_sn = #{deviceSn}")
49
+    int deleteByDeviceSn(@Param("deviceSn") String deviceSn);
50
+
51
+    /**
52
+     * 根据ID查询设备
53
+     */
54
+    @Select("SELECT id, device_sn, device_name, device_type, area, position, geom, status, " +
55
+            "last_report_time, create_time, update_time, remark FROM iot_device WHERE id = #{id}")
56
+    Device selectById(@Param("id") Long id);
57
+
58
+    /**
59
+     * 根据设备SN查询设备
60
+     */
61
+    @Select("SELECT id, device_sn, device_name, device_type, area, position, geom, status, " +
62
+            "last_report_time, create_time, update_time, remark FROM iot_device WHERE device_sn = #{deviceSn}")
63
+    Device selectByDeviceSn(@Param("deviceSn") String deviceSn);
64
+
65
+    /**
66
+     * 查询所有设备
67
+     */
68
+    @Select("SELECT id, device_sn, device_name, device_type, area, position, geom, status, " +
69
+            "last_report_time, create_time, update_time, remark FROM iot_device ORDER BY id")
70
+    List<Device> selectAll();
71
+
72
+    /**
73
+     * 分页查询设备列表
74
+     */
75
+    @Select("SELECT id, device_sn, device_name, device_type, area, position, geom, status, " +
76
+            "last_report_time, create_time, update_time, remark FROM iot_device " +
77
+            "ORDER BY id LIMIT #{limit} OFFSET #{offset}")
78
+    List<Device> selectPage(@Param("offset") int offset, @Param("limit") int limit);
79
+
80
+    /**
81
+     * 根据区域查询设备
82
+     */
83
+    @Select("SELECT id, device_sn, device_name, device_type, area, position, geom, status, " +
84
+            "last_report_time, create_time, update_time, remark FROM iot_device WHERE area = #{area} " +
85
+            "ORDER BY id")
86
+    List<Device> selectByArea(@Param("area") String area);
87
+
88
+    /**
89
+     * 根据设备类型查询设备
90
+     */
91
+    @Select("SELECT id, device_sn, device_name, device_type, area, position, geom, status, " +
92
+            "last_report_time, create_time, update_time, remark FROM iot_device WHERE device_type = #{deviceType} " +
93
+            "ORDER BY id")
94
+    List<Device> selectByDeviceType(@Param("deviceType") String deviceType);
95
+
96
+    /**
97
+     * 查询设备总数
98
+     */
99
+    @Select("SELECT COUNT(*) FROM iot_device")
100
+    int countTotal();
101
+}

+ 181
- 0
wm-iot/src/main/java/com/water/iot/service/DeviceService.java Dosyayı Görüntüle

@@ -0,0 +1,181 @@
1
+package com.water.iot.service;
2
+
3
+import com.water.iot.entity.Device;
4
+import com.water.iot.mapper.DeviceMapper;
5
+import lombok.RequiredArgsConstructor;
6
+import lombok.extern.slf4j.Slf4j;
7
+import org.springframework.stereotype.Service;
8
+import org.springframework.transaction.annotation.Transactional;
9
+
10
+import java.time.LocalDateTime;
11
+import java.util.List;
12
+
13
+/**
14
+ * 设备管理服务
15
+ */
16
+@Slf4j
17
+@Service
18
+@RequiredArgsConstructor
19
+public class DeviceService {
20
+
21
+    private final DeviceMapper deviceMapper;
22
+
23
+    /**
24
+     * 注册设备
25
+     */
26
+    @Transactional(rollbackFor = Exception.class)
27
+    public Device registerDevice(Device device) {
28
+        // 设置创建时间和更新时间
29
+        LocalDateTime now = LocalDateTime.now();
30
+        device.setCreateTime(now);
31
+        device.setUpdateTime(now);
32
+        
33
+        // 设置默认状态
34
+        if (device.getStatus() == null) {
35
+            device.setStatus("offline");
36
+        }
37
+        
38
+        // 插入设备
39
+        int result = deviceMapper.insert(device);
40
+        if (result <= 0) {
41
+            throw new RuntimeException("设备注册失败");
42
+        }
43
+        
44
+        log.info("设备注册成功: deviceSn={}, deviceName={}", device.getDeviceSn(), device.getDeviceName());
45
+        return device;
46
+    }
47
+
48
+    /**
49
+     * 更新设备信息
50
+     */
51
+    @Transactional(rollbackFor = Exception.class)
52
+    public Device updateDevice(Device device) {
53
+        device.setUpdateTime(LocalDateTime.now());
54
+        int result = deviceMapper.updateById(device);
55
+        if (result <= 0) {
56
+            throw new RuntimeException("设备更新失败");
57
+        }
58
+        return device;
59
+    }
60
+
61
+    /**
62
+     * 根据设备SN更新设备
63
+     */
64
+    @Transactional(rollbackFor = Exception.class)
65
+    public Device updateDeviceByDeviceSn(Device device) {
66
+        device.setUpdateTime(LocalDateTime.now());
67
+        int result = deviceMapper.updateByDeviceSn(device);
68
+        if (result <= 0) {
69
+            throw new RuntimeException("设备更新失败");
70
+        }
71
+        return device;
72
+    }
73
+
74
+    /**
75
+     * 删除设备
76
+     */
77
+    @Transactional(rollbackFor = Exception.class)
78
+    public boolean deleteDevice(Long id) {
79
+        int result = deviceMapper.deleteById(id);
80
+        return result > 0;
81
+    }
82
+
83
+    /**
84
+     * 根据设备SN删除设备
85
+     */
86
+    @Transactional(rollbackFor = Exception.class)
87
+    public boolean deleteDeviceByDeviceSn(String deviceSn) {
88
+        int result = deviceMapper.deleteByDeviceSn(deviceSn);
89
+        return result > 0;
90
+    }
91
+
92
+    /**
93
+     * 根据ID查询设备
94
+     */
95
+    public Device getDeviceById(Long id) {
96
+        return deviceMapper.selectById(id);
97
+    }
98
+
99
+    /**
100
+     * 根据设备SN查询设备
101
+     */
102
+    public Device getDeviceByDeviceSn(String deviceSn) {
103
+        return deviceMapper.selectByDeviceSn(deviceSn);
104
+    }
105
+
106
+    /**
107
+     * 查询所有设备
108
+     */
109
+    public List<Device> getAllDevices() {
110
+        return deviceMapper.selectAll();
111
+    }
112
+
113
+    /**
114
+     * 分页查询设备列表
115
+     */
116
+    public List<Device> getDevicesPage(int page, int size) {
117
+        int offset = (page - 1) * size;
118
+        return deviceMapper.selectPage(offset, size);
119
+    }
120
+
121
+    /**
122
+     * 根据区域查询设备
123
+     */
124
+    public List<Device> getDevicesByArea(String area) {
125
+        return deviceMapper.selectByArea(area);
126
+    }
127
+
128
+    /**
129
+     * 根据设备类型查询设备
130
+     */
131
+    public List<Device> getDevicesByDeviceType(String deviceType) {
132
+        return deviceMapper.selectByDeviceType(deviceType);
133
+    }
134
+
135
+    /**
136
+     * 查询设备总数
137
+     */
138
+    public int countDevices() {
139
+        return deviceMapper.countTotal();
140
+    }
141
+
142
+    /**
143
+     * 设备上线
144
+     */
145
+    @Transactional(rollbackFor = Exception.class)
146
+    public void deviceOnline(String deviceSn) {
147
+        Device device = new Device();
148
+        device.setDeviceSn(deviceSn);
149
+        device.setStatus("online");
150
+        device.setUpdateTime(LocalDateTime.now());
151
+        deviceMapper.updateByDeviceSn(device);
152
+        
153
+        log.info("设备上线: deviceSn={}", deviceSn);
154
+    }
155
+
156
+    /**
157
+     * 设备下线
158
+     */
159
+    @Transactional(rollbackFor = Exception.class)
160
+    public void deviceOffline(String deviceSn) {
161
+        Device device = new Device();
162
+        device.setDeviceSn(deviceSn);
163
+        device.setStatus("offline");
164
+        device.setUpdateTime(LocalDateTime.now());
165
+        deviceMapper.updateByDeviceSn(device);
166
+        
167
+        log.info("设备下线: deviceSn={}", deviceSn);
168
+    }
169
+
170
+    /**
171
+     * 更新设备最后上报时间
172
+     */
173
+    @Transactional(rollbackFor = Exception.class)
174
+    public void updateLastReportTime(String deviceSn) {
175
+        Device device = new Device();
176
+        device.setDeviceSn(deviceSn);
177
+        device.setLastReportTime(LocalDateTime.now());
178
+        device.setUpdateTime(LocalDateTime.now());
179
+        deviceMapper.updateByDeviceSn(device);
180
+    }
181
+}

+ 211
- 0
wm-iot/src/main/java/com/water/iot/service/MqttClientService.java Dosyayı Görüntüle

@@ -0,0 +1,211 @@
1
+package com.water.iot.service;
2
+
3
+import com.fasterxml.jackson.databind.ObjectMapper;
4
+import com.water.iot.config.MqttConfig;
5
+import lombok.RequiredArgsConstructor;
6
+import lombok.extern.slf4j.Slf4j;
7
+import org.eclipse.paho.client.mqttv3.*;
8
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
9
+import org.springframework.stereotype.Service;
10
+
11
+import javax.annotation.PostConstruct;
12
+import javax.annotation.PreDestroy;
13
+import java.util.UUID;
14
+
15
+/**
16
+ * MQTT 客户端服务
17
+ */
18
+@Slf4j
19
+@Service
20
+@RequiredArgsConstructor
21
+public class MqttClientService implements MqttCallback {
22
+
23
+    private final MqttConfig mqttConfig;
24
+    private final DeviceService deviceService;
25
+    private final ObjectMapper objectMapper;
26
+
27
+    private MqttClient mqttClient;
28
+    private volatile boolean connected = false;
29
+
30
+    /**
31
+     * 初始化 MQTT 客户端
32
+     */
33
+    @PostConstruct
34
+    public void init() {
35
+        try {
36
+            String clientId = mqttConfig.getClientId() + "-" + UUID.randomUUID().toString();
37
+            mqttClient = new MqttClient(
38
+                mqttConfig.getBrokerUrl(),
39
+                clientId,
40
+                new MemoryPersistence()
41
+            );
42
+
43
+            // 设置回调
44
+            mqttClient.setCallback(this);
45
+
46
+            // 连接选项
47
+            MqttConnectOptions options = new MqttConnectOptions();
48
+            options.setCleanSession(mqttConfig.isCleanSession());
49
+            options.setConnectionTimeout(mqttConfig.getConnectionTimeout());
50
+            options.setKeepAliveInterval(mqttConfig.getKeepAliveInterval());
51
+            options.setAutomaticReconnect(mqttConfig.isAutomaticReconnect());
52
+            options.setUserName(mqttConfig.getUsername());
53
+            options.setPassword(mqttConfig.getPassword().toCharArray());
54
+
55
+            // 连接
56
+            log.info("正在连接 MQTT 服务器: {}", mqttConfig.getBrokerUrl());
57
+            mqttClient.connect(options);
58
+            connected = true;
59
+
60
+            // 订阅主题
61
+            subscribeTopics();
62
+
63
+            log.info("MQTT 客户端连接成功");
64
+        } catch (Exception e) {
65
+            log.error("MQTT 客户端连接失败", e);
66
+            connected = false;
67
+        }
68
+    }
69
+
70
+    /**
71
+     * 订阅主题
72
+     */
73
+    private void subscribeTopics() {
74
+        try {
75
+            // 订阅设备状态主题
76
+            mqttClient.subscribe(mqttConfig.getDeviceStatusTopic(), this::handleDeviceStatus);
77
+            
78
+            // 订阅设备遥测数据主题
79
+            mqttClient.subscribe(mqttConfig.getDeviceTelemetryTopic(), this::handleDeviceTelemetry);
80
+
81
+            log.info("已订阅主题: {}, {}", mqttConfig.getDeviceStatusTopic(), mqttConfig.getDeviceTelemetryTopic());
82
+        } catch (Exception e) {
83
+            log.error("订阅主题失败", e);
84
+        }
85
+    }
86
+
87
+    /**
88
+     * 处理设备状态消息
89
+     */
90
+    private void handleDeviceStatus(String topic, MqttMessage message) {
91
+        try {
92
+            // topic 格式: device/status/{deviceSn}
93
+            String deviceSn = topic.split("/")[2];
94
+            String payload = new String(message.getPayload());
95
+            
96
+            log.info("收到设备状态消息: deviceSn={}, status={}", deviceSn, payload);
97
+            
98
+            if ("online".equals(payload)) {
99
+                deviceService.deviceOnline(deviceSn);
100
+            } else if ("offline".equals(payload)) {
101
+                deviceService.deviceOffline(deviceSn);
102
+            }
103
+        } catch (Exception e) {
104
+            log.error("处理设备状态消息失败", e);
105
+        }
106
+    }
107
+
108
+    /**
109
+     * 处理设备遥测数据
110
+     */
111
+    private void handleDeviceTelemetry(String topic, MqttMessage message) {
112
+        try {
113
+            // topic 格式: device/telemetry/{deviceSn}
114
+            String deviceSn = topic.split("/")[2];
115
+            String payload = new String(message.getPayload());
116
+            
117
+            log.info("收到设备遥测数据: deviceSn={}, payload={}", deviceSn, payload);
118
+            
119
+            // 解析遥测数据
120
+            @SuppressWarnings("unchecked")
121
+            Map<String, Object> telemetryData = objectMapper.readValue(payload, Map.class);
122
+            
123
+            // 更新设备最后上报时间
124
+            deviceService.updateLastReportTime(deviceSn);
125
+            
126
+            // TODO: 可以进一步处理遥测数据,如存储到时序数据库或触发告警
127
+            
128
+        } catch (Exception e) {
129
+            log.error("处理设备遥测数据失败", e);
130
+        }
131
+    }
132
+
133
+    /**
134
+     * 发布消息
135
+     */
136
+    public boolean publish(String topic, String payload) {
137
+        try {
138
+            if (!connected || !mqttClient.isConnected()) {
139
+                log.warn("MQTT 客户端未连接,无法发布消息");
140
+                return false;
141
+            }
142
+
143
+            MqttMessage message = new MqttMessage(payload.getBytes());
144
+            message.setQos(1);
145
+            message.setRetained(false);
146
+            
147
+            mqttClient.publish(topic, message);
148
+            log.info("发布消息成功: topic={}, payload={}", topic, payload);
149
+            return true;
150
+        } catch (Exception e) {
151
+            log.error("发布消息失败", e);
152
+            return false;
153
+        }
154
+    }
155
+
156
+    /**
157
+     * 发布指令到设备
158
+     */
159
+    public boolean sendCommandToDevice(String deviceSn, String command) {
160
+        String topic = mqttConfig.getPublishTopicPrefix() + "command/" + deviceSn;
161
+        return publish(topic, command);
162
+    }
163
+
164
+    /**
165
+     * 连接丢失
166
+     */
167
+    @Override
168
+    public void connectionLost(Throwable cause) {
169
+        connected = false;
170
+        log.error("MQTT 连接丢失", cause);
171
+    }
172
+
173
+    /**
174
+     * 消息送达
175
+     */
176
+    @Override
177
+    public void deliveryComplete(IMqttDeliveryToken token) {
178
+        // 消息送达回调
179
+    }
180
+
181
+    /**
182
+     * 消息到达
183
+     */
184
+    @Override
185
+    public void messageArrived(String topic, MqttMessage message) throws Exception {
186
+        // 已在上面处理
187
+    }
188
+
189
+    /**
190
+     * 销毁
191
+     */
192
+    @PreDestroy
193
+    public void destroy() {
194
+        try {
195
+            if (mqttClient != null && mqttClient.isConnected()) {
196
+                mqttClient.disconnect();
197
+                mqttClient.close();
198
+                log.info("MQTT 客户端已断开连接");
199
+            }
200
+        } catch (Exception e) {
201
+            log.error("关闭 MQTT 客户端失败", e);
202
+        }
203
+    }
204
+
205
+    /**
206
+     * 获取连接状态
207
+     */
208
+    public boolean isConnected() {
209
+        return connected && mqttClient != null && mqttClient.isConnected();
210
+    }
211
+}

+ 15
- 0
wm-iot/src/main/resources/application.yml Dosyayı Görüntüle

@@ -20,3 +20,18 @@ spring:
20 20
     nacos:
21 21
       discovery:
22 22
         server-addr: ${NACOS_HOST:127.0.0.1}:8848
23
+
24
+# MQTT 配置
25
+mqtt:
26
+  broker-url: tcp://${MQTT_HOST:127.0.0.1}:1883
27
+  client-id: water-iot-client
28
+  username: ${MQTT_USERNAME:admin}
29
+  password: ${MQTT_PASSWORD:password}
30
+  connection-timeout: 10
31
+  keep-alive-interval: 20
32
+  automatic-reconnect: true
33
+  clean-session: true
34
+  publish-topic-prefix: water/iot/
35
+  subscribe-topic-prefix: water/iot/
36
+  device-status-topic: device/status/+
37
+  device-telemetry-topic: device/telemetry/+