package com.water.iot.adapter.impl;

import com.water.iot.adapter.AdapterInfo;
import com.water.iot.adapter.AdapterStatus;
import com.water.iot.adapter.DeviceAdapter;
import com.water.iot.model.DeviceCommand;
import com.water.iot.model.DeviceInfo;
import org.eclipse.californium.core.CoapHandler;
import org.eclipse.californium.core.CoapObserveRelation;
import org.eclipse.californium.core.CoapResource;
import org.eclipse.californium.core.CoapServer;
import org.eclipse.californium.core.coap.CoAP;
import org.eclipse.californium.core.coap.Response;
import org.eclipse.californium.core.server.resources.CoapExchange;
import org.eclipse.californium.elements.config.Configuration;
import org.eclipse.californium.elements.util.NamedThreadFactory;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * CoAP protocol adapter.
 *
 * <p>Starts an embedded Californium {@link CoapServer} exposing a single {@code devices}
 * resource. Payloads POSTed to that resource are forwarded to {@link #onMessage(byte[])}.
 * Message parsing and command sending are still placeholders: incoming messages register a
 * fixed sample device and outgoing commands are only printed.
 */
public class CoapAdapter implements DeviceAdapter {

    /** Host name used in log output and in the endpoint URL built by {@link #parseDeviceInfo}. */
    private final String host;

    /** UDP port the embedded CoAP server binds to. */
    private final int port;

    /** Adapter-wide status; updated by {@link #connect()} and {@link #disconnect()}. */
    private AdapterStatus status = AdapterStatus.DISCONNECTED;

    /** Wall-clock time (ms since epoch) of the last successful {@link #connect()}. */
    private long connectionTime;

    private CoapServer coapServer;
    private ExecutorService executor;

    /**
     * Devices seen so far, keyed by device serial number. Written from the CoAP request
     * handler ({@code handlePOST} -> {@code onMessage}) and cleared by {@link #disconnect()},
     * which may run on a different thread, hence a concurrent map.
     */
    private final Map<String, DeviceInfo> connectedDevices = new ConcurrentHashMap<>();

    /**
     * Creates an adapter that is not yet listening; call {@link #connect()} to start it.
     *
     * @param host host name used for logging and endpoint URLs
     * @param port UDP port the CoAP server should listen on
     */
    public CoapAdapter(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /** @return the protocol identifier of this adapter, always {@code "coap"} */
    @Override
    public String getProtocol() {
        return "coap";
    }

    /**
     * Handles a raw payload reported by a device.
     *
     * <p>The payload is decoded as UTF-8 text, logged, and handed to the device-message
     * handler when it mentions {@code deviceSn}. A {@code null} payload is logged and ignored.
     * Failures while processing are logged and never propagate to the caller.
     *
     * @param payload raw message bytes; may be {@code null}
     */
    @Override
    public void onMessage(byte[] payload) {
        if (payload == null) {
            // Previously this threw a NullPointerException before the try block was entered.
            System.err.println("处理CoAP消息时出错: payload is null");
            return;
        }

        // Decode once, with an explicit charset instead of the platform default.
        String message = new String(payload, StandardCharsets.UTF_8);
        System.out.println("CoAP 接收到设备数据: " + message);
        try {
            System.out.println("CoAP消息内容: " + message);
            // Placeholder for real JSON parsing: only a substring check is performed.
            if (message.contains("deviceSn")) {
                processDeviceMessage(message);
            }
        } catch (Exception e) {
            System.err.println("处理CoAP消息时出错: " + e.getMessage());
        }
    }

    /**
     * Registers or refreshes a device based on an incoming message.
     *
     * <p>The message content is not parsed yet: a fixed sample device ({@code COAP_001}) with
     * hard-coded metrics is stored in {@link #connectedDevices}.
     *
     * @param message decoded message text (currently only logged)
     */
    private void processDeviceMessage(String message) {
        System.out.println("处理CoAP设备消息: " + message);

        DeviceInfo device = new DeviceInfo("COAP_001", "CoAP传感器", "sensor");
        device.setLastSeen(System.currentTimeMillis());

        // NOTE(review): kept as a raw Map because the parameter type of
        // DeviceInfo.setMetrics is not visible from this file - add type arguments
        // once it is confirmed.
        Map metrics = new HashMap<>();
        metrics.put("temperature", 25.5);
        metrics.put("humidity", 60.2);
        device.setMetrics(metrics);

        connectedDevices.put(device.getDeviceSn(), device);
        System.out.println("更新CoAP设备状态: " + device.getDeviceSn());
    }

    /**
     * Builds the JSON command for a device and logs it.
     *
     * <p>No CoAP request is sent yet; the command is only printed. The command type and
     * parameter key are JSON-escaped so that quotes or backslashes cannot break the document.
     *
     * @param deviceSn serial number of the target device
     * @param cmd command to send; must not be {@code null}
     */
    @Override
    public void sendCommand(String deviceSn, DeviceCommand cmd) {
        System.out.println("发送CoAP命令到设备 " + deviceSn + ": " + cmd.getCommandType());

        // NOTE(review): the value is emitted unquoted, as before, so it is only valid JSON
        // when it renders as a number/boolean/null - confirm the type of getParameterValue().
        String command = String.format("{\"command\":\"%s\",\"parameter\":\"%s\",\"value\":%s}",
                escapeJson(cmd.getCommandType()),
                escapeJson(cmd.getParameterKey()),
                cmd.getParameterValue());
        System.out.println("CoAP命令内容: " + command);
    }

    /**
     * Builds a {@link DeviceInfo} for a payload.
     *
     * <p>The payload is only logged (decoded as UTF-8; {@code null} is treated as empty);
     * the returned descriptor is a fixed sample device whose {@code endpoint} property is
     * derived from this adapter's host and port.
     *
     * @param payload raw device-info bytes; may be {@code null}
     * @return a newly created device descriptor
     */
    @Override
    public DeviceInfo parseDeviceInfo(byte[] payload) {
        String text = payload == null ? "" : new String(payload, StandardCharsets.UTF_8);
        System.out.println("解析CoAP设备信息: " + text);

        DeviceInfo deviceInfo = new DeviceInfo("COAP_SENSOR_001", "CoAP传感器设备", "water_quality");
        deviceInfo.setManufacturer("Sensirion");
        deviceInfo.setProtocolVersion("CoAP 1.1");

        // NOTE(review): raw Map for the same reason as the metrics map in
        // processDeviceMessage - the setter signature is not visible here.
        Map properties = new HashMap<>();
        properties.put("endpoint",
                String.format("coap://%s:%d/%s", host, port, deviceInfo.getDeviceSn()));
        properties.put("observable", true);
        deviceInfo.setProperties(properties);

        return deviceInfo;
    }

    /**
     * Returns the adapter-wide status.
     *
     * @param deviceSn device serial number; currently ignored, no per-device status is kept
     * @return the current status of this adapter
     */
    @Override
    public AdapterStatus getStatus(String deviceSn) {
        return status;
    }

    /**
     * Starts the embedded CoAP server on the configured port.
     *
     * <p>Calling this while a server is already running is a no-op that returns {@code true}.
     * If startup fails, everything created during the attempt is released again and the
     * status becomes {@link AdapterStatus#ERROR}.
     *
     * @return {@code true} if the server is running after the call, {@code false} on failure
     */
    @Override
    public boolean connect() {
        if (coapServer != null) {
            // Already started; starting a second server would leak the first one.
            return true;
        }

        CoapServer server = null;
        ExecutorService pool = null;
        try {
            // Bind to the configured port; the no-arg constructor would silently use the
            // library's default CoAP port instead.
            server = new CoapServer(port);

            Configuration config = Configuration.getStandard();
            // NOTE(review): this pool is created and shut down by the adapter but is never
            // handed to the CoapServer in this class - confirm whether it should be attached
            // or removed. Also confirm that CoapServer.COAP_SERVER_THREADS and
            // Configuration.getInt exist in the Californium version on the classpath.
            pool = Executors.newFixedThreadPool(
                    config.getInt(CoapServer.COAP_SERVER_THREADS),
                    new NamedThreadFactory("CoapServerThread"));

            server.add(new DeviceResource());
            server.start();

            // Publish to the fields only once startup has fully succeeded.
            coapServer = server;
            executor = pool;
            status = AdapterStatus.CONNECTED;
            connectionTime = System.currentTimeMillis();

            System.out.println("CoAP 适配器启动成功: " + host + ":" + port);
            return true;
        } catch (Exception e) {
            // Release whatever was created before the failure.
            if (pool != null) {
                pool.shutdown();
            }
            if (server != null) {
                server.destroy();
            }
            status = AdapterStatus.ERROR;
            System.err.println("CoAP适配器启动失败: " + e.getMessage());
            return false;
        }
    }

    /**
     * Stops the CoAP server, shuts down the thread pool and forgets all known devices.
     * Safe to call when the adapter was never connected.
     */
    @Override
    public void disconnect() {
        if (coapServer != null) {
            coapServer.destroy();
            coapServer = null;
        }

        if (executor != null) {
            executor.shutdown();
            executor = null;
        }

        status = AdapterStatus.DISCONNECTED;
        connectedDevices.clear();

        System.out.println("CoAP 适配器已关闭");
    }

    /** @return static descriptive metadata about this adapter */
    @Override
    public AdapterInfo getAdapterInfo() {
        return new AdapterInfo("CoAP适配器", "coap", "1.0", "支持CoAP协议的设备适配");
    }

    /**
     * Escapes a value for use inside a double-quoted JSON string.
     *
     * @param value value to render; {@code null} renders as the text {@code null}
     * @return the value's string form with quotes, backslashes and control characters escaped
     */
    private static String escapeJson(Object value) {
        String text = String.valueOf(value);
        StringBuilder out = new StringBuilder(text.length() + 8);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            switch (c) {
                case '"':
                    out.append("\\\"");
                    break;
                case '\\':
                    out.append("\\\\");
                    break;
                case '\n':
                    out.append("\\n");
                    break;
                case '\r':
                    out.append("\\r");
                    break;
                case '\t':
                    out.append("\\t");
                    break;
                default:
                    if (c < 0x20) {
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    /**
     * The observable {@code devices} CoAP resource. Kept as an inner (non-static) class
     * because POST handling delegates to the enclosing adapter's {@code onMessage}.
     */
    private class DeviceResource extends CoapResource {

        public DeviceResource() {
            super("devices");
            setObservable(true);
            getAttributes().setObservable();
        }

        /** Answers GET with a fixed JSON status document. */
        @Override
        public void handleGET(CoapExchange exchange) {
            Response response = new Response(CoAP.ResponseCode.CONTENT);
            response.setPayload("{\"status\":\"ok\",\"message\":\"设备列表\"}");
            exchange.respond(response);
        }

        /** Forwards the POSTed payload to the adapter and acknowledges with 2.04 Changed. */
        @Override
        public void handlePOST(CoapExchange exchange) {
            byte[] payload = exchange.getRequestPayload();
            onMessage(payload);

            Response response = new Response(CoAP.ResponseCode.CHANGED);
            response.setPayload("{\"status\":\"received\"}");
            exchange.respond(response);
        }
    }
}