Quellcode durchsuchen

实现数据接入层功能(REST API + WebSocket + 批量导入)

- 实现REST API服务器,支持IoT数据、手动录入和批量导入接口
- 实现WebSocket服务器,支持实时数据推送和连接管理
- 实现批量导入模块,支持CSV、Excel、JSON多种格式
- 实现数据处理工具,包含字段映射和单位转换功能
- 实现数据模型定义和数据验证机制
- 创建主程序入口和配置文件
- 添加详细的使用文档和API说明
bot_dev1 vor 4 Tagen
Commit
5eae031679
8 geänderte Dateien mit 2071 neuen und 0 gelöschten Zeilen
  1. 299
    0
      README.md
  2. 305
    0
      main.py
  3. 10
    0
      requirements.txt
  4. 181
    0
      src/api/rest_api.py
  5. 336
    0
      src/batch/batch_import.py
  6. 317
    0
      src/models/models.py
  7. 409
    0
      src/utils/data_utils.py
  8. 214
    0
      src/websocket/websocket_server.py

+ 299
- 0
README.md Datei anzeigen

@@ -0,0 +1,299 @@
1
+# 水务管理系统 - 数据接入层
2
+
3
+## 项目概述
4
+
5
+本项目是水务管理系统中的数据接入层,实现了多源数据接入、实时WebSocket推送和批量数据导入功能。
6
+
7
+## 功能特性
8
+
9
+### 1. REST API 数据接入
10
+- **IoT设备数据接入**:支持实时接收传感器数据
11
+- **手动数据录入**:支持人工录入数据
12
+- **批量API导入**:支持通过API批量导入数据
13
+- **数据查询接口**:提供按数据类型、时间范围等条件的数据查询
14
+- **统计分析接口**:提供数据统计和分析功能
15
+
16
+### 2. WebSocket 实时推送
17
+- **实时数据推送**:传感器数据实时推送到客户端
18
+- **连接管理**:支持多客户端连接和订阅管理
19
+- **数据历史**:新连接客户端可以获取历史数据
20
+- **警报推送**:支持实时警报推送
21
+- **心跳检测**:支持连接状态监控
22
+
23
+### 3. 批量数据导入
24
+- **多格式支持**:支持CSV、Excel、JSON格式导入
25
+- **数据验证**:内置数据验证和错误处理
26
+- **字段映射**:支持水利行业标准字段映射
27
+- **单位转换**:自动进行单位转换和标准化
28
+- **批量处理**:支持大批量数据处理
29
+
30
+## 技术栈
31
+
32
+- **后端框架**:FastAPI
33
+- **WebSocket**:websockets
34
+- **数据处理**:pandas, numpy
35
+- **异步处理**:asyncio
36
+- **数据验证**:pydantic
37
+- **文件处理**:aiofiles
38
+
39
+## 项目结构
40
+
41
+```
42
+water-management-system/
43
+├── src/
44
+│   ├── api/                 # REST API模块
45
+│   │   └── rest_api.py      # 主API服务器
46
+│   ├── websocket/           # WebSocket模块
47
+│   │   └── websocket_server.py  # WebSocket服务器
48
+│   ├── batch/               # 批量导入模块
49
+│   │   └── batch_import.py  # 批量导入功能
50
+│   ├── utils/               # 工具模块
51
+│   │   └── data_utils.py    # 数据处理工具
52
+│   └── models/              # 数据模型
53
+│       └── models.py        # 数据模型定义
54
+├── main.py                  # 主程序入口
55
+├── requirements.txt        # 依赖文件
56
+├── config.json             # 配置文件
57
+└── README.md               # 项目说明
58
+```
59
+
60
+## API 接口文档
61
+
62
+### REST API 端点
63
+
64
+#### 1. IoT 数据接收
65
+```
66
+POST /api/iot/data
67
+Content-Type: application/json
68
+
69
+{
70
+    "device_id": "device_001",
71
+    "data_type": "LL",
72
+    "value": 25.5,
73
+    "timestamp": "2024-01-01T12:00:00",
74
+    "location": "A区"
75
+}
76
+```
77
+
78
+#### 2. 手动数据录入
79
+```
80
+POST /api/manual/data
81
+Content-Type: application/json
82
+
83
+{
84
+    "source": "manual",
85
+    "data_type": "YL",
86
+    "value": 0.8,
87
+    "timestamp": "2024-01-01T12:00:00",
88
+    "operator": "张三",
89
+    "notes": "定期数据录入"
90
+}
91
+```
92
+
93
+#### 3. 批量导入
94
+```
95
+POST /api/batch/import
96
+Content-Type: application/json
97
+
98
+{
99
+    "batch_id": "batch_001",
100
+    "data_source": "system_import",
101
+    "records": [
102
+        {
103
+            "device_id": "device_002",
104
+            "data_type": "SW",
105
+            "value": 5.2,
106
+            "timestamp": "2024-01-01T12:00:00",
107
+            "location": "B区"
108
+        }
109
+    ]
110
+}
111
+```
112
+
113
+#### 4. 数据查询
114
+```
115
+GET /api/data/{data_type}?limit=100&offset=0
116
+GET /api/data/recent?hours=24&limit=100
117
+GET /api/stats
118
+```
119
+
120
+### WebSocket 连接
121
+
122
+#### 连接端点
123
+```
124
+ws://localhost:8765
125
+```
126
+
127
+#### 消息格式
128
+
129
+**发送消息**:
130
+```json
131
+{
132
+    "type": "subscribe",
133
+    "subscription": "LL"  // 订阅特定类型数据,"all"订阅所有
134
+}
135
+```
136
+
137
+**接收消息**:
138
+```json
139
+{
140
+    "type": "sensor_data",
141
+    "data_type": "LL",
142
+    "device_id": "device_001",
143
+    "value": 25.5,
144
+    "location": "A区",
145
+    "timestamp": "2024-01-01T12:00:00Z"
146
+}
147
+```
148
+
149
+## 安装和运行
150
+
151
+### 1. 安装依赖
152
+```bash
153
+pip install -r requirements.txt
154
+```
155
+
156
+### 2. 启动系统
157
+```bash
158
+# 普通模式
159
+python main.py
160
+
161
+# 演示模式(自动生成数据)
162
+python main.py --demo
163
+
164
+# 指定配置文件
165
+python main.py --config custom_config.json
166
+```
167
+
168
+### 3. 初始化项目
169
+```bash
170
+python main.py --init
171
+```
172
+
173
+## 配置文件
174
+
175
+创建 `config.json` 文件:
176
+
177
+```json
178
+{
179
+    "api": {
180
+        "host": "0.0.0.0",
181
+        "port": 8000
182
+    },
183
+    "websocket": {
184
+        "host": "0.0.0.0",
185
+        "port": 8765
186
+    },
187
+    "batch": {
188
+        "max_file_size_mb": 100,
189
+        "supported_formats": ["csv", "excel", "json"]
190
+    },
191
+    "demo_mode": true,
192
+    "logging": {
193
+        "level": "INFO",
194
+        "file": "water_management.log"
195
+    }
196
+}
197
+```
198
+
199
+## 数据类型说明
200
+
201
+支持的水利行业标准数据类型:
202
+
203
+| 数据类型 | 描述 | 单位 |
204
+|---------|------|------|
205
+| LL | 流量 | m³/h |
206
+| YL | 压力 | MPa |
207
+| SW | 水位 | m |
208
+| ZD | 浊度 | NTU |
209
+| PH | pH值 | - |
210
+| WD | 温度 | °C |
211
+| DD | 电导率 | μS/cm |
212
+| YD | 硬度 | mg/L |
213
+
214
+## 开发规范
215
+
216
+### 代码结构
217
+- 遵循模块化设计,功能解耦
218
+- 使用异步编程提高性能
219
+- 统一的错误处理机制
220
+- 完整的日志记录
221
+
222
+### 数据处理
223
+- 数据验证和清洗
224
+- 标准化字段映射
225
+- 单位自动转换
226
+- 质量评分机制
227
+
228
+### 安全考虑
229
+- 输入数据验证
230
+- 文件大小限制
231
+- 连接状态监控
232
+- 错误信息脱敏
233
+
234
+## 测试和验证
235
+
236
+### 数据验证
237
+```python
238
+from src.utils.data_utils import data_converter
239
+
240
+# 验证传感器数据
241
+validation_result = data_converter.validate_sensor_data({
242
+    "device_id": "device_001",
243
+    "data_type": "LL",
244
+    "value": 25.5,
245
+    "location": "A区"
246
+})
247
+
248
+print(validation_result)
249
+```
250
+
251
+### 批量导入测试
252
+```python
253
+import asyncio
254
+from src.batch.batch_import import batch_manager
255
+
256
+async def test_import():
257
+    result = await batch_manager.import_file(
258
+        file_path="data.csv",
259
+        batch_id="test_batch",
260
+        data_source="test"
261
+    )
262
+    print(result)
263
+
264
+asyncio.run(test_import())
265
+```
266
+
267
+## 部署建议
268
+
269
+### 1. 生产环境配置
270
+- 使用反向代理(Nginx)
271
+- 配置SSL证书
272
+- 设置防火墙规则
273
+- 监控系统资源使用
274
+
275
+### 2. 性能优化
276
+- 数据库连接池
277
+- 缓存机制
278
+- 异步处理优化
279
+- 连接数限制
280
+
281
+### 3. 监控和日志
282
+- 应用性能监控
283
+- 错误日志收集
284
+- 性能指标统计
285
+- 告警机制
286
+
287
+## 许可证
288
+
289
+本项目遵循 MIT 许可证。
290
+
291
+## 贡献指南
292
+
293
+欢迎提交 Issue 和 Pull Request来贡献代码。
294
+
295
+## 联系方式
296
+
297
+如有问题,请通过以下方式联系:
298
+- 邮箱:bot_dev1@xayunmei.com
299
+- 项目地址:http://git.xayunmei.com/bot_ym/water-management-system

