| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185 |
- #!/usr/bin/env python3
- """
- 应急推演功能测试脚本
- """
-
- import json
- import requests
- import sys
- from datetime import datetime
-
- # 配置
- BASE_URL = "http://localhost:8080"
- API_TOKEN = "test-token" # 实际使用时替换为真实的token
-
- def test_pipe_burst_simulation():
- """测试爆管模拟功能"""
- print("=== 测试爆管模拟功能 ===")
-
- url = f"{BASE_URL}/api/emergency/dispatch/quick-pipe-burst"
-
- payload = {
- "lng": 116.4074,
- "lat": 39.9042,
- "pipeDiameter": "DN100",
- "operatorName": "test_operator"
- }
-
- headers = {
- "Content-Type": "application/json",
- "Authorization": f"Bearer {API_TOKEN}"
- }
-
- try:
- response = requests.post(url, json=payload, headers=headers, timeout=30)
- if response.status_code == 200:
- result = response.json()
- if result.get("success"):
- print("✅ 爆管模拟测试成功")
- print(f"模拟编号: {result.get('simulation', {}).get('simulationNo')}")
- print(f"影响区域: {result.get('simulation', {}).get('affectedArea')}")
- print(f"受影响用户数: {result.get('simulation', {}).get('affectedCustomers')}")
- return True
- else:
- print(f"❌ 爆管模拟测试失败: {result.get('message')}")
- return False
- else:
- print(f"❌ HTTP错误: {response.status_code}")
- return False
- except Exception as e:
- print(f"❌ 爆管模拟测试异常: {e}")
- return False
-
- def test_water_quality_simulation():
- """测试水质异常模拟功能"""
- print("\n=== 测试水质异常模拟功能 ===")
-
- url = f"{BASE_URL}/api/emergency/dispatch/quick-water-quality"
-
- payload = {
- "area": "市中心区域",
- "pollutant": "重金属",
- "lng": 116.4074,
- "lat": 39.9042,
- "operatorName": "test_operator"
- }
-
- headers = {
- "Content-Type": "application/json",
- "Authorization": f"Bearer {API_TOKEN}"
- }
-
- try:
- response = requests.post(url, json=payload, headers=headers, timeout=30)
- if response.status_code == 200:
- result = response.json()
- if result.get("success"):
- print("✅ 水质异常模拟测试成功")
- print(f"模拟编号: {result.get('simulation', {}).get('simulationNo')}")
- print(f"风险等级: {result.get('simulation', {}).get('riskLevel')}")
- print(f"备用水源: {result.get('simulation', {}).get('backupWaterSource')}")
- return True
- else:
- print(f"❌ 水质异常模拟测试失败: {result.get('message')}")
- return False
- else:
- print(f"❌ HTTP错误: {response.status_code}")
- return False
- except Exception as e:
- print(f"❌ 水质异常模拟测试异常: {e}")
- return False
-
- def test_emergency_plan_list():
- """测试应急预案列表"""
- print("\n=== 测试应急预案列表 ===")
-
- url = f"{BASE_URL}/api/emergency/plan/list"
-
- headers = {
- "Authorization": f"Bearer {API_TOKEN}"
- }
-
- try:
- response = requests.get(url, headers=headers, timeout=30)
- if response.status_code == 200:
- result = response.json()
- if result.get("success"):
- plans = result.get("data", [])
- print(f"✅ 应急预案列表查询成功,共 {len(plans)} 个预案")
- for plan in plans[:3]: # 只显示前3个
- print(f" - {plan.get('planName')} ({plan.get('planNo')})")
- return True
- else:
- print(f"❌ 应急预案列表查询失败: {result.get('message')}")
- return False
- else:
- print(f"❌ HTTP错误: {response.status_code}")
- return False
- except Exception as e:
- print(f"❌ 应急预案列表查询异常: {e}")
- return False
-
- def test_emergency_status():
- """测试应急状态查询"""
- print("\n=== 测试应急状态查询 ===")
-
- url = f"{BASE_URL}/api/emergency/dispatch/status"
-
- headers = {
- "Authorization": f"Bearer {API_TOKEN}"
- }
-
- try:
- response = requests.get(url, headers=headers, timeout=30)
- if response.status_code == 200:
- result = response.json()
- if result.get("success"):
- status = result.get("status", {})
- print("✅ 应急状态查询成功")
- print(f"警报级别: {status.get('alertLevel')}")
- print(f"准备度评分: {status.get('preparednessScore')}")
- print(f"活跃预案数: {len(status.get('activePlans', []))}")
- return True
- else:
- print(f"❌ 应急状态查询失败: {result.get('message')}")
- return False
- else:
- print(f"❌ HTTP错误: {response.status_code}")
- return False
- except Exception as e:
- print(f"❌ 应急状态查询异常: {e}")
- return False
-
- def main():
- """主测试函数"""
- print(f"开始测试应急推演功能 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
- print("=" * 60)
-
- # 执行测试
- tests = [
- test_pipe_burst_simulation,
- test_water_quality_simulation,
- test_emergency_plan_list,
- test_emergency_status
- ]
-
- passed = 0
- total = len(tests)
-
- for test in tests:
- if test():
- passed += 1
-
- # 输出测试结果
- print("\n" + "=" * 60)
- print(f"测试完成: {passed}/{total} 通过")
-
- if passed == total:
- print("🎉 所有测试通过!应急推演功能正常工作。")
- sys.exit(0)
- else:
- print("⚠️ 部分测试失败,请检查功能实现。")
- sys.exit(1)
-
- if __name__ == "__main__":
- main()
|