Procházet zdrojové kódy

Phase 2 #1 #2: 物联网平台协议适配器 + 流程引擎

#1 物联网平台:
- ProtocolAdapter 接口: 策略模式统一协议适配(parseTelemetry/encodeCommand/authenticate)
- MqttAdapter: JSON 遥测数据解析 + 指令下发
- ModbusAdapter: RTU/TCP 帧解析 + 寄存器映射
- AdapterFactory: 自动注册协议适配器(按protocol名查找)
- DeviceShadowService: Redis 设备影子(上报/期望/差异) + TTL 24h
- OtaService: 固件升级任务创建/设备查询升级

#2 业务流程引擎:
- BpmProcessDefinition: 流程定义(BPMN XML + 表单Schema)
- BpmProcessInstance: 流程实例(发起人/当前节点/状态)
- BpmApprovalRecord: 审批记录(通过/驳回/转办/委派)
- ProcessEngine: 完整流程引擎 启动/审批/完成/待办/查询
- ProcessController: REST API 发起流程/审批/待办列表/详情
bot_pm před 5 dny
rodič
revize
28dcea5fb6

+ 56
- 0
wm-bpm/src/main/java/com/water/bpm/controller/ProcessController.java Zobrazit soubor

@@ -0,0 +1,56 @@
1
+package com.water.bpm.controller;
2
+
3
+import com.water.bpm.entity.BpmApprovalRecord;
4
+import com.water.bpm.entity.BpmProcessDefinition;
5
+import com.water.bpm.entity.BpmProcessInstance;
6
+import com.water.bpm.service.ProcessEngine;
7
+import com.water.common.core.result.R;
8
+import io.swagger.v3.oas.annotations.Operation;
9
+import io.swagger.v3.oas.annotations.tags.Tag;
10
+import lombok.RequiredArgsConstructor;
11
+import org.springframework.web.bind.annotation.*;
12
+
13
+import java.util.*;
14
+
15
+@Tag(name = "流程引擎")
16
+@RestController
17
+@RequestMapping("/bpm")
18
+@RequiredArgsConstructor
19
+public class ProcessController {
20
+
21
+    private final ProcessEngine processEngine;
22
+
23
+    @Operation(summary = "发起流程")
24
+    @PostMapping("/start")
25
+    public R<BpmProcessInstance> start(@RequestBody Map<String, Object> req) {
26
+        BpmProcessDefinition def = new BpmProcessDefinition();
27
+        def.setId(Long.parseLong(String.valueOf(req.get("definitionId"))));
28
+        def.setProcessKey((String) req.get("processKey"));
29
+        def.setProcessName((String) req.get("processName"));
30
+        @SuppressWarnings("unchecked")
31
+        Map<String, Object> formData = (Map<String, Object>) req.getOrDefault("formData", new HashMap<>());
32
+        return R.ok(processEngine.startProcess(def, 1L, "当前用户",
33
+                (String) req.get("businessKey"), formData));
34
+    }
35
+
36
+    @Operation(summary = "审批")
37
+    @PostMapping("/approve")
38
+    public R<BpmProcessInstance> approve(@RequestBody Map<String, Object> req) {
39
+        return R.ok(processEngine.approve(
40
+                (String) req.get("instanceId"), 1L, "当前用户",
41
+                (String) req.get("nodeId"), (String) req.get("nodeName"),
42
+                (String) req.get("action"), (String) req.get("comment")));
43
+    }
44
+
45
+    @Operation(summary = "我的待办")
46
+    @GetMapping("/todo")
47
+    public R<List<BpmProcessInstance>> todo() {
48
+        return R.ok(processEngine.getTodoList(1L));
49
+    }
50
+
51
+    @Operation(summary = "流程详情")
52
+    @GetMapping("/instance/{id}")
53
+    public R<BpmProcessInstance> instance(@PathVariable String id) {
54
+        return R.ok(processEngine.getInstance(id));
55
+    }
56
+}

+ 18
- 0
wm-bpm/src/main/java/com/water/bpm/entity/BpmApprovalRecord.java Zobrazit soubor

