| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148 |
- """
- IoT模块测试脚本
- 用于验证MQTT适配器、设备管理器和API功能
- """
-
- import asyncio
- import json
- import sys
- import os
-
- # 添加项目根目录到Python路径
- sys.path.append(os.path.dirname(os.path.abspath(__file__)))
-
- from src.iot.device_manager import DeviceManager
- from src.iot.mqtt_adapter import MqttAdapter
- from src.iot.device_controller import DeviceController
- from src.iot.models import DeviceType, DeviceStatus
-
-
- async def test_device_manager():
- """测试设备管理器"""
- print("=== 测试设备管理器 ===")
-
- device_manager = DeviceManager()
-
- # 注册设备
- device_data = {
- 'device_sn': 'LL-001',
- 'device_type': 'flow_meter',
- 'name': '流量计-001',
- 'description': 'A区入口流量计',
- 'area': 'A区',
- 'position': '入口处',
- 'manufacturer': '华为',
- 'model': 'LL-100'
- }
-
- device = device_manager.register_device(device_data)
- print(f"注册设备: {device.device_sn} - {device.name}")
-
- # 获取设备
- retrieved_device = device_manager.get_device('LL-001')
- print(f"获取设备: {retrieved_device.name}")
-
- # 更新设备
- updated_device = device_manager.update_device('LL-001', {'status': DeviceStatus.ONLINE})
- print(f"更新设备状态: {updated_device.status}")
-
- # 列出设备
- devices = device_manager.list_devices()
- print(f"设备列表: {len(devices)}个设备")
-
- # 更新设备影子
- device_manager.update_device_shadow('LL-001', {'temperature': 25.5, 'pressure': 0.8})
- shadow = device_manager.get_device_shadow('LL-001')
- print(f"设备影子: {shadow.state}")
-
- # 获取统计信息
- stats = device_manager.get_device_statistics()
- print(f"设备统计: {stats}")
-
- print("设备管理器测试完成\n")
-
-
- async def test_mqtt_adapter():
- """测试MQTT适配器"""
- print("=== 测试MQTT适配器 ===")
-
- # 创建MQTT适配器(不实际连接)
- mqtt_adapter = MqttAdapter(
- broker_host="localhost",
- broker_port=1883,
- client_id="test-client"
- )
-
- # 测试消息发布
- test_payload = {"message": "Hello IoT", "timestamp": "2024-01-01T00:00:00"}
- success = mqtt_adapter.publish("test/topic", test_payload)
- print(f"消息发布测试: {'成功' if success else '失败'}")
-
- # 测试连接状态
- status = mqtt_adapter.get_connection_status()
- print(f"MQTT状态: {status}")
-
- print("MQTT适配器测试完成\n")
-
-
- async def test_device_controller():
- """测试设备控制器"""
- print("=== 测试设备控制器 ===")
-
- # 创建组件
- device_manager = DeviceManager()
- mqtt_adapter = MqttAdapter()
- device_controller = DeviceController(device_manager, mqtt_adapter)
-
- # 注册一些测试设备
- test_devices = [
- {
- 'device_sn': 'LL-001',
- 'device_type': 'flow_meter',
- 'name': '流量计-001',
- 'area': 'A区',
- 'manufacturer': '华为'
- },
- {
- 'device_sn': 'YL-001',
- 'device_type': 'pressure_meter',
- 'name': '压力表-001',
- 'area': 'B区',
- 'manufacturer': '西门子'
- }
- ]
-
- for device_data in test_devices:
- device_manager.register_device(device_data)
-
- # 模拟API请求
- print("测试设备注册:")
- print(f"已注册设备数量: {len(device_manager.devices)}")
-
- # 模拟设备发现
- discovered = device_manager.discover_devices()
- print(f"发现设备数量: {len(discovered)}")
-
- print("设备控制器测试完成\n")
-
-
- async def main():
- """主测试函数"""
- print("开始 IoT 模块测试...\n")
-
- try:
- # 测试各个组件
- await test_device_manager()
- await test_mqtt_adapter()
- await test_device_controller()
-
- print("✅ 所有测试完成!")
-
- except Exception as e:
- print(f"❌ 测试失败: {e}")
- import traceback
- traceback.print_exc()
-
-
- if __name__ == "__main__":
- asyncio.run(main())
|