+ 305
- 0
main.py Datei anzeigen

@@ -0,0 +1,305 @@
1
+"""
2
+水务管理系统主程序
3
+集成REST API、WebSocket和批量导入功能
4
+"""
5
+import asyncio
6
+import logging
7
+import signal
8
+import sys
9
+from pathlib import Path
10
+import argparse
11
+from datetime import datetime
12
+
13
+# 导入各个模块
14
+from src.api.rest_api import app as rest_api_app
15
+from src.websocket.websocket_server import websocket_server
16
+from src.batch.batch_import import batch_manager
17
+from src.utils.data_utils import data_converter, data_formatter, quality_checker
18
+from src.models.models import validator
19
+
20
+# 配置日志
21
+logging.basicConfig(
22
+    level=logging.INFO,
23
+    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
24
+    handlers=[
25
+        logging.FileHandler('water_management.log'),
26
+        logging.StreamHandler(sys.stdout)
27
+    ]
28
+)
29
+logger = logging.getLogger(__name__)
30
+
31
+class WaterManagementSystem:
32
+    """水务管理系统主类"""
33
+    
34
+    def __init__(self, config_file: str = "config.json"):
35
+        self.config_file = config_file
36
+        self.running = False
37
+        self.tasks = []
38
+        self.config = self.load_config()
39
+        
40
+    def load_config(self) -> dict:
41
+        """加载配置文件"""
42
+        config_path = Path(self.config_file)
43
+        if config_path.exists():
44
+            import json
45
+            with open(config_path, 'r', encoding='utf-8') as f:
46
+                return json.load(f)
47
+        else:
48
+            # 默认配置
49
+            return {
50
+                "api": {
51
+                    "host": "0.0.0.0",
52
+                    "port": 8000
53
+                },
54
+                "websocket": {
55
+                    "host": "0.0.0.0",
56
+                    "port": 8765
57
+                },
58
+                "batch": {
59
+                    "max_file_size_mb": 100,
60
+                    "supported_formats": ["csv", "excel", "json"]
61
+                },
62
+                "logging": {
63
+                    "level": "INFO",
64
+                    "file": "water_management.log"
65
+                }
66
+            }
67
+    
68
+    async def start_api_server(self):
69
+        """启动API服务器"""
70
+        import uvicorn
71
+        logger.info("启动REST API服务器...")
72
+        
73
+        # 在新的事件循环中运行uvicorn
74
+        api_config = uvicorn.Config(
75
+            app=rest_api_app,
76
+            host=self.config["api"]["host"],
77
+            port=self.config["api"]["port"],
78
+            log_level="info"
79
+        )
80
+        api_server = uvicorn.Server(api_config)
81
+        
82
+        # 在后台任务中运行
83
+        await api_server.serve()
84
+    
85
+    async def start_websocket_server(self):
86
+        """启动WebSocket服务器"""
87
+        logger.info("启动WebSocket服务器...")
88
+        
89
+        # 启动WebSocket服务器
90
+        server = await websocket_server.start_server()
91
+        
92
+        # 添加服务器关闭处理
93
+        def cleanup():
94
+            logger.info("关闭WebSocket服务器...")
95
+            server.close()
96
+            asyncio.create_task(server.wait_closed())
97
+        
98
+        return server, cleanup
99
+    
100
+    async def start_data_generator(self):
101
+        """启动数据生成器(用于演示)"""
102
+        logger.info("启动数据生成器...")
103
+        
104
+        while self.running:
105
+            # 生成模拟数据
106
+            import random
107
+            sensor_types = ["LL", "YL", "SW", "ZD"]
108
+            sensor_type = random.choice(sensor_types)
109
+            
110
+            # 根据传感器类型生成合理的数值范围
111
+            if sensor_type == "LL":  # 流量
112
+                value = random.uniform(10, 100)
113
+            elif sensor_type == "YL":  # 压力
114
+                value = random.uniform(0.1, 1.0)
115
+            elif sensor_type == "SW":  # 水位
116
+                value = random.uniform(0, 10)
117
+            else:  # ZD 浊度
118
+                value = random.uniform(0, 50)
119
+            
120
+            sensor_data = {
121
+                "data_type": sensor_type,
122
+                "device_id": f"device_{random.randint(1, 10)}",
123
+                "value": round(value, 2),
124
+                "location": random.choice(["A区", "B区", "C区", "D区"])
125
+            }
126
+            
127
+            # 通过WebSocket发送数据
128
+            await websocket_server.send_sensor_data(sensor_data)
129
+            
130
+            # 等待5秒
131
+            await asyncio.sleep(5)
132
+    
133
+    async def handle_batch_import(self, file_path: str, batch_id: str, data_source: str):
134
+        """处理批量导入请求"""
135
+        try:
136
+            logger.info(f"开始批量导入: {file_path}")
137
+            
138
+            # 验证文件
139
+            validation_result = await batch_manager.validate_file(file_path)
140
+            if not validation_result["valid"]:
141
+                raise Exception(f"文件验证失败: {validation_result['error']}")
142
+            
143
+            # 导入文件
144
+            result = await batch_manager.import_file(
145
+                file_path=file_path,
146
+                batch_id=batch_id,
147
+                data_source=data_source,
148
+                file_type="auto"
149
+            )
150
+            
151
+            logger.info(f"批量导入完成: {result}")
152
+            return result
153
+            
154
+        except Exception as e:
155
+            logger.error(f"批量导入失败: {str(e)}")
156
+            raise
157
+    
158
+    async def start_system(self):
159
+        """启动系统"""
160
+        logger.info("启动水务管理系统...")
161
+        
162
+        # 标记系统为运行状态
163
+        self.running = True
164
+        
165
+        try:
166
+            # 启动WebSocket服务器
167
+            ws_server, ws_cleanup = await self.start_websocket_server()
168
+            self.tasks.append(ws_server)
169
+            
170
+            # 启动数据生成器(如果启用)
171
+            if self.config.get("demo_mode", False):
172
+                generator_task = asyncio.create_task(self.start_data_generator())
173
+                self.tasks.append(generator_task)
174
+            
175
+            # 启动API服务器
176
+            await self.start_api_server()
177
+            
178
+        except Exception as e:
179
+            logger.error(f"系统启动失败: {str(e)}")
180
+            await self.stop_system()
181
+            raise
182
+    
183
+    async def stop_system(self):
184
+        """停止系统"""
185
+        logger.info("停止水务管理系统...")
186
+        
187
+        self.running = False
188
+        
189
+        # 取消所有任务
190
+        for task in self.tasks:
191
+            if not task.done():
192
+                task.cancel()
193
+                try:
194
+                    await task
195
+                except asyncio.CancelledError:
196
+                    pass
197
+        
198
+        # 清理WebSocket服务器
199
+        if hasattr(websocket_server, 'server') and websocket_server.server:
200
+            websocket_server.server.close()
201
+            await websocket_server.server.wait_closed()
202
+        
203
+        logger.info("水务管理系统已停止")
204
+    
205
+    async def run(self):
206
+        """运行系统"""
207
+        # 设置信号处理
208
+        def signal_handler():
209
+            logger.info("收到停止信号...")
210
+            asyncio.create_task(self.stop_system())
211
+        
212
+        for sig in [signal.SIGINT, signal.SIGTERM]:
213
+            signal.signal(sig, signal_handler)
214
+        
215
+        try:
216
+            await self.start_system()
217
+            
218
+            # 保持运行直到收到停止信号
219
+            while self.running:
220
+                await asyncio.sleep(1)
221
+                
222
+        except KeyboardInterrupt:
223
+            logger.info("收到键盘中断信号")
224
+        except Exception as e:
225
+            logger.error(f"系统运行时出错: {str(e)}")
226
+        finally:
227
+            await self.stop_system()
228
+
229
+def create_sample_config():
230
+    """创建示例配置文件"""
231
+    sample_config = {
232
+        "api": {
233
+            "host": "0.0.0.0",
234
+            "port": 8000
235
+        },
236
+        "websocket": {
237
+            "host": "0.0.0.0",
238
+            "port": 8765
239
+        },
240
+        "batch": {
241
+            "max_file_size_mb": 100,
242
+            "supported_formats": ["csv", "excel", "json"]
243
+        },
244
+        "demo_mode": True,
245
+        "logging": {
246
+            "level": "INFO",
247
+            "file": "water_management.log"
248
+        }
249
+    }
250
+    
251
+    import json
252
+    with open("config.json", 'w', encoding='utf-8') as f:
253
+        json.dump(sample_config, f, indent=2, ensure_ascii=False)
254
+    
255
+    logger.info("示例配置文件已创建: config.json")
256
+
257
+def create_requirements():
258
+    """创建requirements.txt文件"""
259
+    requirements = [
260
+        "fastapi==0.104.1",
261
+        "uvicorn[standard]==0.24.0",
262
+        "websockets==12.0",
263
+        "pandas==2.1.3",
264
+        "openpyxl==3.1.2",
265
+        "aiofiles==23.2.1",
266
+        "python-multipart==0.0.6",
267
+        "jinja2==3.1.2"
268
+    ]
269
+    
270
+    with open("requirements.txt", 'w') as f:
271
+        f.write('\n'.join(requirements))
272
+    
273
+    logger.info("依赖文件已创建: requirements.txt")
274
+
275
+def main():
276
+    """主函数"""
277
+    parser = argparse.ArgumentParser(description="水务管理系统")
278
+    parser.add_argument("--config", "-c", default="config.json", help="配置文件路径")
279
+    parser.add_argument("--init", action="store_true", help="初始化项目(创建配置文件和依赖)")
280
+    parser.add_argument("--demo", action="store_true", help="启动演示模式")
281
+    
282
+    args = parser.parse_args()
283
+    
284
+    if args.init:
285
+        create_sample_config()
286
+        create_requirements()
287
+        logger.info("项目初始化完成")
288
+        return
289
+    
290
+    # 创建系统实例
291
+    system = WaterManagementSystem(args.config)
292
+    
293
+    # 如果启用演示模式
294
+    if args.demo:
295
+        system.config["demo_mode"] = True
296
+        logger.info("启用演示模式")
297
+    
298
+    # 运行系统
299
+    try:
300
+        asyncio.run(system.run())
301
+    except KeyboardInterrupt:
302
+        logger.info("程序已退出")
303
+
304
+if __name__ == "__main__":
305
+    main()

