智慧水务管理系统 - 精河县供水工程综合管理平台

CoapAdapter.java 6.5KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. package com.water.iot.adapter.impl;
  import com.water.iot.adapter.AdapterInfo;
  import com.water.iot.adapter.AdapterStatus;
  import com.water.iot.adapter.DeviceAdapter;
  import com.water.iot.model.DeviceCommand;
  import com.water.iot.model.DeviceInfo;
  import org.eclipse.californium.core.CoapHandler;
  import org.eclipse.californium.core.CoapObserveRelation;
  import org.eclipse.californium.core.CoapResource;
  import org.eclipse.californium.core.CoapServer;
  import org.eclipse.californium.core.coap.CoAP;
  import org.eclipse.californium.core.coap.Response;
  import org.eclipse.californium.core.server.resources.CoapExchange;
  import org.eclipse.californium.elements.config.Configuration;
  import org.eclipse.californium.elements.util.NamedThreadFactory;
  import java.nio.charset.StandardCharsets;
  import java.util.HashMap;
  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;
  import java.util.concurrent.ExecutorService;
  import java.util.concurrent.Executors;
  20. /**
  21. * CoAP 协议适配器
  22. */
  23. public class CoapAdapter implements DeviceAdapter {
  24. private String host;
  25. private int port;
  26. private AdapterStatus status = AdapterStatus.DISCONNECTED;
  27. private long connectionTime;
  28. private CoapServer coapServer;
  29. private ExecutorService executor;
  30. private Map<String, DeviceInfo> connectedDevices = new HashMap<>();
  31. public CoapAdapter(String host, int port) {
  32. this.host = host;
  33. this.port = port;
  34. }
  35. @Override
  36. public String getProtocol() {
  37. return "coap";
  38. }
  39. @Override
  40. public void onMessage(byte[] payload) {
  41. System.out.println("CoAP 接收到设备数据: " + new String(payload));
  42. try {
  43. // 解析JSON格式数据
  44. String message = new String(payload);
  45. System.out.println("CoAP消息内容: " + message);
  46. // 这里应该解析JSON并提取设备信息
  47. // 模拟处理过程
  48. if (message.contains("deviceSn")) {
  49. processDeviceMessage(message);
  50. }
  51. } catch (Exception e) {
  52. System.err.println("处理CoAP消息时出错: " + e.getMessage());
  53. }
  54. }
  55. private void processDeviceMessage(String message) {
  56. System.out.println("处理CoAP设备消息: " + message);
  57. // 解析设备消息并更新设备状态
  58. // 在实际实现中,这里应该解析JSON并提取字段
  59. DeviceInfo device = new DeviceInfo("COAP_001", "CoAP传感器", "sensor");
  60. device.setLastSeen(System.currentTimeMillis());
  61. Map<String, Object> metrics = new HashMap<>();
  62. metrics.put("temperature", 25.5);
  63. metrics.put("humidity", 60.2);
  64. device.setMetrics(metrics);
  65. connectedDevices.put(device.getDeviceSn(), device);
  66. System.out.println("更新CoAP设备状态: " + device.getDeviceSn());
  67. }
  68. @Override
  69. public void sendCommand(String deviceSn, DeviceCommand cmd) {
  70. System.out.println("发送CoAP命令到设备 " + deviceSn + ": " + cmd.getCommandType());
  71. // 在实际实现中,这里应该使用CoAP客户端发送命令
  72. // 这里模拟发送过程
  73. String command = String.format("{\"command\":\"%s\",\"parameter\":\"%s\",\"value\":%s}",
  74. cmd.getCommandType(), cmd.getParameterKey(), cmd.getParameterValue());
  75. System.out.println("CoAP命令内容: " + command);
  76. }
  77. @Override
  78. public DeviceInfo parseDeviceInfo(byte[] payload) {
  79. System.out.println("解析CoAP设备信息: " + new String(payload));
  80. // 解析CoAP设备信息
  81. DeviceInfo deviceInfo = new DeviceInfo("COAP_SENSOR_001", "CoAP传感器设备", "water_quality");
  82. deviceInfo.setManufacturer("Sensirion");
  83. deviceInfo.setProtocolVersion("CoAP 1.1");
  84. // 解析设备属性
  85. Map<String, Object> properties = new HashMap<>();
  86. properties.put("endpoint", String.format("coap://%s:%d/%s", host, port, deviceInfo.getDeviceSn()));
  87. properties.put("observable", true);
  88. deviceInfo.setProperties(properties);
  89. return deviceInfo;
  90. }
  91. @Override
  92. public AdapterStatus getStatus(String deviceSn) {
  93. return status;
  94. }
  95. @Override
  96. public boolean connect() {
  97. try {
  98. // 创建CoAP服务器
  99. coapServer = new CoapServer();
  100. Configuration config = Configuration.getStandard();
  101. executor = Executors.newFixedThreadPool(
  102. config.getInt(CoapServer.COAP_SERVER_THREADS),
  103. new NamedThreadFactory("CoapServerThread"));
  104. // 添加资源
  105. coapServer.add(new DeviceResource());
  106. coapServer.start();
  107. status = AdapterStatus.CONNECTED;
  108. connectionTime = System.currentTimeMillis();
  109. System.out.println("CoAP 适配器启动成功: " + host + ":" + port);
  110. return true;
  111. } catch (Exception e) {
  112. status = AdapterStatus.ERROR;
  113. System.err.println("CoAP适配器启动失败: " + e.getMessage());
  114. return false;
  115. }
  116. }
  117. @Override
  118. public void disconnect() {
  119. if (coapServer != null) {
  120. coapServer.destroy();
  121. coapServer = null;
  122. }
  123. if (executor != null) {
  124. executor.shutdown();
  125. }
  126. status = AdapterStatus.DISCONNECTED;
  127. connectedDevices.clear();
  128. System.out.println("CoAP 适配器已关闭");
  129. }
  130. @Override
  131. public AdapterInfo getAdapterInfo() {
  132. return new AdapterInfo("CoAP适配器", "coap", "1.0", "支持CoAP协议的设备适配");
  133. }
  134. /**
  135. * 设备资源类
  136. */
  137. private class DeviceResource extends CoapResource {
  138. public DeviceResource() {
  139. super("devices");
  140. setObservable(true);
  141. getAttributes().setObservable();
  142. }
  143. @Override
  144. public void handleGET(CoapExchange exchange) {
  145. Response response = new Response(CoAP.ResponseCode.CONTENT);
  146. response.setPayload("{\"status\":\"ok\",\"message\":\"设备列表\"}");
  147. exchange.respond(response);
  148. }
  149. @Override
  150. public void handlePOST(CoapExchange exchange) {
  151. // 处理设备上报的数据
  152. byte[] payload = exchange.getRequestPayload();
  153. onMessage(payload);
  154. Response response = new Response(CoAP.ResponseCode.CHANGED);
  155. response.setPayload("{\"status\":\"received\"}");
  156. exchange.respond(response);
  157. }
  158. }
  159. }