| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323 |
- """
- 水务管理系统主程序
- 集成REST API、WebSocket和批量导入功能
- """
- import asyncio
- import logging
- import signal
- import sys
- from pathlib import Path
- import argparse
- from datetime import datetime
-
- # 导入各个模块
- from src.api.rest_api import app as rest_api_app
- from src.websocket.websocket_server import websocket_server
- from src.batch.batch_import import batch_manager
- from src.utils.data_utils import data_converter, data_formatter, quality_checker
- from src.models.models import validator
- from src.iot.app import create_iot_app
-
# Configure logging: every record goes both to a log file and to stdout.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        # The messages logged by this module contain non-ASCII text, so the
        # file encoding is pinned instead of relying on the platform default
        # (which may be unable to encode them).
        logging.FileHandler('water_management.log', encoding='utf-8'),
        logging.StreamHandler(sys.stdout),
    ],
)
# Module-level logger used by everything defined in this file.
logger = logging.getLogger(__name__)
-
class WaterManagementSystem:
    """Start, supervise and stop the services of the water management system.

    One instance owns the WebSocket server, the optional demo data generator
    and the HTTP API server, and brings them up and down as a unit.
    """

    def __init__(self, config_file: str = "config.json"):
        """Create the system and load its configuration.

        Args:
            config_file: Path of the JSON configuration file. A missing file
                is not an error; the built-in defaults are used instead.
        """
        self.config_file = config_file
        self.running = False
        # Background asyncio tasks owned by this instance; cancelled on stop.
        self.tasks = []
        # Server handles are tracked separately from ``self.tasks`` because
        # they are not awaitable tasks and are shut down via their own APIs.
        self._ws_server = None
        self._api_server = None
        self.config = self.load_config()

    @staticmethod
    def _default_config() -> dict:
        """Return a fresh copy of the built-in default configuration."""
        return {
            "api": {
                "host": "0.0.0.0",
                "port": 8000,
            },
            "websocket": {
                "host": "0.0.0.0",
                "port": 8765,
            },
            "batch": {
                "max_file_size_mb": 100,
                "supported_formats": ["csv", "excel", "json"],
            },
            "logging": {
                "level": "INFO",
                "file": "water_management.log",
            },
        }

    def load_config(self) -> dict:
        """Load the configuration, filling missing entries from the defaults.

        Values found in the file always win. Sections that are JSON objects
        are merged key by key with the matching default section, so a file
        that only overrides e.g. ``api.port`` still yields a complete config.

        Returns:
            The effective configuration dictionary.

        Raises:
            ValueError: If the file's top-level JSON value is not an object.
            json.JSONDecodeError: If the file is not valid JSON.
        """
        import json

        config = self._default_config()
        config_path = Path(self.config_file)
        if not config_path.exists():
            return config

        with open(config_path, 'r', encoding='utf-8') as f:
            loaded = json.load(f)
        if not isinstance(loaded, dict):
            raise ValueError(f"配置文件的根节点必须是JSON对象: {config_path}")

        for key, value in loaded.items():
            default_section = config.get(key)
            if isinstance(default_section, dict) and isinstance(value, dict):
                default_section.update(value)
            else:
                config[key] = value
        return config

    @staticmethod
    def _as_asgi(app):
        """Return *app* as an ASGI callable, adapting it when it looks like WSGI.

        An app whose call entry point is a coroutine function is taken to be
        ASGI already; anything else is assumed to be a WSGI application and
        is wrapped.
        NOTE(review): the concrete types of the imported apps are not visible
        in this file — confirm this detection matches them.
        """
        import inspect

        entry_point = app if inspect.isfunction(app) else getattr(app, "__call__", None)
        if inspect.iscoroutinefunction(entry_point):
            return app

        # Imported lazily so uvicorn is only required when serving.
        from uvicorn.middleware.wsgi import WSGIMiddleware
        return WSGIMiddleware(app)

    def _request_stop(self):
        """Ask the running services to wind down without awaiting anything.

        Safe to call from a signal callback: it only flips flags that the
        serving loops poll.
        """
        self.running = False
        if self._api_server is not None:
            self._api_server.should_exit = True

    async def start_api_server(self):
        """Serve the combined HTTP application until the server exits.

        Requests whose path starts with ``/api/iot/`` go to the IoT app;
        everything else (including non-HTTP ASGI events) goes to the REST
        app. When the server stops, ``self.running`` is cleared so that
        ``run()`` does not keep waiting for a server that is gone.
        """
        import uvicorn
        logger.info("启动REST API服务器...")

        # uvicorn speaks ASGI, so both sub-applications must be ASGI callables.
        iot_app = self._as_asgi(create_iot_app())
        main_app = self._as_asgi(rest_api_app)

        async def combined_app(scope, receive, send):
            # Only HTTP requests are routed by path; other event types have
            # no request path to dispatch on and belong to the main app.
            path = scope.get('path', '') if scope['type'] == 'http' else ''
            if path.startswith('/api/iot/'):
                await iot_app(scope, receive, send)
            else:
                await main_app(scope, receive, send)

        api_config = uvicorn.Config(
            app=combined_app,
            host=self.config["api"]["host"],
            port=self.config["api"]["port"],
            log_level="info",
        )
        api_server = uvicorn.Server(api_config)

        # Remember the server so a stop request can tell it to exit.
        self._api_server = api_server
        try:
            await api_server.serve()
        finally:
            self._api_server = None
            self.running = False

    async def start_websocket_server(self):
        """Start the WebSocket server.

        Returns:
            A ``(server, cleanup)`` tuple: the server object returned by
            ``websocket_server.start_server()`` and a synchronous callable
            that closes it. ``cleanup`` must be called while the event loop
            is running.
        """
        logger.info("启动WebSocket服务器...")

        server = await websocket_server.start_server()

        def cleanup():
            logger.info("关闭WebSocket服务器...")
            server.close()
            # Keep a reference so the task is not garbage-collected before
            # it finishes.
            self.tasks.append(asyncio.create_task(server.wait_closed()))

        return server, cleanup

    async def start_data_generator(self):
        """Push a random sensor reading over the WebSocket every 5 seconds.

        Intended for demos. Runs until ``self.running`` becomes false or the
        task is cancelled. A failed send is logged and the loop continues.
        """
        import random

        logger.info("启动数据生成器...")

        # Value range per sensor type code.
        value_ranges = {
            "LL": (10, 100),    # flow
            "YL": (0.1, 1.0),   # pressure
            "SW": (0, 10),      # water level
            "ZD": (0, 50),      # turbidity
        }
        sensor_types = list(value_ranges)
        locations = ["A区", "B区", "C区", "D区"]

        while self.running:
            sensor_type = random.choice(sensor_types)
            low, high = value_ranges[sensor_type]
            value = random.uniform(low, high)

            sensor_data = {
                "data_type": sensor_type,
                "device_id": f"device_{random.randint(1, 10)}",
                "value": round(value, 2),
                "location": random.choice(locations),
            }

            try:
                await websocket_server.send_sensor_data(sensor_data)
            except Exception:
                # One failed broadcast must not end the demo feed.
                logger.exception("发送模拟传感器数据失败")

            await asyncio.sleep(5)

    async def handle_batch_import(self, file_path: str, batch_id: str, data_source: str):
        """Validate a file and import it through the batch manager.

        Args:
            file_path: Path of the file to import.
            batch_id: Identifier passed through to the batch manager.
            data_source: Data source label passed through to the batch manager.

        Returns:
            Whatever ``batch_manager.import_file`` returns.

        Raises:
            Exception: If validation reports the file as invalid, or if the
                batch manager itself fails (logged, then re-raised).
        """
        try:
            logger.info(f"开始批量导入: {file_path}")

            validation_result = await batch_manager.validate_file(file_path)
            if not validation_result["valid"]:
                raise Exception(f"文件验证失败: {validation_result['error']}")

            # The file type is left to the batch manager to detect.
            result = await batch_manager.import_file(
                file_path=file_path,
                batch_id=batch_id,
                data_source=data_source,
                file_type="auto",
            )

            logger.info(f"批量导入完成: {result}")
            return result

        except Exception as e:
            logger.error(f"批量导入失败: {str(e)}")
            raise

    async def start_system(self):
        """Bring up all services; returns once the API server has exited.

        Starts the WebSocket server, then the demo data generator when
        ``demo_mode`` is enabled in the configuration, then the API server.

        Raises:
            Exception: Any start-up failure, after the system was stopped.
        """
        logger.info("启动水务管理系统...")

        self.running = True

        try:
            ws_server, _ws_cleanup = await self.start_websocket_server()
            # Not a task: kept separately and closed in stop_system().
            self._ws_server = ws_server

            if self.config.get("demo_mode", False):
                generator_task = asyncio.create_task(self.start_data_generator())
                self.tasks.append(generator_task)

            # A stop request may already have arrived during start-up.
            if self.running:
                await self.start_api_server()

        except Exception as e:
            logger.error(f"系统启动失败: {str(e)}")
            await self.stop_system()
            raise

    async def stop_system(self):
        """Stop every service owned by this instance.

        Safe to call more than once: resources are detached from the
        instance before being released, so a repeated call finds nothing
        left to cancel.
        """
        logger.info("停止水务管理系统...")

        self._request_stop()

        # Detach first so overlapping calls never handle the same task twice.
        pending, self.tasks = self.tasks, []
        cancellable = [task for task in pending if isinstance(task, asyncio.Future)]
        for task in cancellable:
            if not task.done():
                task.cancel()
        if cancellable:
            # return_exceptions=True absorbs the CancelledError of each task
            # and also retrieves errors from tasks that had already failed.
            await asyncio.gather(*cancellable, return_exceptions=True)

        servers = []
        if self._ws_server is not None:
            servers.append(self._ws_server)
        module_server = getattr(websocket_server, 'server', None)
        if module_server and module_server is not self._ws_server:
            servers.append(module_server)
        self._ws_server = None
        for server in servers:
            server.close()
            await server.wait_closed()

        logger.info("水务管理系统已停止")

    async def run(self):
        """Run the system until it is stopped, then clean up.

        Installs SIGINT/SIGTERM handlers that request a graceful stop.
        Errors raised while running are logged, not propagated.
        """
        loop = asyncio.get_running_loop()

        def request_stop():
            logger.info("收到停止信号...")
            self._request_stop()

        for sig in (signal.SIGINT, signal.SIGTERM):
            try:
                loop.add_signal_handler(sig, request_stop)
            except NotImplementedError:
                # Event loops without add_signal_handler support: fall back
                # to signal.signal, whose handlers receive (signum, frame).
                signal.signal(
                    sig,
                    lambda signum, frame: loop.call_soon_threadsafe(request_stop),
                )

        try:
            await self.start_system()

            # Stay alive until a stop is requested or the API server exits.
            while self.running:
                await asyncio.sleep(1)

        except KeyboardInterrupt:
            logger.info("收到键盘中断信号")
        except Exception as e:
            logger.error(f"系统运行时出错: {str(e)}")
        finally:
            await self.stop_system()