+ 10
- 0
requirements.txt Datei anzeigen

@@ -0,0 +1,10 @@
1
+fastapi==0.104.1
2
+uvicorn[standard]==0.24.0
3
+websockets==12.0
4
+pandas==2.1.3
5
+openpyxl==3.1.2
6
+aiofiles==23.2.1
7
+python-multipart==0.0.6
8
+jinja2==3.1.2
9
+requests==2.31.0
10
+python-dateutil==2.8.2

+ 181
- 0
src/api/rest_api.py Datei anzeigen

@@ -0,0 +1,181 @@
1
+"""
2
+REST API 数据接入模块
3
+支持 IoT 设备数据、手动录入和 API 批量导入
4
+"""
5
+from fastapi import FastAPI, HTTPException, Depends
6
+from fastapi.middleware.cors import CORSMiddleware
7
+from pydantic import BaseModel
8
+from typing import List, Optional, Dict, Any
9
+import uvicorn
10
+import asyncio
11
+import json
12
+from datetime import datetime
13
+
14
+# 创建FastAPI应用
15
+app = FastAPI(title="Water Management System Data API", version="1.0.0")
16
+
17
+# CORS配置
18
+app.add_middleware(
19
+    CORSMiddleware,
20
+    allow_origins=["*"],
21
+    allow_credentials=True,
22
+    allow_methods=["*"],
23
+    allow_headers=["*"],
24
+)
25
+
26
+# 数据模型
27
+class IoTData(BaseModel):
28
+    device_id: str
29
+    data_type: str  # "LL", "YL", "SW", "ZD" 等
30
+    value: float
31
+    timestamp: datetime
32
+    location: str
33
+
34
+class ManualInputData(BaseModel):
35
+    source: str
36
+    data_type: str
37
+    value: float
38
+    timestamp: datetime
39
+    operator: str
40
+    notes: Optional[str] = None
41
+
42
+class BatchImportRequest(BaseModel):
43
+    batch_id: str
44
+    data_source: str
45
+    records: List[Dict[str, Any]]
46
+
47
+# 数据存储(示例,实际应该用数据库)
48
+data_store = []
49
+
50
+@app.get("/")
51
+async def root():
52
+    """API根路径"""
53
+    return {"message": "Water Management System API", "version": "1.0.0"}
54
+
55
+@app.post("/api/iot/data")
56
+async def receive_iot_data(data: IoTData):
57
+    """接收IoT设备数据"""
58
+    try:
59
+        data_dict = data.dict()
60
+        data_store.append({
61
+            **data_dict,
62
+            "id": len(data_store) + 1,
63
+            "type": "iot"
64
+        })
65
+        return {"status": "success", "id": len(data_store), "message": "IoT data received"}
66
+    except Exception as e:
67
+        raise HTTPException(status_code=400, detail=str(e))
68
+
69
+@app.post("/api/manual/data")
70
+async def receive_manual_data(data: ManualInputData):
71
+    """接收手动录入数据"""
72
+    try:
73
+        data_dict = data.dict()
74
+        data_store.append({
75
+            **data_dict,
76
+            "id": len(data_store) + 1,
77
+            "type": "manual"
78
+        })
79
+        return {"status": "success", "id": len(data_store), "message": "Manual data received"}
80
+    except Exception as e:
81
+        raise HTTPException(status_code=400, detail=str(e))
82
+
83
+@app.post("/api/batch/import")
84
+async def batch_import(request: BatchImportRequest):
85
+    """批量导入数据"""
86
+    try:
87
+        imported_count = 0
88
+        failed_count = 0
89
+        
90
+        for record in request.records:
91
+            # 验证记录
92
+            if not all(k in record for k in ['device_id', 'data_type', 'value']):
93
+                failed_count += 1
94
+                continue
95
+                
96
+            # 创建数据对象
97
+            data_record = {
98
+                "device_id": record['device_id'],
99
+                "data_type": record['data_type'],
100
+                "value": float(record['value']),
101
+                "timestamp": record.get('timestamp', datetime.now()),
102
+                "location": record.get('location', 'unknown'),
103
+                "batch_id": request.batch_id,
104
+                "source": request.data_source,
105
+                "id": len(data_store) + 1,
106
+                "type": "batch"
107
+            }
108
+            
109
+            data_store.append(data_record)
110
+            imported_count += 1
111
+            
112
+        return {
113
+            "status": "success",
114
+            "imported_count": imported_count,
115
+            "failed_count": failed_count,
116
+            "message": f"Batch import completed: {imported_count} records imported, {failed_count} failed"
117
+        }
118
+    except Exception as e:
119
+        raise HTTPException(status_code=400, detail=str(e))
120
+
121
+@app.get("/api/data/{data_type}")
122
+async def get_data_by_type(data_type: str, limit: int = 100, offset: int = 0):
123
+    """根据数据类型获取数据"""
124
+    filtered_data = [
125
+        item for item in data_store 
126
+        if item.get('data_type') == data_type
127
+    ]
128
+    
129
+    return {
130
+        "data": filtered_data[offset:offset+limit],
131
+        "total": len(filtered_data),
132
+        "limit": limit,
133
+        "offset": offset
134
+    }
135
+
136
+@app.get("/api/data/recent")
137
+async def get_recent_data(hours: int = 24, limit: int = 100):
138
+    """获取最近的数据"""
139
+    from datetime import timedelta
140
+    cutoff_time = datetime.now() - timedelta(hours=hours)
141
+    
142
+    recent_data = [
143
+        item for item in data_store 
144
+        if item.get('timestamp', datetime.now()) > cutoff_time
145
+    ]
146
+    
147
+    return {
148
+        "data": recent_data[-limit:],
149
+        "total": len(recent_data),
150
+        "hours": hours,
151
+        "limit": limit
152
+    }
153
+
154
+@app.get("/api/stats")
155
+async def get_statistics():
156
+    """获取数据统计信息"""
157
+    stats = {
158
+        "total_records": len(data_store),
159
+        "by_type": {},
160
+        "by_device": {},
161
+        "by_hour": {}
162
+    }
163
+    
164
+    for item in data_store:
165
+        data_type = item.get('data_type', 'unknown')
166
+        device_id = item.get('device_id', 'unknown')
167
+        hour = item.get('timestamp', datetime.now()).strftime('%Y-%m-%d %H:00:00')
168
+        
169
+        stats['by_type'][data_type] = stats['by_type'].get(data_type, 0) + 1
170
+        stats['by_device'][device_id] = stats['by_device'].get(device_id, 0) + 1
171
+        stats['by_hour'][hour] = stats['by_hour'].get(hour, 0) + 1
172
+    
173
+    return stats
174
+
175
+@app.get("/health")
176
+async def health_check():
177
+    """健康检查"""
178
+    return {"status": "healthy", "timestamp": datetime.now()}
179
+
180
+if __name__ == "__main__":
181
+    uvicorn.run(app, host="0.0.0.0", port=8000)

+ 336
- 0
src/batch/batch_import.py Datei anzeigen