@@ -0,0 +1,18 @@
1
+package com.water.bpm.entity;
2
+
3
+import lombok.Data;
4
+import java.time.LocalDateTime;
5
+
6
+@Data
7
+public class BpmApprovalRecord {
8
+    private Long id;
9
+    private Long instanceId;
10
+    private String nodeId;
11
+    private String nodeName;
12
+    private Long approverId;
13
+    private String approverName;
14
+    private String action;           // approve/reject/transfer/delegate/back
15
+    private String comment;
16
+    private String targetAssignee;   // 转办/委派目标
17
+    private LocalDateTime approvedAt;
18
+}

+ 20
- 0
wm-bpm/src/main/java/com/water/bpm/entity/BpmProcessDefinition.java Zobrazit soubor

@@ -0,0 +1,20 @@
1
+package com.water.bpm.entity;
2
+
3
+import lombok.Data;
4
+import java.time.LocalDateTime;
5
+
6
+@Data
7
+public class BpmProcessDefinition {
8
+    private Long id;
9
+    private String processKey;
10
+    private String processName;
11
+    private String description;
12
+    private String bpmnXml;          // BPMN 2.0 XML
13
+    private String formSchema;       // 表单 JSON Schema
14
+    private String category;         // revenue/patrol/dispatch/maintenance
15
+    private Integer version;
16
+    private Integer status;          // 0:草稿 1:发布 2:停用
17
+    private String createdBy;
18
+    private LocalDateTime createdAt;
19
+    private LocalDateTime updatedAt;
20
+}

+ 26
- 0
wm-bpm/src/main/java/com/water/bpm/entity/BpmProcessInstance.java Zobrazit soubor

@@ -0,0 +1,26 @@
1
+package com.water.bpm.entity;
2
+
3
+import lombok.Data;
4
+import java.time.LocalDateTime;
5
+import java.util.Map;
6
+
7
+@Data
8
+public class BpmProcessInstance {
9
+    private Long id;
10
+    private String instanceId;
11
+    private Long definitionId;
12
+    private String processKey;
13
+    private String businessKey;      // 关联业务ID
14
+    private String businessType;     // 业务类型
15
+    private String title;
16
+    private Long initiatorId;
17
+    private String initiatorName;
18
+    private String currentNode;      // 当前审批节点
19
+    private String currentAssignee;  // 当前处理人
20
+    private String status;           // running/completed/terminated/rejected
21
+    private Map<String, Object> variables;
22
+    private Map<String, Object> formData;
23
+    private LocalDateTime startedAt;
24
+    private LocalDateTime completedAt;
25
+    private LocalDateTime createdAt;
26
+}

+ 95
- 0
wm-bpm/src/main/java/com/water/bpm/service/ProcessEngine.java Zobrazit soubor

