""" GIS空间查询单元测试 覆盖设备定位、区域分析和路径规划功能 """ import unittest from unittest.mock import Mock, patch from shapely.geometry import Point, Polygon, LineString from shapely.ops import unary_union import json from datetime import datetime from src.gis.models import DeviceLocation, GeoRegion, PathRoute from src.gis.services import SpatialQueryService, RoutePlanningService, GeoAnalysisService class TestDeviceLocation(unittest.TestCase): """设备位置模型测试""" def setUp(self): self.location_data = { "device_id": "sensor_001", "name": "温度传感器1", "location": { "type": "Point", "coordinates": [108.948024, 34.265773] # 西安坐标 }, "location_type": "outdoor", "installation_date": "2026-01-15", "last_updated": "2026-06-16T10:00:00Z" } def test_location_creation(self): """测试位置创建""" location = DeviceLocation(self.location_data) self.assertEqual(location.device_id, self.location_data["device_id"]) self.assertEqual(location.name, self.location_data["name"]) self.assertEqual(location.location_type, "outdoor") # 验证几何对象 self.assertIsInstance(location.geometry, Point) self.assertEqual(location.geometry.x, 108.948024) self.assertEqual(location.geometry.y, 34.265773) def test_location_validation(self): """测试位置验证""" location = DeviceLocation(self.location_data) result = location.validate() self.assertTrue(result["valid"]) # 测试无效位置(坐标格式错误) invalid_data = self.location_data.copy() invalid_data["location"]["coordinates"] = [108.948024] # 缺少Y坐标 invalid_location = DeviceLocation(invalid_data) result = invalid_location.validate() self.assertFalse(result["valid"]) def test_distance_calculation(self): """测试距离计算""" location1 = DeviceLocation(self.location_data) # 创建第二个位置 location2_data = self.location_data.copy() location2_data["device_id"] = "sensor_002" location2_data["location"]["coordinates"] = [108.950024, 34.267773] # 附近位置 location2 = DeviceLocation(location2_data) distance = location1.calculate_distance(location2) self.assertGreater(distance, 0) self.assertLess(distance, 1000) # 应该在1公里内 class TestGeoRegion(unittest.TestCase): """地理区域模型测试""" def setUp(self): self.region_data = { "region_id": "area_001", "name": "测试区域", "boundary": { "type": "Polygon", "coordinates": [[[108.94, 34.26], [108.95, 34.26], [108.95, 34.27], [108.94, 34.27], [108.94, 34.26]]] }, "properties": { "type": "residential", "population_density": "medium", "water_supply_network": "primary" } } def test_region_creation(self): """测试区域创建""" region = GeoRegion(self.region_data) self.assertEqual(region.region_id, self.region_data["region_id"]) self.assertEqual(region.name, self.region_data["name"]) self.assertEqual(region.properties["type"], "residential") # 验证几何对象 self.assertIsInstance(region.geometry, Polygon) self.assertTrue(region.geometry.is_valid) def test_contains_point(self): """测试点包含测试""" region = GeoRegion(self.region_data) # 区域内的点 inside_point = Point(108.945, 34.265) self.assertTrue(region.contains_point(inside_point)) # 区域外的点 outside_point = Point(109.0, 34.3) self.assertFalse(region.contains_point(outside_point)) def test_region_intersection(self): """测试区域相交""" region1 = GeoRegion(self.region_data) # 创建第二个相交区域 region2_data = self.region_data.copy() region2_data["region_id"] = "area_002" region2_data["boundary"]["coordinates"] = [[[108.945, 34.265], [108.955, 34.265], [108.955, 34.275], [108.945, 34.275], [108.945, 34.265]]] region2 = GeoRegion(region2_data) intersection = region1.intersection(region2) self.assertIsNotNone(intersection) self.assertIsInstance(intersection, Polygon) self.assertGreater(intersection.area, 0) def test_buffer_creation(self): """测试缓冲区创建""" region = GeoRegion(self.region_data) # 创建500米缓冲区 buffered_region = region.create_buffer(500) self.assertIsInstance(buffered_region, Polygon) self.assertGreater(buffered_region.area, region.area) # 创建负缓冲区(收缩) shrunk_region = region.create_buffer(-200) self.assertIsInstance(shrunk_region, Polygon) self.assertLess(shrunk_region.area, region.area) class TestSpatialQueryService(unittest.TestCase): """空间查询服务测试""" def setUp(self): self.spatial_service = SpatialQueryService() @patch('src.gis.services.DeviceLocation') def test_find_devices_in_region(self, mock_device_location): """测试查找区域内的设备""" region_data = { "region_id": "area_001", "boundary": { "type": "Polygon", "coordinates": [[[108.94, 34.26], [108.95, 34.26], [108.95, 34.27], [108.94, 34.27], [108.94, 34.26]]] } } region = GeoRegion(region_data) # 模拟设备 device1 = Mock() device1.geometry = Point(108.945, 34.265) device1.device_id = "sensor_001" device2 = Mock() device2.geometry = Point(109.0, 34.3) # 区域外 device2.device_id = "sensor_002" mock_device_location.query.return_value = [device1, device2] devices = self.spatial_service.find_devices_in_region(region) self.assertEqual(len(devices), 1) self.assertEqual(devices[0].device_id, "sensor_001") @patch('src.gis.services.DeviceLocation') def test_find_devices_within_distance(self, mock_device_location): """测试查找指定距离内的设备""" center_point = Point(108.948024, 34.265773) max_distance = 1000 # 1公里 # 模拟设备 devices = [] for i in range(5): device = Mock() # 创建不同距离的点 distance = i * 200 # 0, 200, 400, 600, 800米 device.geometry = Point(center_point.x + distance/100000, center_point.y + distance/100000) device.device_id = f"sensor_{i:03d}" devices.append(device) mock_device_location.query.return_value = devices nearby_devices = self.spatial_service.find_devices_within_distance(center_point, max_distance) # 应该找到距离小于1000米的设备(前4个) self.assertEqual(len(nearby_devices), 4) self.assertNotIn("sensor_004", [d.device_id for d in nearby_devices]) @patch('src.gis.services.GeoRegion') def test_analyze_coverage(self, mock_geo_region): """测试覆盖范围分析""" # 模拟区域 region = Mock() region.geometry = Polygon([(108.94, 34.26), (108.95, 34.26), (108.95, 34.27), (108.94, 34.27), (108.94, 34.26)]) region.area = 1000000 # 1平方公里 # 模拟设备 devices = [ Mock(geometry=Point(108.945, 34.265), coverage_radius=500), Mock(geometry=Point(108.948, 34.268), coverage_radius=500), Mock(geometry=Point(108.952, 34.262), coverage_radius=500) ] mock_geo_region.query.return_value = [region] coverage_analysis = self.spatial_service.analyze_coverage(devices, region) self.assertIn("total_coverage", coverage_analysis) self.assertIn("coverage_percentage", coverage_analysis) self.assertIn("coverage_gaps", coverage_analysis) self.assertGreaterEqual(coverage_analysis["coverage_percentage"], 0) self.assertLessEqual(coverage_analysis["coverage_percentage"], 100) def test_nearest_neighbor_search(self): """测试最近邻搜索""" target_point = Point(108.948024, 34.265773) # 创建候选点 candidates = [ Point(108.950024, 34.267773), # 约300米 Point(108.955024, 34.270773), # 约600米 Point(108.945024, 34.262773), # 约400米 ] nearest = self.spatial_service.find_nearest_neighbor(target_point, candidates) self.assertIsInstance(nearest, Point) # 验证最近的点 expected_nearest = candidates[0] # 最近的点 self.assertEqual(nearest.x, expected_nearest.x) self.assertEqual(nearest.y, expected_nearest.y) class TestRoutePlanningService(unittest.TestCase): """路径规划服务测试""" def setUp(self): self.route_service = RoutePlanningService() @patch('src.gis.services.networkx') def test_shortest_path_planning(self, mock_networkx): """测试最短路径规划""" # 模拟路网 mock_networkx.graph.return_value = { 'A': {'B': 1, 'C': 4}, 'B': {'A': 1, 'C': 2, 'D': 5}, 'C': {'A': 4, 'B': 2, 'D': 1}, 'D': {'B': 5, 'C': 1} } mock_networkx.shortest_path.return_value = ['A', 'B', 'D'] mock_networkx.shortest_path_length.return_value = 6 start = 'A' end = 'D' route = self.route_service.plan_shortest_path(start, end) self.assertEqual(route.path, ['A', 'B', 'D']) self.assertEqual(route.distance, 6) self.assertEqual(route.duration, 6) # 假设速度相同 @patch('src.gis.services.networkx') def test_avoid_obstacles(self, mock_networkx): """测试障碍物避让""" # 模拟包含障碍物的路网 mock_networkx.graph.return_value = { 'A': {'B': 1, 'C': 4}, 'B': {'A': 1, 'C': 2}, 'C': {'A': 4, 'B': 2, 'D': 1}, 'D': {'C': 1} } mock_networkx.shortest_path.return_value = ['A', 'B', 'C', 'D'] mock_networkx.shortest_path_length.return_value = 4 start = 'A' end = 'D' obstacles = ['C'] # 避开节点C route = self.route_service.plan_path_avoiding_obstacles(start, end, obstacles) self.assertNotIn('C', route.path) self.assertGreater(route.distance, 3) # 应该更长 def test_multi_stop_routing(self): """测试多点路径规划""" stops = ['A', 'B', 'C', 'D'] with patch.object(self.route_service, 'plan_shortest_path') as mock_plan: mock_plan.side_effect = [ Mock(path=['A', 'B'], distance=2, duration=2), Mock(path=['B', 'C'], distance=3, duration=3), Mock(path=['C', 'D'], distance=1, duration=1) ] route = self.route_service.plan_multi_stop_route(stops) self.assertEqual(len(route.segments), 3) self.assertEqual(route.total_distance, 6) self.assertEqual(route.total_duration, 6) def test_routing_with_constraints(self): """测试约束条件路径规划""" start = Point(108.94, 34.26) end = Point(108.95, 34.27) max_distance = 2000 max_duration = 1800 # 30分钟 vehicle_type = "car" route = self.route_service.plan_constrained_route( start, end, max_distance, max_duration, vehicle_type ) self.assertIsNotNone(route) self.assertLessEqual(route.distance, max_distance) self.assertLessEqual(route.duration, max_duration) class TestGeoAnalysisService(unittest.TestCase): """地理分析服务测试""" def setUp(self): self.analysis_service = GeoAnalysisService() def test_density_analysis(self): """测试密度分析""" points = [ Point(108.94, 34.26), Point(108.942, 34.261), Point(108.944, 34.259), Point(108.941, 34.262), Point(108.943, 34.260) ] density_result = self.analysis_service.calculate_density(points, grid_size=0.001) self.assertIn("grid_density", density_result) self.assertIn("average_density", density_result) self.assertGreater(density_result["average_density"], 0) def test_hotspot_detection(self): """测试热点检测""" points = [] for i in range(100): # 生成集中在某个区域的点 x = 108.94 + (i % 10) * 0.001 y = 34.26 + (i // 10) * 0.001 points.append(Point(x, y)) # 添加一些噪声点 for i in range(20): x = 108.9 + i * 0.01 y = 34.25 + i * 0.01 points.append(Point(x, y)) hotspots = self.analysis_service.detect_hotspots(points, threshold=10) self.assertGreater(len(hotspots), 0) self.assertTrue(all(hotspot.count >= 10 for hotspot in hotspots)) def service_test_buffer_analysis(self): """测试缓冲区分析""" center_point = Point(108.948024, 34.265773) buffer_distances = [500, 1000, 1500] # 500m, 1km, 1.5km buffers = self.analysis_service.create_multiple_buffers(center_point, buffer_distances) self.assertEqual(len(buffers), 3) for i, buffer in enumerate(buffers): self.assertIsInstance(buffer, Polygon) self.assertEqual(buffer.area, (buffer_distances[i] ** 2) * 3.14159 / 1000000) # 大致面积 def test_spatial_join(self): """测试空间连接""" # 创建点数据(设备) points = [ Mock(geometry=Point(108.945, 34.265), device_id="sensor_001"), Mock(geometry=Point(108.948, 34.268), device_id="sensor_002"), Mock(geometry=Point(109.0, 34.3), device_id="sensor_003") ] # 创建多边形数据(区域) polygons = [ Mock(geometry=Polygon([(108.94, 34.26), (108.95, 34.26), (108.95, 34.27), (108.94, 34.27), (108.94, 34.26)]), region_id="area_001") ] joined_data = self.analysis_service.spatial_join(points, polygons, "within") self.assertGreater(len(joined_data), 0) # 前两个点应该在区域内,第三个点不在 self.assertEqual(len([p for p in joined_data if p["device_id"] == "sensor_001"]), 1) self.assertEqual(len([p for p in joined_data if p["device_id"] == "sensor_003"]), 0) class TestPathRoute(unittest.TestCase): """路径路线模型测试""" def setUp(self): self.route_data = { "route_id": "route_001", "name": "巡检路线", "start_point": {"coordinates": [108.94, 34.26]}, "end_point": {"coordinates": [108.95, 34.27]}, "waypoints": [ {"coordinates": [108.941, 34.261]}, {"coordinates": [108.945, 34.265]} ], "total_distance": 2500, "total_duration": 1800, "transport_mode": "walking" } def test_route_creation(self): """测试路线创建""" route = PathRoute(self.route_data) self.assertEqual(route.route_id, self.route_data["route_id"]) self.assertEqual(route.total_distance, 2500) self.assertEqual(route.total_duration, 1800) self.assertEqual(route.transport_mode, "walking") # 验证几何对象 self.assertIsInstance(route.geometry, LineString) self.assertEqual(len(route.waypoints), 2) def test_route_optimization(self): """测试路线优化""" route = PathRoute(self.route_data) # 添加更多路径点 additional_waypoints = [ {"coordinates": [108.942, 34.262]}, {"coordinates": [108.944, 34.264]} ] optimized_route = route.optimize_waypoints(additional_waypoints) self.assertIsInstance(optimized_route, PathRoute) # 优化后的距离应该更短 self.assertLess(optimized_route.total_distance, route.total_distance) def test_route_export(self): """测试路线导出""" route = PathRoute(self.route_data) # 导出为GeoJSON geojson = route.export_to_geojson() self.assertIn("type", geojson) self.assertEqual(geojson["type"], "LineString") self.assertIn("coordinates", geojson) # 导出为GPX gpx = route.export_to_gpx() self.assertIn("", gpx) self.assertIn("", kml) self.assertIn("", kml) if __name__ == '__main__': unittest.main()