@@ -0,0 +1,336 @@
1
+"""
2
+批量数据导入模块
3
+支持CSV、Excel、JSON等多种格式的批量数据导入
4
+"""
5
+import pandas as pd
6
+import json
7
+import csv
8
+import asyncio
9
+import aiofiles
10
+from typing import List, Dict, Any, Optional, Union
11
+from pathlib import Path
12
+from datetime import datetime
13
+import logging
14
+from concurrent.futures import ThreadPoolExecutor
15
+
16
+# 配置日志
17
+logging.basicConfig(level=logging.INFO)
18
+logger = logging.getLogger(__name__)
19
+
20
+class BatchImportError(Exception):
21
+    """批量导入异常"""
22
+    pass
23
+
24
+class DataValidator:
25
+    """数据验证器"""
26
+    
27
+    # 水利行业标准字段映射
28
+    STANDARD_FIELDS = {
29
+        "LL": "流量",
30
+        "YL": "压力", 
31
+        "SW": "水位",
32
+        "ZD": "浊度",
33
+        "PH": "pH值",
34
+        "WD": "温度",
35
+        "DD": "电导率",
36
+        "YD": "硬度"
37
+    }
38
+    
39
+    # 单位映射
40
+    UNIT_MAP = {
41
+        "LL": "m³/h",
42
+        "YL": "MPa", 
43
+        "SW": "m",
44
+        "ZD": "NTU",
45
+        "PH": "",
46
+        "WD": "°C",
47
+        "DD": "μS/cm",
48
+        "YD": "mg/L"
49
+    }
50
+    
51
+    @classmethod
52
+    def validate_data_type(cls, data_type: str) -> bool:
53
+        """验证数据类型是否有效"""
54
+        return data_type in cls.STANDARD_FIELDS
55
+    
56
+    @classmethod
57
+    def get_field_description(cls, data_type: str) -> str:
58
+        """获取字段描述"""
59
+        return cls.STANDARD_FIELDS.get(data_type, "未知类型")
60
+    
61
+    @classmethod
62
+    def get_unit(cls, data_type: str) -> str:
63
+        """获取单位"""
64
+        return cls.UNIT_MAP.get(data_type, "")
65
+    
66
+    @classmethod
67
+    def validate_record(cls, record: Dict[str, Any]) -> Dict[str, Any]:
68
+        """验证单条记录"""
69
+        errors = []
70
+        validated_record = {}
71
+        
72
+        # 必需字段检查
73
+        required_fields = ["device_id", "data_type", "value"]
74
+        for field in required_fields:
75
+            if field not in record:
76
+                errors.append(f"缺少必需字段: {field}")
77
+            else:
78
+                validated_record[field] = record[field]
79
+        
80
+        # 数据类型验证
81
+        if "data_type" in validated_record:
82
+            if not cls.validate_data_type(validated_record["data_type"]):
83
+                errors.append(f"无效的数据类型: {validated_record['data_type']}")
84
+        
85
+        # 数值验证
86
+        if "value" in validated_record:
87
+            try:
88
+                validated_record["value"] = float(validated_record["value"])
89
+            except (ValueError, TypeError):
90
+                errors.append(f"无效的数值: {validated_record['value']}")
91
+        
92
+        # 时间戳处理
93
+        if "timestamp" in record:
94
+            try:
95
+                if isinstance(record["timestamp"], str):
96
+                    validated_record["timestamp"] = datetime.fromisoformat(record["timestamp"])
97
+                else:
98
+                    validated_record["timestamp"] = record["timestamp"]
99
+            except (ValueError, TypeError):
100
+                # 如果时间戳无效,使用当前时间
101
+                validated_record["timestamp"] = datetime.now()
102
+        else:
103
+            validated_record["timestamp"] = datetime.now()
104
+        
105
+        # 地点字段处理
106
+        validated_record["location"] = record.get("location", "未知")
107
+        
108
+        return {
109
+            "validated": len(errors) == 0,
110
+            "record": validated_record,
111
+            "errors": errors
112
+        }
113
+
114
+class BatchImporter:
115
+    """批量导入器"""
116
+    
117
+    def __init__(self):
118
+        self.validator = DataValidator()
119
+    
120
+    async def import_csv(self, file_path: str, batch_id: str, data_source: str) -> Dict[str, Any]:
121
+        """导入CSV文件"""
122
+        try:
123
+            # 使用线程池执行文件读取
124
+            loop = asyncio.get_event_loop()
125
+            with ThreadPoolExecutor() as executor:
126
+                df = await loop.run_in_executor(
127
+                    executor, 
128
+                    lambda: pd.read_csv(file_path, encoding='utf-8')
129
+                )
130
+            
131
+            return await self._process_dataframe(df, batch_id, data_source)
132
+            
133
+        except Exception as e:
134
+            raise BatchImportError(f"CSV文件导入失败: {str(e)}")
135
+    
136
+    async def import_excel(self, file_path: str, batch_id: str, data_source: str, sheet_name: str = 0) -> Dict[str, Any]:
137
+        """导入Excel文件"""
138
+        try:
139
+            # 使用线程池执行文件读取
140
+            loop = asyncio.get_event_loop()
141
+            with ThreadPoolExecutor() as executor:
142
+                df = await loop.run_in_executor(
143
+                    executor,
144
+                    lambda: pd.read_excel(file_path, sheet_name=sheet_name)
145
+                )
146
+            
147
+            return await self._process_dataframe(df, batch_id, data_source)
148
+            
149
+        except Exception as e:
150
+            raise BatchImportError(f"Excel文件导入失败: {str(e)}")
151
+    
152
+    async def import_json(self, file_path: str, batch_id: str, data_source: str) -> Dict[str, Any]:
153
+        """导入JSON文件"""
154
+        try:
155
+            async with aiofiles.open(file_path, 'r', encoding='utf-8') as f:
156
+                content = await f.read()
157
+            
158
+            data = json.loads(content)
159
+            
160
+            # 处理不同的JSON格式
161
+            if isinstance(data, list):
162
+                return await self._process_records(data, batch_id, data_source)
163
+            elif isinstance(data, dict):
164
+                if "records" in data:
165
+                    return await self._process_records(data["records"], batch_id, data_source)
166
+                else:
167
+                    return await self._process_records([data], batch_id, data_source)
168
+            else:
169
+                raise BatchImportError("不支持的JSON格式")
170
+                
171
+        except Exception as e:
172
+            raise BatchImportError(f"JSON文件导入失败: {str(e)}")
173
+    
174
+    async def _process_dataframe(self, df: pd.DataFrame, batch_id: str, data_source: str) -> Dict[str, Any]:
175
+        """处理DataFrame数据"""
176
+        # 转换为字典列表
177
+        records = df.to_dict('records')
178
+        return await self._process_records(records, batch_id, data_source)
179
+    
180
+    async def _process_records(self, records: List[Dict[str, Any]], batch_id: str, data_source: str) -> Dict[str, Any]:
181
+        """处理记录列表"""
182
+        import_count = 0
183
+        error_count = 0
184
+        errors = []
185
+        imported_records = []
186
+        
187
+        for i, record in enumerate(records):
188
+            validation_result = self.validator.validate_record(record)
189
+            
190
+            if validation_result["validated"]:
191
+                # 添加批次信息
192
+                import_record = {
193
+                    **validation_result["record"],
194
+                    "batch_id": batch_id,
195
+                    "data_source": data_source,
196
+                    "import_time": datetime.now(),
197
+                    "type": "batch"
198
+                }
199
+                
200
+                imported_records.append(import_record)
201
+                import_count += 1
202
+            else:
203
+                error_count += 1
204
+                error_msg = f"记录 {i+1}: {', '.join(validation_result['errors'])}"
205
+                errors.append(error_msg)
206
+                logger.warning(error_msg)
207
+        
208
+        # 保存导入的记录到文件(实际项目中应该保存到数据库)
209
+        await self._save_imported_records(imported_records)
210
+        
211
+        return {
212
+            "status": "completed" if error_count == 0 else "completed_with_errors",
213
+            "imported_count": import_count,
214
+            "error_count": error_count,
215
+            "total_count": len(records),
216
+            "success_rate": import_count / len(records) if records else 0,
217
+            "batch_id": batch_id,
218
+            "data_source": data_source,
219
+            "errors": errors[:10],  # 只返回前10个错误
220
+            "imported_records": imported_records[:5]  # 返回前5条记录作为示例
221
+        }
222
+    
223
+    async def _save_imported_records(self, records: List[Dict[str, Any]]):
224
+        """保存导入的记录"""
225
+        # 这里可以将记录保存到数据库或文件
226
+        # 为了示例,我们只保存到日志
227
+        for record in records:
228
+            logger.info(f"导入记录: {record}")
229
+    
230
+    async def get_import_summary(self, batch_id: str) -> Dict[str, Any]:
231
+        """获取导入摘要"""
232
+        # 这里应该从数据库查询批次信息
233
+        # 为了示例,返回一个空摘要
234
+        return {
235
+            "batch_id": batch_id,
236
+            "status": "not_found",
237
+            "message": "批次信息未找到(示例实现)"
238
+        }
239
+
240
+class BatchImportManager:
241
+    """批量导入管理器"""
242
+    
243
+    def __init__(self):
244
+        self.importer = BatchImporter()
245
+    
246
+    async def import_file(self, file_path: str, batch_id: str, data_source: str, 
247
+                        file_type: str = "auto", **kwargs) -> Dict[str, Any]:
248
+        """导入文件"""
249
+        file_path_obj = Path(file_path)
250
+        
251
+        if not file_path_obj.exists():
252
+            raise BatchImportError(f"文件不存在: {file_path}")
253
+        
254
+        # 自动检测文件类型
255
+        if file_type == "auto":
256
+            if file_path_obj.suffix.lower() == '.csv':
257
+                file_type = "csv"
258
+            elif file_path_obj.suffix.lower() in ['.xlsx', '.xls']:
259
+                file_type = "excel"
260
+            elif file_path_obj.suffix.lower() == '.json':
261
+                file_type = "json"
262
+            else:
263
+                raise BatchImportError(f"不支持的文件类型: {file_path_obj.suffix}")
264
+        
265
+        logger.info(f"开始导入{file_type}文件: {file_path}")
266
+        
267
+        if file_type == "csv":
268
+            result = await self.importer.import_csv(file_path, batch_id, data_source)
269
+        elif file_type == "excel":
270
+            sheet_name = kwargs.get("sheet_name", 0)
271
+            result = await self.importer.import_excel(file_path, batch_id, data_source, sheet_name)
272
+        elif file_type == "json":
273
+            result = await self.importer.import_json(file_path, batch_id, data_source)
274
+        else:
275
+            raise BatchImportError(f"不支持的文件类型: {file_type}")
276
+        
277
+        logger.info(f"文件导入完成: {result}")
278
+        return result
279
+    
280
+    async def validate_file(self, file_path: str) -> Dict[str, Any]:
281
+        """验证文件格式"""
282
+        file_path_obj = Path(file_path)
283
+        
284
+        if not file_path_obj.exists():
285
+            return {"valid": False, "error": "文件不存在"}
286
+        
287
+        file_size = file_path_obj.stat().st_size
288
+        if file_size > 100 * 1024 * 1024:  # 100MB限制
289
+            return {"valid": False, "error": "文件过大,最大支持100MB"}
290
+        
291
+        # 尝试读取文件前几行进行验证
292
+        try:
293
+            with open(file_path, 'r', encoding='utf-8') as f:
294
+                first_line = f.readline()
295
+                if not first_line:
296
+                    return {"valid": False, "error": "文件为空"}
297
+        except Exception as e:
298
+            return {"valid": False, "error": f"无法读取文件: {str(e)}"}
299
+        
300
+        return {"valid": True, "size": file_size, "format": file_path_obj.suffix.lower()}
301
+
302
+# 全局导入管理器实例
303
+batch_manager = BatchImportManager()
304
+
305
+# 示例用法
306
+async def example_usage():
307
+    """示例用法"""
308
+    import os
309
+    
310
+    # 创建示例CSV文件
311
+    sample_data = [
312
+        {"device_id": "device_001", "data_type": "LL", "value": 25.5, "location": "A区"},
313
+        {"device_id": "device_002", "data_type": "YL", "value": 0.8, "location": "B区"},
314
+        {"device_id": "device_003", "data_type": "SW", "value": 5.2, "location": "C区"}
315
+    ]
316
+    
317
+    sample_file = "/tmp/sample_data.csv"
318
+    with open(sample_file, 'w', newline='', encoding='utf-8') as f:
319
+        writer = csv.DictWriter(f, fieldnames=["device_id", "data_type", "value", "location"])
320
+        writer.writeheader()
321
+        writer.writerows(sample_data)
322
+    
323
+    # 导入文件
324
+    try:
325
+        result = await batch_manager.import_file(
326
+            file_path=sample_file,
327
+            batch_id="batch_" + datetime.now().strftime("%Y%m%d_%H%M%S"),
328
+            data_source="manual_test",
329
+            file_type="csv"
330
+        )
331
+        print("导入结果:", result)
332
+    except Exception as e:
333
+        print(f"导入失败: {str(e)}")
334
+
335
+if __name__ == "__main__":
336
+    asyncio.run(example_usage())

