| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149 |
- """
- IoT模块简化测试脚本
- 仅测试设备管理器功能,不依赖MQTT
- """
-
- import asyncio
- import json
- import sys
- import os
-
- # 添加项目根目录到Python路径
- sys.path.append(os.path.dirname(os.path.abspath(__file__)))
-
- from src.iot.device_manager import DeviceManager
- from src.iot.models import DeviceType, DeviceStatus
-
-
- async def test_device_manager():
- """测试设备管理器"""
- print("=== 测试设备管理器 ===")
-
- device_manager = DeviceManager()
-
- # 注册设备
- device_data = {
- 'device_sn': 'LL-001',
- 'device_type': 'flow_meter',
- 'name': '流量计-001',
- 'description': 'A区入口流量计',
- 'area': 'A区',
- 'position': '入口处',
- 'manufacturer': '华为',
- 'model': 'LL-100'
- }
-
- device = device_manager.register_device(device_data)
- print(f"注册设备: {device.device_sn} - {device.name}")
-
- # 获取设备
- retrieved_device = device_manager.get_device('LL-001')
- print(f"获取设备: {retrieved_device.name}")
-
- # 更新设备
- updated_device = device_manager.update_device('LL-001', {'status': DeviceStatus.ONLINE})
- print(f"更新设备状态: {updated_device.status}")
-
- # 列出设备
- devices = device_manager.list_devices()
- print(f"设备列表: {len(devices)}个设备")
-
- # 更新设备影子
- device_manager.update_device_shadow('LL-001', {'temperature': 25.5, 'pressure': 0.8})
- shadow = device_manager.get_device_shadow('LL-001')
- print(f"设备影子: {shadow.state}")
-
- # 获取统计信息
- stats = device_manager.get_device_statistics()
- print(f"设备统计: {stats}")
-
- # 测试设备发现
- discovered = device_manager.discover_devices()
- print(f"发现设备: {len(discovered)}个设备")
-
- print("设备管理器测试完成\n")
-
-
- async def test_device_filtering():
- """测试设备过滤功能"""
- print("=== 测试设备过滤 ===")
-
- device_manager = DeviceManager()
-
- # 注册多个设备
- test_devices = [
- {'device_sn': 'LL-001', 'device_type': 'flow_meter', 'name': '流量计-001', 'area': 'A区'},
- {'device_sn': 'YL-001', 'device_type': 'pressure_meter', 'name': '压力表-001', 'area': 'A区'},
- {'device_sn': 'SW-001', 'device_type': 'level_meter', 'name': '水位计-001', 'area': 'B区'},
- {'device_sn': 'LL-002', 'device_type': 'flow_meter', 'name': '流量计-002', 'area': 'B区'},
- ]
-
- for device_data in test_devices:
- device_manager.register_device(device_data)
-
- # 测试按类型过滤
- flow_meters = device_manager.list_devices(device_type=DeviceType.FLOW_METER)
- print(f"流量计数量: {len(flow_meters)}")
-
- # 测试按区域过滤
- a_zone_devices = device_manager.list_devices(area='A区')
- print(f"A区设备数量: {len(a_zone_devices)}")
-
- # 测试按状态过滤
- online_devices = device_manager.list_devices(status=DeviceStatus.ONLINE)
- print(f"在线设备数量: {len(online_devices)}")
-
- print("设备过滤测试完成\n")
-
-
- async def test_device_shadow():
- """测试设备影子功能"""
- print("=== 测试设备影子 ===")
-
- device_manager = DeviceManager()
-
- # 注册设备
- device_data = {
- 'device_sn': 'LL-001',
- 'device_type': 'flow_meter',
- 'name': '流量计-001'
- }
- device_manager.register_device(device_data)
-
- # 更新设备影子
- shadow_data = {
- 'temperature': 25.5,
- 'pressure': 0.8,
- 'flow_rate': 100.5
- }
-
- success = device_manager.update_device_shadow('LL-001', shadow_data)
- print(f"影子更新成功: {success}")
-
- # 获取设备影子
- shadow = device_manager.get_device_shadow('LL-001')
- print(f"设备影子状态: {shadow.state}")
-
- print("设备影子测试完成\n")
-
-
- async def main():
- """主测试函数"""
- print("开始 IoT 模块简化测试...\n")
-
- try:
- # 测试各个组件
- await test_device_manager()
- await test_device_filtering()
- await test_device_shadow()
-
- print("✅ 所有测试完成!")
-
- except Exception as e:
- print(f"❌ 测试失败: {e}")
- import traceback
- traceback.print_exc()
-
-
- if __name__ == "__main__":
- asyncio.run(main())
|