-- Flyway 迁移脚本 V1.1 -- 添加优化索引和触发器 -- 执行时间: 2026-06-15 -- ========= 性能优化索引 ========= -- 物联网设备相关查询优化索引 CREATE INDEX IF NOT EXISTS idx_iot_device_type_area_status ON iot_device(device_type, area, status); CREATE INDEX IF NOT EXISTS idx_iot_device_geom_area ON iot_device USING GIST(geom) WHERE area IS NOT NULL; CREATE INDEX IF NOT EXISTS idx_iot_device_last_report_time ON iot_device(last_report_time DESC); -- 设备遥测数据时间范围查询优化 CREATE INDEX IF NOT EXISTS idx_iot_telemetry_device_time ON iot_telemetry_cache(device_sn, ts DESC); CREATE INDEX IF NOT EXISTS idx_iot_telemetry_metric_time ON iot_telemetry_cache(metric_key, ts DESC); CREATE INDEX IF NOT EXISTS idx_iot_telemetry_device_metric_time ON iot_telemetry_cache(device_sn, metric_key, ts DESC); -- 营业收费业务查询优化索引 CREATE INDEX IF NOT EXISTS idx_rev_customer_area_status ON rev_customer(area, status); CREATE INDEX IF NOT EXISTS idx_rev_meter_customer_status ON rev_meter(customer_id, status); CREATE INDEX IF NOT EXISTS idx_rev_meter_device ON rev_meter(device_id); CREATE INDEX IF NOT EXISTS idx_rev_reading_meter_date ON rev_reading(meter_id, reading_date); CREATE INDEX IF NOT EXISTS idx_rev_bill_customer_period ON rev_bill(customer_id, bill_period); CREATE INDEX IF NOT EXISTS idx_rev_bill_status_date ON rev_bill(status, due_date); -- 巡检系统查询优化索引 CREATE INDEX IF NOT EXISTS idx_patrol_route_area_status ON patrol_route(area, status); CREATE INDEX IF NOT EXISTS idx_patrol_task_assignee_date ON patrol_task(assignee_id, task_date); CREATE INDEX IF NOT EXISTS idx_patrol_task_route_date ON patrol_task(route_id, task_date); CREATE INDEX IF NOT EXISTS idx_patrol_record_task_point ON patrol_record(task_id, point_seq); CREATE INDEX IF NOT EXISTS idx_patrol_record_device_time ON patrol_record(device_id, record_time); -- 报警管理查询优化索引 CREATE INDEX IF NOT EXISTS idx_alert_rule_enabled_level ON alert_rule(enabled, alert_level); CREATE INDEX IF NOT EXISTS idx_alert_event_device_time ON alert_event(device_sn, created_at DESC); CREATE INDEX IF NOT EXISTS idx_alert_event_level_status ON alert_event(alert_level, dispatched); CREATE INDEX IF NOT EXISTS idx_alert_event_rule_time ON alert_event(rule_id, created_at DESC); -- 系统日志查询优化索引 CREATE INDEX IF NOT EXISTS idx_sys_oplog_user_time ON sys_operation_log(user_id, created_at DESC); CREATE INDEX IF NOT EXISTS idx_sys_oplog_type_time ON sys_operation_log(operation_type, created_at DESC); CREATE INDEX IF NOT EXISTS idx_sys_oplog_target ON sys_operation_log(target_type, target_id); -- 用户权限相关优化索引 CREATE INDEX IF NOT EXISTS idx_sys_user_role_active ON sys_user(role_type) WHERE status = 1; CREATE INDEX IF NOT EXISTS idx_sys_role_menu_active ON sys_menu(status) WHERE status = 1; CREATE INDEX IF NOT EXISTS idx_sys_config_key_active ON sys_config(config_key) WHERE status = 1; -- ========= 复合查询优化索引 ========= -- 复合业务查询索引 CREATE INDEX IF NOT EXISTS idx_iot_device_customer_flow ON iot_device WHERE device_type = 'flow_meter' AND area = '精芒片区'; -- 账单状态和到期时间复合索引 CREATE INDEX IF NOT EXISTS idx_rev_bill_overdue ON rev_bill(status, due_date) WHERE status IN ('pending', 'overdue'); -- 报警紧急程度和状态复合索引 CREATE INDEX IF NOT EXISTS idx_alert_urgent ON alert_event(alert_level, dispatched) WHERE alert_level IN ('critical', 'emergency'); -- 巡检任务优先级索引 CREATE INDEX IF NOT EXISTS idx_patrol_priority ON patrol_task WHERE status = 'pending' AND task_date = CURRENT_DATE; -- ========= 分区索引(如果需要) ========= -- 对于大数据量表,可以考虑按时间分区 -- 例如:按月分区大表的数据 -- ========= 触发器优化 ========= -- 自动更新时间触发器优化版 CREATE OR REPLACE FUNCTION update_updated_timestamp() RETURNS TRIGGER AS $$ BEGIN NEW.updated_at = CURRENT_TIMESTAMP; RETURN NEW; END; $$ LANGUAGE plpgsql; -- 为主要表添加自动更新触发器 DROP TRIGGER IF EXISTS trg_iot_device_update ON iot_device; CREATE TRIGGER trg_iot_device_update BEFORE UPDATE ON iot_device FOR EACH ROW EXECUTE FUNCTION update_updated_timestamp(); DROP TRIGGER IF EXISTS trg_rev_customer_update ON rev_customer; CREATE TRIGGER trg_rev_customer_update BEFORE UPDATE ON rev_customer FOR EACH ROW EXECUTE FUNCTION update_updated_timestamp(); DROP TRIGGER IF EXISTS trg_rev_meter_update ON rev_meter; CREATE TRIGGER trg_rev_meter_update BEFORE UPDATE ON rev_meter FOR EACH ROW EXECUTE FUNCTION update_updated_timestamp(); DROP TRIGGER IF EXISTS trg_rev_bill_update ON rev_bill; CREATE TRIGGER trg_rev_bill_update BEFORE UPDATE ON rev_bill FOR EACH ROW EXECUTE FUNCTION update_updated_timestamp(); DROP TRIGGER IF EXISTS trg_alert_event_update ON alert_event; CREATE TRIGGER trg_alert_event_update BEFORE UPDATE ON alert_event FOR EACH ROW EXECUTE FUNCTION update_updated_timestamp(); -- ========= 数据完整性约束 ========= -- 添加外键约束检查 ALTER TABLE iot_device ADD CONSTRAINT fk_iot_device_model FOREIGN KEY (model_id) REFERENCES iot_device_model(id) ON DELETE RESTRICT; ALTER TABLE rev_meter ADD CONSTRAINT fk_rev_meter_customer FOREIGN KEY (customer_id) REFERENCES rev_customer(id) ON DELETE CASCADE; ALTER TABLE rev_meter ADD CONSTRAINT fk_rev_meter_device FOREIGN KEY (device_id) REFERENCES iot_device(id) ON DELETE SET NULL; ALTER TABLE rev_reading ADD CONSTRAINT fk_rev_reading_meter FOREIGN KEY (meter_id) REFERENCES rev_meter(id) ON DELETE CASCADE; ALTER TABLE rev_bill ADD CONSTRAINT fk_rev_bill_customer FOREIGN KEY (customer_id) REFERENCES rev_customer(id) ON DELETE CASCADE; ALTER TABLE alert_event ADD CONSTRAINT fk_alert_event_rule FOREIGN KEY (rule_id) REFERENCES alert_rule(id) ON DELETE CASCADE; ALTER TABLE alert_event ADD CONSTRAINT fk_alert_event_device FOREIGN KEY (device_id) REFERENCES iot_device(id) ON DELETE SET NULL; -- ========= 检查约束 ========= -- 添加业务规则检查约束 ALTER TABLE iot_device ADD CONSTRAINT chk_device_status CHECK (status IN ('online', 'offline', 'maintenance', 'fault')); ALTER TABLE rev_customer ADD CONSTRAINT chk_customer_status CHECK (status IN ('active', 'inactive', 'suspended')); ALTER TABLE rev_bill ADD CONSTRAINT chk_bill_status CHECK (status IN ('pending', 'partial', 'paid', 'overdue')); ALTER TABLE alert_event ADD CONSTRAINT chk_alert_level CHECK (alert_level IN ('info', 'warning', 'critical', 'emergency')); -- ========= 视图创建(优化常用查询) ========= -- 设备状态统计视图 CREATE OR REPLACE VIEW v_device_status_stats AS SELECT area, device_type, status, COUNT(*) as device_count, COUNT(CASE WHEN last_report_time > CURRENT_TIMESTAMP - INTERVAL '1 hour' THEN 1 END) as online_count, COUNT(CASE WHEN last_report_time <= CURRENT_TIMESTAMP - INTERVAL '1 hour' THEN 1 END) as offline_count FROM iot_device GROUP BY area, device_type, status; -- 区域用水量统计视图 CREATE OR REPLACE VIEW v_water_consumption_stats AS SELECT c.area, c.customer_type, COUNT(c.id) as customer_count, COUNT(m.id) as meter_count, COALESCE(SUM(r.consumption), 0) as total_consumption, COALESCE(AVG(r.consumption), 0) as avg_consumption FROM rev_customer c LEFT JOIN rev_meter m ON c.id = m.customer_id AND m.status = 'active' LEFT JOIN rev_reading r ON m.id = r.meter_id AND r.reading_date >= CURRENT_DATE - INTERVAL '30 days' GROUP BY c.area, c.customer_type; -- 报警事件统计视图 CREATE OR REPLACE VIEW v_alert_stats AS SELECT area, alert_level, COUNT(*) as alert_count, COUNT(CASE WHEN resolved_at IS NULL THEN 1 END) as pending_count, COUNT(CASE WHEN resolved_at IS NOT NULL THEN 1 END) as resolved_count, COUNT(CASE WHEN created_at > CURRENT_TIMESTAMP - INTERVAL '24 hours' THEN 1 END) as recent_count FROM alert_event a JOIN iot_device d ON a.device_sn = d.device_sn GROUP BY area, alert_level; -- 巡检任务执行统计视图 CREATE OR REPLACE VIEW v_patrol_stats AS SELECT r.area, COUNT(t.id) as task_count, COUNT(CASE WHEN t.status = 'completed' THEN 1 END) as completed_count, COUNT(CASE WHEN t.status = 'pending' THEN 1 END) as pending_count, COUNT(CASE WHEN t.status = 'in_progress' THEN 1 END) as in_progress_count FROM patrol_route r LEFT JOIN patrol_task t ON r.id = t.route_id AND t.task_date >= CURRENT_DATE - INTERVAL '30 days' GROUP BY r.area; -- ========= 函数创建(业务逻辑封装) ========= -- 获取设备最新数据的函数 CREATE OR REPLACE FUNCTION get_device_latest_metrics(device_sn VARCHAR(100)) RETURNS TABLE(metric_key VARCHAR(50), metric_value DECIMAL(20,4), ts TIMESTAMP) AS $$ BEGIN RETURN QUERY SELECT DISTINCT ON (metric_key) metric_key, metric_value, ts FROM iot_telemetry_cache WHERE device_sn = device_sn ORDER BY metric_key, ts DESC; END; $$ LANGUAGE plpgsql; -- 计算水费的函数 CREATE OR REPLACE FUNCTION calculate_water_fee(customer_id BIGINT, period VARCHAR(10)) RETURNS TABLE(consumption DECIMAL(10,2), water_fee DECIMAL(10,2), sewage_fee DECIMAL(10,2), total_fee DECIMAL(10,2)) AS $$ DECLARE water_rate RECORD; usage DECIMAL(10,2); total_consumption DECIMAL(10,2) := 0; fee DECIMAL(10,2) := 0; sewage_charge DECIMAL(10,2) := 0; BEGIN -- 获取用水量 SELECT COALESCE(SUM(consumption), 0) INTO total_consumption FROM rev_reading r JOIN rev_meter m ON r.meter_id = m.id WHERE m.customer_id = customer_id AND r.reading_date LIKE period || '%'; -- 获取水费标准 SELECT * INTO water_rate FROM rev_water_rate WHERE customer_type = (SELECT customer_type FROM rev_customer WHERE id = customer_id) AND start_date <= to_date(period, 'YYYY-MM') AND (end_date IS NULL OR end_date >= to_date(period, 'YYYY-MM')) ORDER BY start_date DESC LIMIT 1; -- 计算阶梯水费 IF water_rate.step_threshold IS NOT NULL AND water_rate.step_rates IS NOT NULL THEN -- 实现阶梯水费计算逻辑 fee := total_consumption * 2.5; -- 简化计算 ELSE fee := total_consumption * (SELECT (jsonb_array_elements_text(water_rate.step_rates))[1]::decimal FROM jsonb_array_elements(water_rate.step_rates) LIMIT 1); END IF; -- 污水费(按水量的80%计算) sewage_charge := total_consumption * 0.8 * 1.2; RETURN QUERY VALUES (total_consumption, fee, sewage_charge, fee + sewage_charge); END; $$ LANGUAGE plpgsql; -- 获取报警历史函数 CREATE OR REPLACE FUNCTION get_alert_history(area VARCHAR(50), days INT DEFAULT 30) RETURNS TABLE(alert_id BIGINT, device_sn VARCHAR(100), alert_level VARCHAR(10), message TEXT, created_at TIMESTAMP) AS $$ BEGIN RETURN QUERY SELECT a.id, a.device_sn, a.alert_level, a.message, a.created_at FROM alert_event a JOIN iot_device d ON a.device_sn = d.device_sn WHERE d.area = area AND a.created_at >= CURRENT_TIMESTAMP - INTERVAL '1 day' * days ORDER BY a.created_at DESC; END; $$ LANGUAGE plpgsql; -- ========= 存储过程创建(批量操作) ========= -- 批量插入设备数据的存储过程 CREATE OR REPLACE PROCEDURE batch_insert_device_metrics( IN device_sn_list VARCHAR(100)[], IN metric_data JSONB[] ) AS $$ BEGIN FOREACH device_sn IN ARRAY device_sn_list LOOP FOREACH metric IN ARRAY metric_data LOOP INSERT INTO iot_telemetry_cache (device_sn, metric_key, metric_value, ts, quality) VALUES ( device_sn, metric->>'metric_key', (metric->>'metric_value')::DECIMAL(20,4), CURRENT_TIMESTAMP, (metric->>'quality')::SMALLINT ); END LOOP; END LOOP; END; $$ LANGUAGE plpgsql; -- ========= 更新 Flyway 迁移记录 ========= -- 此迁移脚本 V1.1 执行完成 -- 添加了性能优化索引、触发器、约束、视图和函数 -- 用于提升系统查询性能和数据完整性