+ 317
- 0
src/models/models.py Datei anzeigen

@@ -0,0 +1,317 @@
1
+"""
2
+数据模型定义
3
+定义水务管理系统的各种数据结构
4
+"""
5
+from dataclasses import dataclass, field
6
+from typing import Dict, List, Optional, Any
7
+from datetime import datetime
8
+from enum import Enum
9
+
10
+class DataType(Enum):
11
+    """数据类型枚举"""
12
+    LL = "LL"  # 流量
13
+    YL = "YL"  # 压力
14
+    SW = "SW"  # 水位
15
+    ZD = "ZD"  # 浊度
16
+    PH = "PH"  # pH值
17
+    WD = "WD"  # 温度
18
+    DD = "DD"  # 电导率
19
+    YD = "YD"  # 硬度
20
+
21
+class AlertLevel(Enum):
22
+    """警报级别枚举"""
23
+    INFO = "info"
24
+    WARNING = "warning"
25
+    ERROR = "error"
26
+    CRITICAL = "critical"
27
+
28
+@dataclass
29
+class Device:
30
+    """设备模型"""
31
+    id: str
32
+    name: str
33
+    device_type: str
34
+    location: str
35
+    description: Optional[str] = None
36
+    install_date: Optional[datetime] = None
37
+    status: str = "active"  # active, inactive, maintenance
38
+    metadata: Dict[str, Any] = field(default_factory=dict)
39
+
40
+@dataclass
41
+class SensorData:
42
+    """传感器数据模型"""
43
+    id: str
44
+    device_id: str
45
+    data_type: DataType
46
+    value: float
47
+    unit: str
48
+    timestamp: datetime
49
+    location: str
50
+    quality_score: float = 1.0
51
+    metadata: Dict[str, Any] = field(default_factory=dict)
52
+    
53
+    def to_dict(self) -> Dict[str, Any]:
54
+        """转换为字典"""
55
+        return {
56
+            "id": self.id,
57
+            "device_id": self.device_id,
58
+            "data_type": self.data_type.value,
59
+            "value": self.value,
60
+            "unit": self.unit,
61
+            "timestamp": self.timestamp.isoformat(),
62
+            "location": self.location,
63
+            "quality_score": self.quality_score,
64
+            "metadata": self.metadata
65
+        }
66
+    
67
+    @classmethod
68
+    def from_dict(cls, data: Dict[str, Any]) -> 'SensorData':
69
+        """从字典创建对象"""
70
+        return cls(
71
+            id=data["id"],
72
+            device_id=data["device_id"],
73
+            data_type=DataType(data["data_type"]),
74
+            value=float(data["value"]),
75
+            unit=data.get("unit", ""),
76
+            timestamp=datetime.fromisoformat(data["timestamp"]),
77
+            location=data.get("location", ""),
78
+            quality_score=float(data.get("quality_score", 1.0)),
79
+            metadata=data.get("metadata", {})
80
+        )
81
+
82
+@dataclass
83
+class Alert:
84
+    """警报模型"""
85
+    id: str
86
+    device_id: str
87
+    alert_type: str
88
+    level: AlertLevel
89
+    message: str
90
+    timestamp: datetime
91
+    resolved: bool = False
92
+    resolved_by: Optional[str] = None
93
+    resolved_at: Optional[datetime] = None
94
+    metadata: Dict[str, Any] = field(default_factory=dict)
95
+    
96
+    def to_dict(self) -> Dict[str, Any]:
97
+        """转换为字典"""
98
+        return {
99
+            "id": self.id,
100
+            "device_id": self.device_id,
101
+            "alert_type": self.alert_type,
102
+            "level": self.level.value,
103
+            "message": self.message,
104
+            "timestamp": self.timestamp.isoformat(),
105
+            "resolved": self.resolved,
106
+            "resolved_by": self.resolved_by,
107
+            "resolved_at": self.resolved_at.isoformat() if self.resolved_at else None,
108
+            "metadata": self.metadata
109
+        }
110
+
111
+@dataclass
112
+class BatchImport:
113
+    """批量导入记录模型"""
114
+    id: str
115
+    batch_id: str
116
+    data_source: str
117
+    total_records: int
118
+    successful_records: int
119
+    failed_records: int
120
+    status: str  # pending, processing, completed, failed
121
+    file_name: Optional[str] = None
122
+    import_time: Optional[datetime] = None
123
+    completed_time: Optional[datetime] = None
124
+    error_messages: List[str] = field(default_factory=list)
125
+    metadata: Dict[str, Any] = field(default_factory=dict)
126
+    
127
+    def to_dict(self) -> Dict[str, Any]:
128
+        """转换为字典"""
129
+        return {
130
+            "id": self.id,
131
+            "batch_id": self.batch_id,
132
+            "data_source": self.data_source,
133
+            "total_records": self.total_records,
134
+            "successful_records": self.successful_records,
135
+            "failed_records": self.failed_records,
136
+            "status": self.status,
137
+            "file_name": self.file_name,
138
+            "import_time": self.import_time.isoformat() if self.import_time else None,
139
+            "completed_time": self.completed_time.isoformat() if self.completed_time else None,
140
+            "error_messages": self.error_messages,
141
+            "metadata": self.metadata
142
+        }
143
+
144
+@dataclass
145
+class APIRequest:
146
+    """API请求模型"""
147
+    id: str
148
+    method: str
149
+    endpoint: str
150
+    params: Dict[str, Any]
151
+    headers: Dict[str, Any]
152
+    body: Optional[Any] = None
153
+    timestamp: datetime = field(default_factory=datetime.now)
154
+    response_code: Optional[int] = None
155
+    response_time_ms: Optional[float] = None
156
+    response_body: Optional[Any] = None
157
+    
158
+    def to_dict(self) -> Dict[str, Any]:
159
+        """转换为字典"""
160
+        return {
161
+            "id": self.id,
162
+            "method": self.method,
163
+            "endpoint": self.endpoint,
164
+            "params": self.params,
165
+            "headers": self.headers,
166
+            "body": self.body,
167
+            "timestamp": self.timestamp.isoformat(),
168
+            "response_code": self.response_code,
169
+            "response_time_ms": self.response_time_ms,
170
+            "response_body": self.response_body
171
+        }
172
+
173
+@dataclass
174
+class WebSocketConnection:
175
+    """WebSocket连接模型"""
176
+    id: str
177
+    client_ip: str
178
+    connected_at: datetime
179
+    disconnected_at: Optional[datetime] = None
180
+    subscriptions: List[str] = field(default_factory=list)
181
+    message_count: int = 0
182
+    last_message_at: Optional[datetime] = None
183
+    metadata: Dict[str, Any] = field(default_factory=dict)
184
+    
185
+    def to_dict(self) -> Dict[str, Any]:
186
+        """转换为字典"""
187
+        return {
188
+            "id": self.id,
189
+            "client_ip": self.client_ip,
190
+            "connected_at": self.connected_at.isoformat(),
191
+            "disconnected_at": self.disconnected_at.isoformat() if self.disconnected_at else None,
192
+            "subscriptions": self.subscriptions,
193
+            "message_count": self.message_count,
194
+            "last_message_at": self.last_message_at.isoformat() if self.last_message_at else None,
195
+            "metadata": self.metadata
196
+        }
197
+
198
+@dataclass
199
+class SystemStats:
200
+    """系统统计模型"""
201
+    timestamp: datetime
202
+    total_records: int
203
+    total_devices: int
204
+    active_connections: int
205
+    api_requests_count: int
206
+    alerts_count: int
207
+    data_quality_score: float
208
+    memory_usage_mb: float
209
+    cpu_usage_percent: float
210
+    
211
+    def to_dict(self) -> Dict[str, Any]:
212
+        """转换为字典"""
213
+        return {
214
+            "timestamp": self.timestamp.isoformat(),
215
+            "total_records": self.total_records,
216
+            "total_devices": self.total_devices,
217
+            "active_connections": self.active_connections,
218
+            "api_requests_count": self.api_requests_count,
219
+            "alerts_count": self.alerts_count,
220
+            "data_quality_score": self.data_quality_score,
221
+            "memory_usage_mb": self.memory_usage_mb,
222
+            "cpu_usage_percent": self.cpu_usage_percent
223
+        }
224
+
225
+class DataValidator:
226
+    """数据验证器"""
227
+    
228
+    @staticmethod
229
+    def validate_sensor_data(data: Dict[str, Any]) -> List[str]:
230
+        """验证传感器数据"""
231
+        errors = []
232
+        
233
+        # 必需字段检查
234
+        required_fields = ["device_id", "data_type", "value", "location"]
235
+        for field in required_fields:
236
+            if field not in data:
237
+                errors.append(f"缺少必需字段: {field}")
238
+        
239
+        # 数据类型验证
240
+        if "data_type" in data:
241
+            try:
242
+                DataType(data["data_type"])
243
+            except ValueError:
244
+                errors.append(f"无效的数据类型: {data['data_type']}")
245
+        
246
+        # 数值验证
247
+        if "value" in data:
248
+            try:
249
+                value = float(data["value"])
250
+                # 根据数据类型进行数值范围检查
251
+                data_type = data.get("data_type")
252
+                if data_type == "LL" and value < 0:
253
+                    errors.append("流量不能为负数")
254
+                elif data_type == "YL" and value < 0:
255
+                    errors.append("压力不能为负数")
256
+                elif data_type == "SW" and value < 0:
257
+                    errors.append("水位不能为负数")
258
+            except (ValueError, TypeError):
259
+                errors.append(f"无效的数值: {data['value']}")
260
+        
261
+        # 时间戳验证
262
+        if "timestamp" in data:
263
+            try:
264
+                if isinstance(data["timestamp"], str):
265
+                    datetime.fromisoformat(data["timestamp"])
266
+            except (ValueError, TypeError):
267
+                errors.append(f"无效的时间戳格式: {data['timestamp']}")
268
+        
269
+        return errors
270
+    
271
+    @staticmethod
272
+    def validate_device_data(data: Dict[str, Any]) -> List[str]:
273
+        """验证设备数据"""
274
+        errors = []
275
+        
276
+        # 必需字段检查
277
+        required_fields = ["id", "name", "device_type", "location"]
278
+        for field in required_fields:
279
+            if field not in data:
280
+                errors.append(f"缺少必需字段: {field}")
281
+        
282
+        # 设备ID格式验证
283
+        if "id" in data:
284
+            device_id = data["id"]
285
+            if not isinstance(device_id, str) or not device_id.strip():
286
+                errors.append("设备ID不能为空")
287
+            elif len(device_id) > 50:
288
+                errors.append("设备ID长度不能超过50个字符")
289
+        
290
+        # 状态验证
291
+        if "status" in data and data["status"] not in ["active", "inactive", "maintenance"]:
292
+            errors.append("设备状态必须是: active, inactive, maintenance")
293
+        
294
+        return errors
295
+    
296
+    @staticmethod
297
+    def validate_alert_data(data: Dict[str, Any]) -> List[str]:
298
+        """验证警报数据"""
299
+        errors = []
300
+        
301
+        # 必需字段检查
302
+        required_fields = ["device_id", "alert_type", "level", "message"]
303
+        for field in required_fields:
304
+            if field not in data:
305
+                errors.append(f"缺少必需字段: {field}")
306
+        
307
+        # 警报级别验证
308
+        if "level" in data:
309
+            try:
310
+                AlertLevel(data["level"])
311
+            except ValueError:
312
+                errors.append(f"无效的警报级别: {data['level']}")
313
+        
314
+        return errors
315
+
316
+# 全局验证器实例
317
+validator = DataValidator()