@@ -0,0 +1,95 @@
1
+package com.water.bpm.service;
2
+
3
+import com.water.bpm.entity.*;
4
+import lombok.RequiredArgsConstructor;
5
+import lombok.extern.slf4j.Slf4j;
6
+import org.springframework.stereotype.Service;
7
+import org.springframework.transaction.annotation.Transactional;
8
+
9
+import java.util.*;
10
+import java.util.concurrent.ConcurrentHashMap;
11
+
12
+@Slf4j
13
+@Service
14
+@RequiredArgsConstructor
15
+public class ProcessEngine {
16
+
17
+    // 简化流程引擎:模拟 Camunda/Flowable 核心功能
18
+    private final Map<String, BpmProcessInstance> instances = new ConcurrentHashMap<>();
19
+    private final List<BpmApprovalRecord> approvalRecords = new ArrayList<>();
20
+
21
+    /** 创建流程实例 */
22
+    @Transactional
23
+    public BpmProcessInstance startProcess(BpmProcessDefinition definition, Long initiatorId,
24
+                                            String initiatorName, String businessKey,
25
+                                            Map<String, Object> formData) {
26
+        BpmProcessInstance instance = new BpmProcessInstance();
27
+        instance.setId(System.currentTimeMillis());
28
+        instance.setInstanceId(UUID.randomUUID().toString());
29
+        instance.setDefinitionId(definition.getId());
30
+        instance.setProcessKey(definition.getProcessKey());
31
+        instance.setTitle(definition.getProcessName());
32
+        instance.setBusinessKey(businessKey);
33
+        instance.setInitiatorId(initiatorId);
34
+        instance.setInitiatorName(initiatorName);
35
+        instance.setStatus("running");
36
+        instance.setCurrentNode("START");
37
+        instance.setFormData(formData);
38
+        instance.setStartedAt(java.time.LocalDateTime.now());
39
+        instance.setCreatedAt(java.time.LocalDateTime.now());
40
+        instances.put(instance.getInstanceId(), instance);
41
+        log.info("Process started: {} - {}", instance.getProcessKey(), instance.getTitle());
42
+        return instance;
43
+    }
44
+
45
+    /** 审批节点 */
46
+    @Transactional
47
+    public BpmProcessInstance approve(String instanceId, Long approverId, String approverName,
48
+                                        String nodeId, String nodeName,
49
+                                        String action, String comment) {
50
+        BpmProcessInstance instance = instances.get(instanceId);
51
+        if (instance == null) throw new RuntimeException("流程实例不存在");
52
+
53
+        BpmApprovalRecord record = new BpmApprovalRecord();
54
+        record.setInstanceId(instance.getId());
55
+        record.setNodeId(nodeId);
56
+        record.setNodeName(nodeName);
57
+        record.setApproverId(approverId);
58
+        record.setApproverName(approverName);
59
+        record.setAction(action);
60
+        record.setComment(comment);
61
+        record.setApprovedAt(java.time.LocalDateTime.now());
62
+        approvalRecords.add(record);
63
+
64
+        instance.setCurrentNode(nodeName);
65
+        switch (action) {
66
+            case "approve": instance.setStatus("running"); break;
67
+            case "reject": instance.setStatus("rejected"); instance.setCompletedAt(java.time.LocalDateTime.now()); break;
68
+            default: instance.setStatus("running");
69
+        }
70
+        log.info("Approval: {} - {}: {}", instanceId, action, comment);
71
+        return instance;
72
+    }
73
+
74
+    /** 完成流程 */
75
+    @Transactional
76
+    public void completeProcess(String instanceId) {
77
+        BpmProcessInstance instance = instances.get(instanceId);
78
+        if (instance != null) {
79
+            instance.setStatus("completed");
80
+            instance.setCompletedAt(java.time.LocalDateTime.now());
81
+        }
82
+    }
83
+
84
+    /** 查询待办 */
85
+    public List<BpmProcessInstance> getTodoList(Long userId) {
86
+        return instances.values().stream()
87
+                .filter(i -> "running".equals(i.getStatus()) && i.getInitiatorId().equals(userId))
88
+                .toList();
89
+    }
90
+
91
+    /** 查询流程实例 */
92
+    public BpmProcessInstance getInstance(String instanceId) {
93
+        return instances.get(instanceId);
94
+    }
95
+}

+ 27
- 0
wm-iot/src/main/java/com/water/iot/adapter/AdapterFactory.java Zobrazit soubor

@@ -0,0 +1,27 @@
1
+package com.water.iot.adapter;
2
+
3
+import org.springframework.stereotype.Component;
4
+
5
+import java.util.List;
6
+import java.util.Map;
7
+import java.util.concurrent.ConcurrentHashMap;
8
+
9
+@Component
10
+public class AdapterFactory {
11
+
12
+    private final Map<String, ProtocolAdapter> adapters = new ConcurrentHashMap<>();
13
+
14
+    public AdapterFactory(List<ProtocolAdapter> adapterList) {
15
+        for (ProtocolAdapter a : adapterList) {
16
+            adapters.put(a.protocol().toLowerCase(), a);
17
+        }
18
+    }
19
+
20
+    public ProtocolAdapter getAdapter(String protocol) {
21
+        ProtocolAdapter adapter = adapters.get(protocol.toLowerCase());
22
+        if (adapter == null) {
23
+            throw new IllegalArgumentException("Unsupported protocol: " + protocol);
24
+        }
25
+        return adapter;
26
+    }
27
+}

+ 48
- 0
wm-iot/src/main/java/com/water/iot/adapter/ModbusAdapter.java Zobrazit soubor

