-- ============================================= -- 智慧水务管理系统 - 数据引擎 DDL -- 版本: V1 -- 描述: 数据汇聚引擎相关表 -- ============================================= -- ==================== 数据源管理 ==================== -- 数据源配置表 CREATE TABLE IF NOT EXISTS de_data_source ( id BIGSERIAL PRIMARY KEY, source_name VARCHAR(100) NOT NULL, source_code VARCHAR(50) UNIQUE NOT NULL, source_type VARCHAR(30) NOT NULL, -- mqtt/kafka/rest/websocket/database/file category VARCHAR(30), -- iot/manual/api/database connection_config JSONB, -- 连接配置(JSON) sync_mode VARCHAR(20) DEFAULT 'realtime', -- realtime/batch/scheduled sync_cron VARCHAR(50), -- 定时同步Cron表达式 status SMALLINT DEFAULT 1, -- 0:禁用 1:启用 description VARCHAR(500), last_sync_at TIMESTAMP, deleted SMALLINT DEFAULT 0, created_at TIMESTAMP DEFAULT NOW(), updated_at TIMESTAMP DEFAULT NOW() ); COMMENT ON TABLE de_data_source IS '数据源配置表'; COMMENT ON COLUMN de_data_source.source_type IS '数据源类型: mqtt/kafka/rest/websocket/database/file'; COMMENT ON COLUMN de_data_source.sync_mode IS '同步模式: realtime/batch/scheduled'; CREATE INDEX IF NOT EXISTS idx_de_data_source_type ON de_data_source(source_type); CREATE INDEX IF NOT EXISTS idx_de_data_source_status ON de_data_source(status); -- ==================== 数据采集 ==================== -- 数据采集任务表 CREATE TABLE IF NOT EXISTS de_collect_task ( id BIGSERIAL PRIMARY KEY, task_name VARCHAR(100) NOT NULL, source_id BIGINT REFERENCES de_data_source(id), collect_type VARCHAR(30) NOT NULL, -- realtime/batch/manual topic VARCHAR(100), -- Kafka/MQTT topic target_table VARCHAR(100), -- 目标表名 transform_rule JSONB, -- 转换规则 status VARCHAR(20) DEFAULT 'pending', -- pending/running/paused/completed/failed total_count BIGINT DEFAULT 0, success_count BIGINT DEFAULT 0, fail_count BIGINT DEFAULT 0, start_time TIMESTAMP, end_time TIMESTAMP, error_msg TEXT, deleted SMALLINT DEFAULT 0, created_at TIMESTAMP DEFAULT NOW(), updated_at TIMESTAMP DEFAULT NOW() ); COMMENT ON TABLE de_collect_task IS '数据采集任务表'; CREATE INDEX IF NOT EXISTS idx_de_collect_task_status ON de_collect_task(status); CREATE INDEX IF NOT EXISTS idx_de_collect_task_source ON de_collect_task(source_id); -- 数据采集记录表 CREATE TABLE IF NOT EXISTS de_collect_record ( id BIGSERIAL PRIMARY KEY, task_id BIGINT REFERENCES de_collect_task(id), source_id BIGINT REFERENCES de_data_source(id), source_type VARCHAR(30), source_key VARCHAR(100), raw_data JSONB, processed_data JSONB, status VARCHAR(20) DEFAULT 'success', -- success/failed/skipped error_msg VARCHAR(500), collect_time TIMESTAMP DEFAULT NOW() ); COMMENT ON TABLE de_collect_record IS '数据采集记录表'; CREATE INDEX IF NOT EXISTS idx_de_collect_record_time ON de_collect_record(collect_time DESC); CREATE INDEX IF NOT EXISTS idx_de_collect_record_task ON de_collect_record(task_id); -- ==================== 数据接入 ==================== -- API接入配置表 CREATE TABLE IF NOT EXISTS de_api_config ( id BIGSERIAL PRIMARY KEY, api_name VARCHAR(100) NOT NULL, api_path VARCHAR(200) UNIQUE NOT NULL, method VARCHAR(10) DEFAULT 'POST', -- GET/POST/PUT source_id BIGINT REFERENCES de_data_source(id), request_schema JSONB, -- 请求Schema定义 response_schema JSONB, -- 响应Schema定义 auth_type VARCHAR(20) DEFAULT 'none', -- none/token/api_key/basic rate_limit INT DEFAULT 100, -- 限流(次/分钟) status SMALLINT DEFAULT 1, deleted SMALLINT DEFAULT 0, created_at TIMESTAMP DEFAULT NOW(), updated_at TIMESTAMP DEFAULT NOW() ); COMMENT ON TABLE de_api_config IS 'API接入配置表'; -- ==================== 数据存储 ==================== -- 存储配置表 CREATE TABLE IF NOT EXISTS de_storage_config ( id BIGSERIAL PRIMARY KEY, storage_name VARCHAR(100) NOT NULL, storage_type VARCHAR(30) NOT NULL, -- tdengine/postgresql/minio connection_url VARCHAR(500), username VARCHAR(100), password VARCHAR(255), database_name VARCHAR(100), bucket_name VARCHAR(100), extra_config JSONB, status SMALLINT DEFAULT 1, deleted SMALLINT DEFAULT 0, created_at TIMESTAMP DEFAULT NOW(), updated_at TIMESTAMP DEFAULT NOW() ); COMMENT ON TABLE de_storage_config IS '存储配置表'; -- 存储路由规则表(哪类数据存到哪) CREATE TABLE IF NOT EXISTS de_storage_route ( id BIGSERIAL PRIMARY KEY, source_type VARCHAR(30) NOT NULL, data_category VARCHAR(50), -- telemetry/quality/billing/document storage_id BIGINT REFERENCES de_storage_config(id), target_table VARCHAR(100), partition_rule VARCHAR(200), -- 分区规则 retention_days INT DEFAULT 365, status SMALLINT DEFAULT 1, created_at TIMESTAMP DEFAULT NOW() ); COMMENT ON TABLE de_storage_route IS '存储路由规则表'; -- ==================== 数据集成 ==================== -- 数据同步任务表 CREATE TABLE IF NOT EXISTS de_sync_task ( id BIGSERIAL PRIMARY KEY, task_name VARCHAR(100) NOT NULL, source_id BIGINT REFERENCES de_data_source(id), target_storage_id BIGINT REFERENCES de_storage_config(id), sync_type VARCHAR(30) NOT NULL, -- full/incremental/cdc sync_cron VARCHAR(50), last_sync_at TIMESTAMP, last_sync_count BIGINT, status VARCHAR(20) DEFAULT 'pending', -- pending/running/paused/completed/failed error_msg TEXT, deleted SMALLINT DEFAULT 0, created_at TIMESTAMP DEFAULT NOW(), updated_at TIMESTAMP DEFAULT NOW() ); COMMENT ON TABLE de_sync_task IS '数据同步任务表'; CREATE INDEX IF NOT EXISTS idx_de_sync_task_status ON de_sync_task(status); -- ==================== 数据质量 ==================== -- 数据质量规则表 CREATE TABLE IF NOT EXISTS de_quality_rule ( id BIGSERIAL PRIMARY KEY, rule_name VARCHAR(100) NOT NULL, rule_type VARCHAR(30) NOT NULL, -- completeness/validity/timeliness/consistency table_name VARCHAR(100), column_name VARCHAR(100), rule_expr VARCHAR(500), -- 规则表达式 threshold DECIMAL(5,2), -- 阈值 severity VARCHAR(20) DEFAULT 'warning', -- info/warning/error enabled SMALLINT DEFAULT 1, created_at TIMESTAMP DEFAULT NOW(), updated_at TIMESTAMP DEFAULT NOW() ); COMMENT ON TABLE de_quality_rule IS '数据质量规则表'; -- 数据质量检查记录表 CREATE TABLE IF NOT EXISTS de_quality_check ( id BIGSERIAL PRIMARY KEY, rule_id BIGINT REFERENCES de_quality_rule(id), check_time TIMESTAMP DEFAULT NOW(), total_count BIGINT, pass_count BIGINT, fail_count BIGINT, pass_rate DECIMAL(5,2), result_detail JSONB, status VARCHAR(20) DEFAULT 'success' -- success/failed ); COMMENT ON TABLE de_quality_check IS '数据质量检查记录表'; CREATE INDEX IF NOT EXISTS idx_de_quality_check_time ON de_quality_check(check_time DESC); -- ==================== 数据血缘 ==================== -- 数据血缘关系表 CREATE TABLE IF NOT EXISTS de_data_lineage ( id BIGSERIAL PRIMARY KEY, source_table VARCHAR(100) NOT NULL, source_column VARCHAR(100), target_table VARCHAR(100) NOT NULL, target_column VARCHAR(100), transform_type VARCHAR(30), -- direct/mapping/aggregation/calculation transform_rule TEXT, description VARCHAR(500), created_at TIMESTAMP DEFAULT NOW() ); COMMENT ON TABLE de_data_lineage IS '数据血缘关系表'; CREATE INDEX IF NOT EXISTS idx_de_lineage_source ON de_data_lineage(source_table); CREATE INDEX IF NOT EXISTS idx_de_lineage_target ON de_data_lineage(target_table); -- ==================== 数据引擎统计 ==================== -- 数据统计仪表板 CREATE TABLE IF NOT EXISTS de_stat_daily ( id BIGSERIAL PRIMARY KEY, stat_date DATE NOT NULL, source_id BIGINT, collect_count BIGINT DEFAULT 0, store_count BIGINT DEFAULT 0, quality_score DECIMAL(5,2), sync_count BIGINT DEFAULT 0, error_count BIGINT DEFAULT 0, created_at TIMESTAMP DEFAULT NOW(), UNIQUE(stat_date, source_id) ); COMMENT ON TABLE de_stat_daily IS '日统计数据表';