-
def create_sample_config():
    """Write a sample configuration to ``config.json`` in the working directory.

    The sample contains the API, WebSocket, batch and logging sections and
    has ``demo_mode`` switched on. An existing ``config.json`` is overwritten.
    """
    import json

    bind_all = "0.0.0.0"
    template = {
        "api": {"host": bind_all, "port": 8000},
        "websocket": {"host": bind_all, "port": 8765},
        "batch": {
            "max_file_size_mb": 100,
            "supported_formats": ["csv", "excel", "json"],
        },
        "demo_mode": True,
        "logging": {"level": "INFO", "file": "water_management.log"},
    }

    # ensure_ascii=False keeps any non-ASCII text readable in the file.
    rendered = json.dumps(template, indent=2, ensure_ascii=False)
    Path("config.json").write_text(rendered, encoding='utf-8')

    logger.info("示例配置文件已创建: config.json")
-
def create_requirements():
    """Write the pinned dependency list to ``requirements.txt``.

    The file is created in the working directory, one requirement per line
    with no trailing newline. An existing file is overwritten.
    """
    pinned = (
        "fastapi==0.104.1",
        "uvicorn[standard]==0.24.0",
        "websockets==12.0",
        "pandas==2.1.3",
        "openpyxl==3.1.2",
        "aiofiles==23.2.1",
        "python-multipart==0.0.6",
        "jinja2==3.1.2",
        "paho-mqtt==1.6.1",
        "flask==2.3.3",
    )

    Path("requirements.txt").write_text('\n'.join(pinned))

    logger.info("依赖文件已创建: requirements.txt")
-
def main():
    """Command-line entry point.

    With ``--init`` the sample configuration and requirements files are
    written and the program returns. Otherwise the system is built from the
    file given by ``--config`` (with ``demo_mode`` forced on when ``--demo``
    is passed) and run until it stops or is interrupted.
    """
    cli = argparse.ArgumentParser(description="水务管理系统")
    cli.add_argument("--config", "-c", default="config.json", help="配置文件路径")
    cli.add_argument("--init", action="store_true", help="初始化项目(创建配置文件和依赖)")
    cli.add_argument("--demo", action="store_true", help="启动演示模式")
    options = cli.parse_args()

    # Initialisation only writes the scaffold files; nothing is started.
    if options.init:
        for scaffold in (create_sample_config, create_requirements):
            scaffold()
        logger.info("项目初始化完成")
        return

    system = WaterManagementSystem(options.config)

    # The command-line flag takes precedence over the loaded configuration.
    if options.demo:
        system.config["demo_mode"] = True
        logger.info("启用演示模式")

    try:
        asyncio.run(system.run())
    except KeyboardInterrupt:
        logger.info("程序已退出")


if __name__ == "__main__":
    main()
|