""" IoT模块简化测试脚本 仅测试设备管理器功能,不依赖MQTT """ import asyncio import json import sys import os # 添加项目根目录到Python路径 sys.path.append(os.path.dirname(os.path.abspath(__file__))) from src.iot.device_manager import DeviceManager from src.iot.models import DeviceType, DeviceStatus async def test_device_manager(): """测试设备管理器""" print("=== 测试设备管理器 ===") device_manager = DeviceManager() # 注册设备 device_data = { 'device_sn': 'LL-001', 'device_type': 'flow_meter', 'name': '流量计-001', 'description': 'A区入口流量计', 'area': 'A区', 'position': '入口处', 'manufacturer': '华为', 'model': 'LL-100' } device = device_manager.register_device(device_data) print(f"注册设备: {device.device_sn} - {device.name}") # 获取设备 retrieved_device = device_manager.get_device('LL-001') print(f"获取设备: {retrieved_device.name}") # 更新设备 updated_device = device_manager.update_device('LL-001', {'status': DeviceStatus.ONLINE}) print(f"更新设备状态: {updated_device.status}") # 列出设备 devices = device_manager.list_devices() print(f"设备列表: {len(devices)}个设备") # 更新设备影子 device_manager.update_device_shadow('LL-001', {'temperature': 25.5, 'pressure': 0.8}) shadow = device_manager.get_device_shadow('LL-001') print(f"设备影子: {shadow.state}") # 获取统计信息 stats = device_manager.get_device_statistics() print(f"设备统计: {stats}") # 测试设备发现 discovered = device_manager.discover_devices() print(f"发现设备: {len(discovered)}个设备") print("设备管理器测试完成\n") async def test_device_filtering(): """测试设备过滤功能""" print("=== 测试设备过滤 ===") device_manager = DeviceManager() # 注册多个设备 test_devices = [ {'device_sn': 'LL-001', 'device_type': 'flow_meter', 'name': '流量计-001', 'area': 'A区'}, {'device_sn': 'YL-001', 'device_type': 'pressure_meter', 'name': '压力表-001', 'area': 'A区'}, {'device_sn': 'SW-001', 'device_type': 'level_meter', 'name': '水位计-001', 'area': 'B区'}, {'device_sn': 'LL-002', 'device_type': 'flow_meter', 'name': '流量计-002', 'area': 'B区'}, ] for device_data in test_devices: device_manager.register_device(device_data) # 测试按类型过滤 flow_meters = device_manager.list_devices(device_type=DeviceType.FLOW_METER) print(f"流量计数量: {len(flow_meters)}") # 测试按区域过滤 a_zone_devices = device_manager.list_devices(area='A区') print(f"A区设备数量: {len(a_zone_devices)}") # 测试按状态过滤 online_devices = device_manager.list_devices(status=DeviceStatus.ONLINE) print(f"在线设备数量: {len(online_devices)}") print("设备过滤测试完成\n") async def test_device_shadow(): """测试设备影子功能""" print("=== 测试设备影子 ===") device_manager = DeviceManager() # 注册设备 device_data = { 'device_sn': 'LL-001', 'device_type': 'flow_meter', 'name': '流量计-001' } device_manager.register_device(device_data) # 更新设备影子 shadow_data = { 'temperature': 25.5, 'pressure': 0.8, 'flow_rate': 100.5 } success = device_manager.update_device_shadow('LL-001', shadow_data) print(f"影子更新成功: {success}") # 获取设备影子 shadow = device_manager.get_device_shadow('LL-001') print(f"设备影子状态: {shadow.state}") print("设备影子测试完成\n") async def main(): """主测试函数""" print("开始 IoT 模块简化测试...\n") try: # 测试各个组件 await test_device_manager() await test_device_filtering() await test_device_shadow() print("✅ 所有测试完成!") except Exception as e: print(f"❌ 测试失败: {e}") import traceback traceback.print_exc() if __name__ == "__main__": asyncio.run(main())