+ 409
- 0
src/utils/data_utils.py Datei anzeigen

@@ -0,0 +1,409 @@
1
+"""
2
+数据处理工具模块
3
+提供数据验证、转换、格式化等工具函数
4
+"""
5
+import json
6
+import csv
7
+import pandas as pd
8
+from typing import Dict, List, Any, Optional, Union
9
+from datetime import datetime, timedelta
10
+import hashlib
11
+import logging
12
+
13
+# 配置日志
14
+logging.basicConfig(level=logging.INFO)
15
+logger = logging.getLogger(__name__)
16
+
17
+class DataConverter:
18
+    """数据转换器"""
19
+    
20
+    # 水利行业标准字段映射
21
+    FIELD_MAPPING = {
22
+        "流量": "LL",
23
+        "压力": "YL",
24
+        "水位": "SW",
25
+        "浊度": "ZD",
26
+        "pH值": "PH",
27
+        "温度": "WD",
28
+        "电导率": "DD",
29
+        "硬度": "YD",
30
+        # 支持常见的中文字段名
31
+        "流量计": "LL",
32
+        "压力表": "YL",
33
+        "水位计": "SW",
34
+        "浊度仪": "ZD",
35
+        "pH计": "PH",
36
+        "温度计": "WD",
37
+        "电导率仪": "DD",
38
+        "硬度计": "YD"
39
+    }
40
+    
41
+    # 单位转换
42
+    UNIT_CONVERSIONS = {
43
+        # 流量单位转换 (m³/h)
44
+        "m³/h": 1.0,
45
+        "L/s": 3.6,  # L/s = m³/h / 1000 * 3600
46
+        "m³/d": 1/24,  # m³/d = m³/h / 24
47
+        "L/min": 1/60,  # L/min = m³/h / 1000 * 60
48
+        
49
+        # 压力单位转换 (MPa)
50
+        "MPa": 1.0,
51
+        "kPa": 0.001,  # kPa = MPa / 1000
52
+        "bar": 0.1,  # bar = MPa * 10
53
+        "kgf/cm²": 0.0980665,  # kgf/cm² = MPa / 0.0980665
54
+        
55
+        # 水位单位转换 (m)
56
+        "m": 1.0,
57
+        "cm": 0.01,  # cm = m / 100
58
+        "mm": 0.001,  # mm = m / 1000
59
+        
60
+        # 浊度单位转换 (NTU)
61
+        "NTU": 1.0,
62
+        "FNU": 1.0,  # FNU ≈ NTU
63
+        
64
+        # pH值单位转换
65
+        "pH": 1.0,
66
+        
67
+        # 温度单位转换 (°C)
68
+        "°C": 1.0,
69
+        "K": 1.0,  # 相对差值
70
+        "°F": lambda x: (x - 32) / 1.8,  # °F to °C
71
+        
72
+        # 电导率单位转换 (μS/cm)
73
+        "μS/cm": 1.0,
74
+        "mS/cm": 1000,  # mS/cm = μS/cm * 1000
75
+        "S/m": 10000  # S/m = μS/cm * 100
76
+    }
77
+    
78
+    @classmethod
79
+    def normalize_field_name(cls, field_name: str) -> str:
80
+        """标准化字段名"""
81
+        if not field_name:
82
+            return ""
83
+        
84
+        field_name = field_name.strip().upper()
85
+        
86
+        # 如果已经是标准格式,直接返回
87
+        if field_name in cls.FIELD_MAPPING.values():
88
+            return field_name
89
+        
90
+        # 查映射表
91
+        if field_name in cls.FIELD_MAPPING:
92
+            return cls.FIELD_MAPPING[field_name]
93
+        
94
+        # 英文映射
95
+        english_mapping = {
96
+            "flow": "LL",
97
+            "pressure": "YL", 
98
+            "level": "SW",
99
+            "turbidity": "ZD",
100
+            "ph": "PH",
101
+            "temperature": "WD",
102
+            "conductivity": "DD",
103
+            "hardness": "YD"
104
+        }
105
+        
106
+        if field_name.lower() in english_mapping:
107
+            return english_mapping[field_name.lower()]
108
+        
109
+        return field_name
110
+    
111
+    @classmethod
112
+    def convert_unit(cls, value: float, from_unit: str, to_unit: str) -> float:
113
+        """单位转换"""
114
+        if from_unit == to_unit:
115
+            return value
116
+        
117
+        if from_unit not in cls.UNIT_CONVERSIONS:
118
+            raise ValueError(f"不支持的单位: {from_unit}")
119
+        
120
+        if to_unit not in cls.UNIT_CONVERSIONS:
121
+            raise ValueError(f"不支持的目标单位: {to_unit}")
122
+        
123
+        from_conv = cls.UNIT_CONVERSIONS[from_unit]
124
+        to_conv = cls.UNIT_CONVERSIONS[to_unit]
125
+        
126
+        if callable(from_conv):
127
+            value = from_conv(value)
128
+        
129
+        if callable(to_conv):
130
+            return value / to_conv
131
+        else:
132
+            return value * (to_conv / from_conv)
133
+    
134
+    @classmethod
135
+    def validate_sensor_data(cls, data: Dict[str, Any]) -> Dict[str, Any]:
136
+        """验证传感器数据"""
137
+        errors = []
138
+        validated_data = {}
139
+        
140
+        # 必需字段验证
141
+        required_fields = ["device_id", "data_type", "value"]
142
+        for field in required_fields:
143
+            if field not in data:
144
+                errors.append(f"缺少必需字段: {field}")
145
+            else:
146
+                validated_data[field] = data[field]
147
+        
148
+        # 数据类型验证和标准化
149
+        if "data_type" in validated_data:
150
+            original_type = validated_data["data_type"]
151
+            validated_data["data_type"] = cls.normalize_field_name(original_type)
152
+            
153
+            if validated_data["data_type"] != original_type:
154
+                logger.info(f"字段名标准化: {original_type} -> {validated_data['data_type']}")
155
+        
156
+        # 数值验证
157
+        if "value" in validated_data:
158
+            try:
159
+                validated_data["value"] = float(validated_data["value"])
160
+                # 检查数值范围
161
+                data_type = validated_data.get("data_type", "")
162
+                if data_type == "LL" and validated_data["value"] < 0:
163
+                    errors.append("流量不能为负数")
164
+                elif data_type == "YL" and validated_data["value"] < 0:
165
+                    errors.append("压力不能为负数")
166
+                elif data_type == "SW" and validated_data["value"] < 0:
167
+                    errors.append("水位不能为负数")
168
+            except (ValueError, TypeError):
169
+                errors.append(f"无效的数值: {validated_data['value']}")
170
+        
171
+        # 地点验证
172
+        if "location" not in validated_data or not validated_data["location"]:
173
+            validated_data["location"] = "未知"
174
+        
175
+        # 时间戳处理
176
+        if "timestamp" in data:
177
+            try:
178
+                if isinstance(data["timestamp"], str):
179
+                    validated_data["timestamp"] = datetime.fromisoformat(data["timestamp"])
180
+                else:
181
+                    validated_data["timestamp"] = data["timestamp"]
182
+            except (ValueError, TypeError):
183
+                validated_data["timestamp"] = datetime.now()
184
+        else:
185
+            validated_data["timestamp"] = datetime.now()
186
+        
187
+        return {
188
+            "valid": len(errors) == 0,
189
+            "data": validated_data,
190
+            "errors": errors
191
+        }
192
+
193
+class DataFormatter:
194
+    """数据格式化器"""
195
+    
196
+    @staticmethod
197
+    def format_sensor_data(data: Dict[str, Any], format_type: str = "json") -> str:
198
+        """格式化传感器数据"""
199
+        if format_type == "json":
200
+            return json.dumps(data, ensure_ascii=False, indent=2)
201
+        elif format_type == "csv":
202
+            # CSV格式只包含关键字段
203
+            csv_fields = ["device_id", "data_type", "value", "location", "timestamp"]
204
+            csv_data = {k: data.get(k, "") for k in csv_fields}
205
+            import io
206
+            output = io.StringIO()
207
+            writer = csv.DictWriter(output, fieldnames=csv_fields)
208
+            writer.writeheader()
209
+            writer.writerow(csv_data)
210
+            return output.getvalue()
211
+        else:
212
+            raise ValueError(f"不支持的格式类型: {format_type}")
213
+    
214
+    @staticmethod
215
+    def format_statistics(stats: Dict[str, Any], format_type: str = "text") -> str:
216
+        """格式化统计数据"""
217
+        if format_type == "json":
218
+            return json.dumps(stats, ensure_ascii=False, indent=2)
219
+        elif format_type == "text":
220
+            lines = ["数据统计报告", "=" * 20]
221
+            lines.append(f"总记录数: {stats.get('total_records', 0)}")
222
+            
223
+            if "by_type" in stats:
224
+                lines.append("\n按数据类型统计:")
225
+                for data_type, count in stats["by_type"].items():
226
+                    lines.append(f"  {data_type}: {count} 条")
227
+            
228
+            if "by_device" in stats:
229
+                lines.append("\n按设备统计:")
230
+                for device_id, count in list(stats["by_device"].items())[:10]:  # 只显示前10个
231
+                    lines.append(f"  {device_id}: {count} 条")
232
+            
233
+            return "\n".join(lines)
234
+        else:
235
+            raise ValueError(f"不支持的格式类型: {format_type}")
236
+
237
+class DataHasher:
238
+    """数据哈希工具"""
239
+    
240
+    @staticmethod
241
+    def calculate_data_hash(data: Dict[str, Any]) -> str:
242
+        """计算数据哈希值"""
243
+        # 将数据转换为字符串
244
+        data_str = json.dumps(data, sort_keys=True, ensure_ascii=False)
245
+        
246
+        # 计算MD5哈希
247
+        hash_md5 = hashlib.md5(data_str.encode())
248
+        return hash_md5.hexdigest()
249
+    
250
+    @staticmethod
251
+    def generate_data_id(device_id: str, data_type: str, timestamp: datetime) -> str:
252
+        """生成数据ID"""
253
+        # 使用设备ID、数据类型和时间戳生成唯一ID
254
+        time_str = timestamp.strftime("%Y%m%d_%H%M%S")
255
+        hash_input = f"{device_id}_{data_type}_{time_str}"
256
+        hash_md5 = hashlib.md5(hash_input.encode())
257
+        return f"{data_type}_{device_id}_{hash_md5.hexdigest()[:8]}"
258
+
259
+class DataQualityChecker:
260
+    """数据质量检查器"""
261
+    
262
+    @staticmethod
263
+    def check_data_quality(records: List[Dict[str, Any]]) -> Dict[str, Any]:
264
+        """检查数据质量"""
265
+        quality_report = {
266
+            "total_records": len(records),
267
+            "valid_records": 0,
268
+            "invalid_records": 0,
269
+            "quality_score": 0,
270
+            "issues": [],
271
+            "statistics": {}
272
+        }
273
+        
274
+        if not records:
275
+            quality_report["quality_score"] = 0
276
+            return quality_report
277
+        
278
+        valid_records = []
279
+        
280
+        for record in records:
281
+            issues = []
282
+            
283
+            # 检查必需字段
284
+            required_fields = ["device_id", "data_type", "value"]
285
+            for field in required_fields:
286
+                if field not in record or not record[field]:
287
+                    issues.append(f"缺少必需字段: {field}")
288
+            
289
+            # 检查数据类型
290
+            if "data_type" in record and record["data_type"]:
291
+                valid_types = ["LL", "YL", "SW", "ZD", "PH", "WD", "DD", "YD"]
292
+                if record["data_type"] not in valid_types:
293
+                    issues.append(f"无效的数据类型: {record['data_type']}")
294
+            
295
+            # 检查数值范围
296
+            if "value" in record and record["value"]:
297
+                try:
298
+                    value = float(record["value"])
299
+                    data_type = record.get("data_type", "")
300
+                    
301
+                    if data_type == "LL" and value < 0:
302
+                        issues.append("流量不能为负数")
303
+                    elif data_type == "YL" and value < 0:
304
+                        issues.append("压力不能为负数")
305
+                    elif data_type == "SW" and value < 0:
306
+                        issues.append("水位不能为负数")
307
+                    
308
+                    # 检查异常值
309
+                    if data_type == "LL" and value > 10000:
310
+                        issues.append("流量值异常大")
311
+                    elif data_type == "YL" and value > 10:
312
+                        issues.append("压力值异常大")
313
+                        
314
+                except (ValueError, TypeError):
315
+                    issues.append("无效的数值格式")
316
+            
317
+            if not issues:
318
+                valid_records.append(record)
319
+                quality_report["valid_records"] += 1
320
+            else:
321
+                quality_report["invalid_records"] += 1
322
+                quality_report["issues"].extend(issues)
323
+        
324
+        # 计算质量分数
325
+        quality_report["quality_score"] = quality_report["valid_records"] / len(records)
326
+        
327
+        # 统计信息
328
+        if records:
329
+            quality_report["statistics"] = {
330
+                "completeness": quality_report["valid_records"] / len(records),
331
+                "uniqueness": len(set(r.get("device_id", "") for r in valid_records)) / len(valid_records) if valid_records else 0,
332
+                "timeliness": quality_report.calculate_timeliness(records)
333
+            }
334
+        
335
+        return quality_report
336
+    
337
+    @staticmethod
338
+    def calculate_timeliness(records: List[Dict[str, Any]]) -> float:
339
+        """计算数据及时性(24小时内的数据比例)"""
340
+        if not records:
341
+            return 0
342
+        
343
+        now = datetime.now()
344
+        recent_count = 0
345
+        
346
+        for record in records:
347
+            timestamp = record.get("timestamp")
348
+            if timestamp:
349
+                try:
350
+                    if isinstance(timestamp, str):
351
+                        timestamp = datetime.fromisoformat(timestamp)
352
+                    
353
+                    time_diff = now - timestamp
354
+                    if time_diff <= timedelta(hours=24):
355
+                        recent_count += 1
356
+                except:
357
+                    pass
358
+        
359
+        return recent_count / len(records)
360
+
361
+class DataExporter:
362
+    """数据导出工具"""
363
+    
364
+    @staticmethod
365
+    def export_to_csv(records: List[Dict[str, Any]], file_path: str) -> bool:
366
+        """导出为CSV文件"""
367
+        try:
368
+            if not records:
369
+                return False
370
+            
371
+            # 获取所有字段
372
+            all_fields = set()
373
+            for record in records:
374
+                all_fields.update(record.keys())
375
+            
376
+            # 排序字段
377
+            field_order = ["device_id", "data_type", "value", "location", "timestamp"]
378
+            for field in all_fields:
379
+                if field not in field_order:
380
+                    field_order.append(field)
381
+            
382
+            with open(file_path, 'w', newline='', encoding='utf-8') as csvfile:
383
+                writer = csv.DictWriter(csvfile, fieldnames=field_order)
384
+                writer.writeheader()
385
+                writer.writerows(records)
386
+            
387
+            return True
388
+            
389
+        except Exception as e:
390
+            logger.error(f"导出CSV失败: {str(e)}")
391
+            return False
392
+    
393
+    @staticmethod
394
+    def export_to_json(records: List[Dict[str, Any]], file_path: str) -> bool:
395
+        """导出为JSON文件"""
396
+        try:
397
+            with open(file_path, 'w', encoding='utf-8') as jsonfile:
398
+                json.dump(records, jsonfile, ensure_ascii=False, indent=2, default=str)
399
+            return True
400
+        except Exception as e:
401
+            logger.error(f"导出JSON失败: {str(e)}")
402
+            return False
403
+
404
+# 全局工具实例
405
+data_converter = DataConverter()
406
+data_formatter = DataFormatter()
407
+data_hasher = DataHasher()
408
+quality_checker = DataQualityChecker()
409
+data_exporter = DataExporter()