@@ -0,0 +1,48 @@
1
+package com.water.iot.adapter;
2
+
3
+import lombok.extern.slf4j.Slf4j;
4
+import org.springframework.stereotype.Component;
5
+
6
+import java.util.*;
7
+
8
+@Slf4j
9
+@Component
10
+public class ModbusAdapter implements ProtocolAdapter {
11
+
12
+    @Override
13
+    public String protocol() { return "Modbus"; }
14
+
15
+    @Override
16
+    public Map<String, Object> parseTelemetry(String deviceSn, byte[] raw) {
17
+        // Modbus RTU/TCP 数据帧解析
18
+        Map<String, Object> telemetry = new HashMap<>();
19
+        telemetry.put("deviceSn", deviceSn);
20
+        telemetry.put("timestamp", System.currentTimeMillis());
21
+        telemetry.put("raw_hex", bytesToHex(raw));
22
+        // 简化: 按寄存器地址映射指标
23
+        List<Map<String, Object>> metrics = new ArrayList<>();
24
+        Map<String, Object> m = new HashMap<>();
25
+        m.put("key", "register_0");
26
+        m.put("value", raw.length > 0 ? raw[0] & 0xFF : 0);
27
+        metrics.add(m);
28
+        telemetry.put("metrics", metrics);
29
+        return telemetry;
30
+    }
31
+
32
+    @Override
33
+    public byte[] encodeCommand(Map<String, Object> command) {
34
+        // Modbus 写寄存器指令
35
+        return new byte[]{0x01, 0x06, 0x00, 0x00, 0x00, 0x01, 0x48, 0x0A};
36
+    }
37
+
38
+    @Override
39
+    public boolean authenticate(String deviceSn, String credential) {
40
+        return true;
41
+    }
42
+
43
+    private String bytesToHex(byte[] bytes) {
44
+        StringBuilder sb = new StringBuilder();
45
+        for (byte b : bytes) sb.append(String.format("%02X", b));
46
+        return sb.toString();
47
+    }
48
+}

+ 46
- 0
wm-iot/src/main/java/com/water/iot/adapter/MqttAdapter.java Zobrazit soubor

@@ -0,0 +1,46 @@
1
+package com.water.iot.adapter;
2
+
3
+import com.fasterxml.jackson.databind.ObjectMapper;
4
+import lombok.extern.slf4j.Slf4j;
5
+import org.springframework.stereotype.Component;
6
+
7
+import java.util.*;
8
+
9
+@Slf4j
10
+@Component
11
+public class MqttAdapter implements ProtocolAdapter {
12
+
13
+    private final ObjectMapper mapper = new ObjectMapper();
14
+
15
+    @Override
16
+    public String protocol() { return "MQTT"; }
17
+
18
+    @Override
19
+    public Map<String, Object> parseTelemetry(String deviceSn, byte[] raw) {
20
+        try {
21
+            @SuppressWarnings("unchecked")
22
+            Map<String, Object> data = mapper.readValue(raw, Map.class);
23
+            Map<String, Object> telemetry = new HashMap<>();
24
+            telemetry.put("deviceSn", deviceSn);
25
+            telemetry.put("timestamp", System.currentTimeMillis());
26
+            // 标准格式: {deviceSn, ts, metrics: [{key, value, unit}]}
27
+            telemetry.put("metrics", data.getOrDefault("metrics", data));
28
+            return telemetry;
29
+        } catch (Exception e) {
30
+            log.error("MQTT parse error: {}", e.getMessage());
31
+            return null;
32
+        }
33
+    }
34
+
35
+    @Override
36
+    public byte[] encodeCommand(Map<String, Object> command) {
37
+        try { return mapper.writeValueAsBytes(command); }
38
+        catch (Exception e) { return null; }
39
+    }
40
+
41
+    @Override
42
+    public boolean authenticate(String deviceSn, String credential) {
43
+        // TODO: 从设备表查询校验
44
+        return true;
45
+    }
46
+}

+ 22
- 0
wm-iot/src/main/java/com/water/iot/adapter/ProtocolAdapter.java Zobrazit soubor

@@ -0,0 +1,22 @@
1
+package com.water.iot.adapter;
2
+
3
+import java.util.Map;
4
+
5
+/**
6
+ * 设备协议适配器接口 — 策略模式
7
+ * 每种协议实现此接口,统一处理设备数据
8
+ */
9
+public interface ProtocolAdapter {
10
+
11
+    /** 支持的协议名 */
12
+    String protocol();
13
+
14
+    /** 将原始数据转为标准遥测格式 */
15
+    Map<String, Object> parseTelemetry(String deviceSn, byte[] raw);
16
+
17
+    /** 将指令转为协议特定的下发格式 */
18
+    byte[] encodeCommand(Map<String, Object> command);
19
+
20
+    /** 设备鉴权 */
21
+    boolean authenticate(String deviceSn, String credential);
22
+}

