智慧水务管理系统 - 精河县供水工程综合管理平台

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. """
  2. IoT模块简化测试脚本
  3. 仅测试设备管理器功能,不依赖MQTT
  4. """
  5. import asyncio
  6. import json
  7. import sys
  8. import os
  9. # 添加项目根目录到Python路径
  10. sys.path.append(os.path.dirname(os.path.abspath(__file__)))
  11. from src.iot.device_manager import DeviceManager
  12. from src.iot.models import DeviceType, DeviceStatus
  13. async def test_device_manager():
  14. """测试设备管理器"""
  15. print("=== 测试设备管理器 ===")
  16. device_manager = DeviceManager()
  17. # 注册设备
  18. device_data = {
  19. 'device_sn': 'LL-001',
  20. 'device_type': 'flow_meter',
  21. 'name': '流量计-001',
  22. 'description': 'A区入口流量计',
  23. 'area': 'A区',
  24. 'position': '入口处',
  25. 'manufacturer': '华为',
  26. 'model': 'LL-100'
  27. }
  28. device = device_manager.register_device(device_data)
  29. print(f"注册设备: {device.device_sn} - {device.name}")
  30. # 获取设备
  31. retrieved_device = device_manager.get_device('LL-001')
  32. print(f"获取设备: {retrieved_device.name}")
  33. # 更新设备
  34. updated_device = device_manager.update_device('LL-001', {'status': DeviceStatus.ONLINE})
  35. print(f"更新设备状态: {updated_device.status}")
  36. # 列出设备
  37. devices = device_manager.list_devices()
  38. print(f"设备列表: {len(devices)}个设备")
  39. # 更新设备影子
  40. device_manager.update_device_shadow('LL-001', {'temperature': 25.5, 'pressure': 0.8})
  41. shadow = device_manager.get_device_shadow('LL-001')
  42. print(f"设备影子: {shadow.state}")
  43. # 获取统计信息
  44. stats = device_manager.get_device_statistics()
  45. print(f"设备统计: {stats}")
  46. # 测试设备发现
  47. discovered = device_manager.discover_devices()
  48. print(f"发现设备: {len(discovered)}个设备")
  49. print("设备管理器测试完成\n")
  50. async def test_device_filtering():
  51. """测试设备过滤功能"""
  52. print("=== 测试设备过滤 ===")
  53. device_manager = DeviceManager()
  54. # 注册多个设备
  55. test_devices = [
  56. {'device_sn': 'LL-001', 'device_type': 'flow_meter', 'name': '流量计-001', 'area': 'A区'},
  57. {'device_sn': 'YL-001', 'device_type': 'pressure_meter', 'name': '压力表-001', 'area': 'A区'},
  58. {'device_sn': 'SW-001', 'device_type': 'level_meter', 'name': '水位计-001', 'area': 'B区'},
  59. {'device_sn': 'LL-002', 'device_type': 'flow_meter', 'name': '流量计-002', 'area': 'B区'},
  60. ]
  61. for device_data in test_devices:
  62. device_manager.register_device(device_data)
  63. # 测试按类型过滤
  64. flow_meters = device_manager.list_devices(device_type=DeviceType.FLOW_METER)
  65. print(f"流量计数量: {len(flow_meters)}")
  66. # 测试按区域过滤
  67. a_zone_devices = device_manager.list_devices(area='A区')
  68. print(f"A区设备数量: {len(a_zone_devices)}")
  69. # 测试按状态过滤
  70. online_devices = device_manager.list_devices(status=DeviceStatus.ONLINE)
  71. print(f"在线设备数量: {len(online_devices)}")
  72. print("设备过滤测试完成\n")
  73. async def test_device_shadow():
  74. """测试设备影子功能"""
  75. print("=== 测试设备影子 ===")
  76. device_manager = DeviceManager()
  77. # 注册设备
  78. device_data = {
  79. 'device_sn': 'LL-001',
  80. 'device_type': 'flow_meter',
  81. 'name': '流量计-001'
  82. }
  83. device_manager.register_device(device_data)
  84. # 更新设备影子
  85. shadow_data = {
  86. 'temperature': 25.5,
  87. 'pressure': 0.8,
  88. 'flow_rate': 100.5
  89. }
  90. success = device_manager.update_device_shadow('LL-001', shadow_data)
  91. print(f"影子更新成功: {success}")
  92. # 获取设备影子
  93. shadow = device_manager.get_device_shadow('LL-001')
  94. print(f"设备影子状态: {shadow.state}")
  95. print("设备影子测试完成\n")
  96. async def main():
  97. """主测试函数"""
  98. print("开始 IoT 模块简化测试...\n")
  99. try:
  100. # 测试各个组件
  101. await test_device_manager()
  102. await test_device_filtering()
  103. await test_device_shadow()
  104. print("✅ 所有测试完成!")
  105. except Exception as e:
  106. print(f"❌ 测试失败: {e}")
  107. import traceback
  108. traceback.print_exc()
  109. if __name__ == "__main__":
  110. asyncio.run(main())