+ 214
- 0
src/websocket/websocket_server.py Datei anzeigen

@@ -0,0 +1,214 @@
1
+"""
2
+WebSocket 实时数据推送服务器
3
+支持实时数据推送、连接管理和数据广播
4
+"""
5
+import asyncio
6
+import json
7
+import websockets
8
+from datetime import datetime
9
+from typing import Set, Dict, Any
10
+import logging
11
+
12
+# 配置日志
13
+logging.basicConfig(level=logging.INFO)
14
+logger = logging.getLogger(__name__)
15
+
16
+class WebSocketServer:
17
+    """WebSocket服务器类"""
18
+    
19
+    def __init__(self, host: str = "0.0.0.0", port: int = 8765):
20
+        self.host = host
21
+        self.port = port
22
+        self.clients: Set[websockets.WebSocketServerProtocol] = set()
23
+        self.data_history: list = []  # 存储最近的数据用于新连接
24
+        
25
+    async def register_client(self, websocket: websockets.WebSocketServerProtocol):
26
+        """注册新客户端"""
27
+        self.clients.add(websocket)
28
+        client_ip = websocket.remote_address[0]
29
+        logger.info(f"新客户端连接: {client_ip}")
30
+        
31
+        # 发送历史数据给新连接的客户端
32
+        if self.data_history:
33
+            await websocket.send(json.dumps({
34
+                "type": "history",
35
+                "data": self.data_history[-50:]  # 发送最近50条数据
36
+            }))
37
+        
38
+        # 发送欢迎消息
39
+        await websocket.send(json.dumps({
40
+            "type": "welcome",
41
+            "message": "已连接到水务管理系统实时数据服务器",
42
+            "timestamp": datetime.now().isoformat()
43
+        }))
44
+    
45
+    async def unregister_client(self, websocket: websockets.WebSocketServerProtocol):
46
+        """注销客户端"""
47
+        if websocket in self.clients:
48
+            self.clients.remove(websocket)
49
+            client_ip = websocket.remote_address[0]
50
+            logger.info(f"客户端断开连接: {client_ip}")
51
+    
52
+    async def broadcast_data(self, data: Dict[str, Any]):
53
+        """广播数据到所有连接的客户端"""
54
+        if not self.clients:
55
+            return
56
+            
57
+        # 添加时间戳
58
+        data["timestamp"] = datetime.now().isoformat()
59
+        
60
+        # 保存历史数据
61
+        self.data_history.append(data)
62
+        if len(self.data_history) > 1000:  # 只保留最近1000条记录
63
+            self.data_history.pop(0)
64
+        
65
+        # 广播数据
66
+        message = json.dumps(data)
67
+        disconnected_clients = []
68
+        
69
+        for client in self.clients:
70
+            try:
71
+                await client.send(message)
72
+            except websockets.exceptions.ConnectionClosed:
73
+                disconnected_clients.append(client)
74
+        
75
+        # 清理已断开的连接
76
+        for client in disconnected_clients:
77
+            await self.unregister_client(client)
78
+    
79
+    async def handle_client_message(self, websocket: websockets.WebSocketServerProtocol, message: str):
80
+        """处理客户端消息"""
81
+        try:
82
+            data = json.loads(message)
83
+            
84
+            if data.get("type") == "subscribe":
85
+                # 处理订阅请求
86
+                subscription_type = data.get("subscription", "all")
87
+                response = {
88
+                    "type": "subscription_ack",
89
+                    "subscription": subscription_type,
90
+                    "message": f"已订阅 {subscription_type} 类型数据"
91
+                }
92
+                await websocket.send(json.dumps(response))
93
+                logger.info(f"客户端订阅了 {subscription_type} 类型数据")
94
+            
95
+            elif data.get("type") == "ping":
96
+                # 响应心跳检测
97
+                response = {
98
+                    "type": "pong",
99
+                    "timestamp": datetime.now().isoformat()
100
+                }
101
+                await websocket.send(json.dumps(response))
102
+            
103
+            else:
104
+                logger.warning(f"未知的消息类型: {data.get('type', 'unknown')}")
105
+                
106
+        except json.JSONDecodeError:
107
+            logger.error("无效的JSON消息")
108
+        except Exception as e:
109
+            logger.error(f"处理客户端消息时出错: {str(e)}")
110
+    
111
+    async def client_handler(self, websocket: websockets.WebSocketServerProtocol, path: str):
112
+        """处理客户端连接"""
113
+        await self.register_client(websocket)
114
+        
115
+        try:
116
+            async for message in websocket:
117
+                await self.handle_client_message(websocket, message)
118
+        except websockets.exceptions.ConnectionClosed:
119
+            pass
120
+        finally:
121
+            await self.unregister_client(websocket)
122
+    
123
+    async def start_server(self):
124
+        """启动WebSocket服务器"""
125
+        logger.info(f"启动WebSocket服务器: {self.host}:{self.port}")
126
+        
127
+        # 创建并启动服务器
128
+        self.server = await websockets.serve(
129
+            self.client_handler,
130
+            self.host,
131
+            self.port
132
+        )
133
+        
134
+        logger.info("WebSocket服务器已启动")
135
+        return self.server
136
+    
137
+    async def send_sensor_data(self, sensor_data: Dict[str, Any]):
138
+        """发送传感器数据"""
139
+        data = {
140
+            "type": "sensor_data",
141
+            "data_type": sensor_data.get("data_type"),
142
+            "device_id": sensor_data.get("device_id"),
143
+            "value": sensor_data.get("value"),
144
+            "location": sensor_data.get("location"),
145
+            "timestamp": datetime.now().isoformat()
146
+        }
147
+        await self.broadcast_data(data)
148
+    
149
+    async def send_alert(self, alert_data: Dict[str, Any]):
150
+        """发送警报信息"""
151
+        data = {
152
+            "type": "alert",
153
+            "level": alert_data.get("level", "warning"),
154
+            "message": alert_data.get("message"),
155
+            "device_id": alert_data.get("device_id"),
156
+            "timestamp": datetime.now().isoformat()
157
+        }
158
+        await self.broadcast_data(data)
159
+
160
+# 全局WebSocket服务器实例
161
+websocket_server = WebSocketServer()
162
+
163
+# 示例数据生成器
164
+async def data_generator():
165
+    """模拟数据生成器"""
166
+    import random
167
+    
168
+    while True:
169
+        await asyncio.sleep(5)  # 每5秒发送一次数据
170
+        
171
+        # 模拟不同的传感器数据
172
+        sensor_types = ["LL", "YL", "SW", "ZD"]
173
+        sensor_type = random.choice(sensor_types)
174
+        
175
+        # 根据传感器类型生成合理的数值范围
176
+        if sensor_type == "LL":  # 流量
177
+            value = random.uniform(10, 100)
178
+        elif sensor_type == "YL":  # 压力
179
+            value = random.uniform(0.1, 1.0)
180
+        elif sensor_type == "SW":  # 水位
181
+            value = random.uniform(0, 10)
182
+        else:  # ZD 浊度
183
+            value = random.uniform(0, 50)
184
+        
185
+        sensor_data = {
186
+            "data_type": sensor_type,
187
+            "device_id": f"device_{random.randint(1, 10)}",
188
+            "value": round(value, 2),
189
+            "location": random.choice(["A区", "B区", "C区", "D区"])
190
+        }
191
+        
192
+        await websocket_server.send_sensor_data(sensor_data)
193
+
194
+# 启动服务器和生成器
195
+async def main():
196
+    """主函数"""
197
+    # 启动WebSocket服务器
198
+    server = await websocket_server.start_server()
199
+    
200
+    # 启动数据生成器
201
+    generator_task = asyncio.create_task(data_generator())
202
+    
203
+    # 保持服务器运行
204
+    try:
205
+        await asyncio.Future()  # 永远等待
206
+    except KeyboardInterrupt:
207
+        logger.info("收到中断信号,正在关闭服务器...")
208
+        server.close()
209
+        await server.wait_closed()
210
+        generator_task.cancel()
211
+        await generator_task
212
+
213
+if __name__ == "__main__":
214
+    asyncio.run(main())