智慧水务管理系统 - 精河县供水工程综合管理平台

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. #!/usr/bin/env python3
  2. """
  3. 应急推演功能测试脚本
  4. """
  5. import json
  6. import requests
  7. import sys
  8. from datetime import datetime
  9. # 配置
  10. BASE_URL = "http://localhost:8080"
  11. API_TOKEN = "test-token" # 实际使用时替换为真实的token
  12. def test_pipe_burst_simulation():
  13. """测试爆管模拟功能"""
  14. print("=== 测试爆管模拟功能 ===")
  15. url = f"{BASE_URL}/api/emergency/dispatch/quick-pipe-burst"
  16. payload = {
  17. "lng": 116.4074,
  18. "lat": 39.9042,
  19. "pipeDiameter": "DN100",
  20. "operatorName": "test_operator"
  21. }
  22. headers = {
  23. "Content-Type": "application/json",
  24. "Authorization": f"Bearer {API_TOKEN}"
  25. }
  26. try:
  27. response = requests.post(url, json=payload, headers=headers, timeout=30)
  28. if response.status_code == 200:
  29. result = response.json()
  30. if result.get("success"):
  31. print("✅ 爆管模拟测试成功")
  32. print(f"模拟编号: {result.get('simulation', {}).get('simulationNo')}")
  33. print(f"影响区域: {result.get('simulation', {}).get('affectedArea')}")
  34. print(f"受影响用户数: {result.get('simulation', {}).get('affectedCustomers')}")
  35. return True
  36. else:
  37. print(f"❌ 爆管模拟测试失败: {result.get('message')}")
  38. return False
  39. else:
  40. print(f"❌ HTTP错误: {response.status_code}")
  41. return False
  42. except Exception as e:
  43. print(f"❌ 爆管模拟测试异常: {e}")
  44. return False
  45. def test_water_quality_simulation():
  46. """测试水质异常模拟功能"""
  47. print("\n=== 测试水质异常模拟功能 ===")
  48. url = f"{BASE_URL}/api/emergency/dispatch/quick-water-quality"
  49. payload = {
  50. "area": "市中心区域",
  51. "pollutant": "重金属",
  52. "lng": 116.4074,
  53. "lat": 39.9042,
  54. "operatorName": "test_operator"
  55. }
  56. headers = {
  57. "Content-Type": "application/json",
  58. "Authorization": f"Bearer {API_TOKEN}"
  59. }
  60. try:
  61. response = requests.post(url, json=payload, headers=headers, timeout=30)
  62. if response.status_code == 200:
  63. result = response.json()
  64. if result.get("success"):
  65. print("✅ 水质异常模拟测试成功")
  66. print(f"模拟编号: {result.get('simulation', {}).get('simulationNo')}")
  67. print(f"风险等级: {result.get('simulation', {}).get('riskLevel')}")
  68. print(f"备用水源: {result.get('simulation', {}).get('backupWaterSource')}")
  69. return True
  70. else:
  71. print(f"❌ 水质异常模拟测试失败: {result.get('message')}")
  72. return False
  73. else:
  74. print(f"❌ HTTP错误: {response.status_code}")
  75. return False
  76. except Exception as e:
  77. print(f"❌ 水质异常模拟测试异常: {e}")
  78. return False
  79. def test_emergency_plan_list():
  80. """测试应急预案列表"""
  81. print("\n=== 测试应急预案列表 ===")
  82. url = f"{BASE_URL}/api/emergency/plan/list"
  83. headers = {
  84. "Authorization": f"Bearer {API_TOKEN}"
  85. }
  86. try:
  87. response = requests.get(url, headers=headers, timeout=30)
  88. if response.status_code == 200:
  89. result = response.json()
  90. if result.get("success"):
  91. plans = result.get("data", [])
  92. print(f"✅ 应急预案列表查询成功,共 {len(plans)} 个预案")
  93. for plan in plans[:3]: # 只显示前3个
  94. print(f" - {plan.get('planName')} ({plan.get('planNo')})")
  95. return True
  96. else:
  97. print(f"❌ 应急预案列表查询失败: {result.get('message')}")
  98. return False
  99. else:
  100. print(f"❌ HTTP错误: {response.status_code}")
  101. return False
  102. except Exception as e:
  103. print(f"❌ 应急预案列表查询异常: {e}")
  104. return False
  105. def test_emergency_status():
  106. """测试应急状态查询"""
  107. print("\n=== 测试应急状态查询 ===")
  108. url = f"{BASE_URL}/api/emergency/dispatch/status"
  109. headers = {
  110. "Authorization": f"Bearer {API_TOKEN}"
  111. }
  112. try:
  113. response = requests.get(url, headers=headers, timeout=30)
  114. if response.status_code == 200:
  115. result = response.json()
  116. if result.get("success"):
  117. status = result.get("status", {})
  118. print("✅ 应急状态查询成功")
  119. print(f"警报级别: {status.get('alertLevel')}")
  120. print(f"准备度评分: {status.get('preparednessScore')}")
  121. print(f"活跃预案数: {len(status.get('activePlans', []))}")
  122. return True
  123. else:
  124. print(f"❌ 应急状态查询失败: {result.get('message')}")
  125. return False
  126. else:
  127. print(f"❌ HTTP错误: {response.status_code}")
  128. return False
  129. except Exception as e:
  130. print(f"❌ 应急状态查询异常: {e}")
  131. return False
  132. def main():
  133. """主测试函数"""
  134. print(f"开始测试应急推演功能 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
  135. print("=" * 60)
  136. # 执行测试
  137. tests = [
  138. test_pipe_burst_simulation,
  139. test_water_quality_simulation,
  140. test_emergency_plan_list,
  141. test_emergency_status
  142. ]
  143. passed = 0
  144. total = len(tests)
  145. for test in tests:
  146. if test():
  147. passed += 1
  148. # 输出测试结果
  149. print("\n" + "=" * 60)
  150. print(f"测试完成: {passed}/{total} 通过")
  151. if passed == total:
  152. print("🎉 所有测试通过!应急推演功能正常工作。")
  153. sys.exit(0)
  154. else:
  155. print("⚠️ 部分测试失败,请检查功能实现。")
  156. sys.exit(1)
  157. if __name__ == "__main__":
  158. main()