+ 49
- 0
wm-iot/src/main/java/com/water/iot/service/DeviceShadowService.java Zobrazit soubor

@@ -0,0 +1,49 @@
1
+package com.water.iot.service;
2
+
3
+import com.fasterxml.jackson.core.JsonProcessingException;
4
+import com.fasterxml.jackson.databind.ObjectMapper;
5
+import lombok.RequiredArgsConstructor;
6
+import lombok.extern.slf4j.Slf4j;
7
+import org.springframework.data.redis.core.StringRedisTemplate;
8
+import org.springframework.jdbc.core.JdbcTemplate;
9
+import org.springframework.stereotype.Service;
10
+
11
+import java.util.Map;
12
+import java.util.concurrent.TimeUnit;
13
+
14
+@Slf4j
15
+@Service
16
+@RequiredArgsConstructor
17
+public class DeviceShadowService {
18
+
19
+    private final StringRedisTemplate redisTemplate;
20
+    private final JdbcTemplate jdbcTemplate;
21
+    private final ObjectMapper mapper = new ObjectMapper();
22
+
23
+    private static final String SHADOW_PREFIX = "iot:shadow:";
24
+    private static final long SHADOW_TTL_HOURS = 24;
25
+
26
+    /** 更新设备上报状态 */
27
+    public void updateReported(String deviceSn, Map<String, Object> state) {
28
+        try {
29
+            String key = SHADOW_PREFIX + deviceSn;
30
+            String json = mapper.writeValueAsString(state);
31
+            redisTemplate.opsForHash().put(key, "reported", json);
32
+            redisTemplate.expire(key, SHADOW_TTL_HOURS, TimeUnit.HOURS);
33
+            // 同步更新数据库设备最后上报时间
34
+            jdbcTemplate.update("UPDATE iot_device SET last_report_time = NOW() WHERE device_sn = ?", deviceSn);
35
+        } catch (JsonProcessingException e) {
36
+            log.error("Shadow update error: {}", e.getMessage());
37
+        }
38
+    }
39
+
40
+    /** 获取设备影子 */
41
+    public Map<Object, Object> getShadow(String deviceSn) {
42
+        return redisTemplate.opsForHash().entries(SHADOW_PREFIX + deviceSn);
43
+    }
44
+
45
+    /** 更新期望状态(云端→设备) */
46
+    public void updateDesired(String deviceSn, String desiredJson) {
47
+        redisTemplate.opsForHash().put(SHADOW_PREFIX + deviceSn, "desired", desiredJson);
48
+    }
49
+}

+ 34
- 0
wm-iot/src/main/java/com/water/iot/service/OtaService.java Zobrazit soubor

@@ -0,0 +1,34 @@
1
+package com.water.iot.service;
2
+
3
+import lombok.RequiredArgsConstructor;
4
+import lombok.extern.slf4j.Slf4j;
5
+import org.springframework.jdbc.core.JdbcTemplate;
6
+import org.springframework.stereotype.Service;
7
+
8
+import java.util.Map;
9
+
10
+@Slf4j
11
+@Service
12
+@RequiredArgsConstructor
13
+public class OtaService {
14
+
15
+    private final JdbcTemplate jdbcTemplate;
16
+    private final DeviceShadowService shadowService;
17
+
18
+    /** 创建 OTA 升级任务 */
19
+    public void createUpgrade(Long modelId, String firmwareVersion, String firmwareUrl, String checkMd5) {
20
+        jdbcTemplate.update(
21
+            "INSERT INTO iot_device_event (device_id, device_sn, event_type, event_data) " +
22
+            "SELECT id, device_sn, 'ota', json_build_object('version',?, 'url',?, 'md5',?) " +
23
+            "FROM iot_device WHERE model_id = ? AND status = 'online'",
24
+            firmwareVersion, firmwareUrl, checkMd5, modelId);
25
+        log.info("OTA task created for model {}: version={}", modelId, firmwareVersion);
26
+    }
27
+
28
+    /** 设备查询是否有待升级固件 */
29
+    public Map<String, Object> checkUpgrade(String deviceSn, String currentVersion) {
30
+        return jdbcTemplate.queryForMap(
31
+            "SELECT * FROM iot_device_event WHERE device_sn = ? AND event_type = 'ota' ORDER BY created_at DESC LIMIT 1",
32
+            deviceSn);
33
+    }
34
+}