Implementing a Layered Real-Time Data Warehouse with Flink SQL
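How do you implement the layered architecture of a real-time data warehouse (ODS→DWD→DWS→ADS) with Flink SQL? Explain the role of each layer, give example Flink SQL DDL statements, and describe how data flows between the layers. How do you handle data backfill (replay) and historical data repair in a Flink SQL real-time data warehouse?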
Answer
我还是少年
1. Layered architecture design
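ODS (operational data layer) → DWD (detail layer) → DWS (summary layer) → ADS (application layer)
- ODS: raw logs + CDC data, stored in Kafka
- DWD: cleaned detail records, stored in Kafka
- DWS: wide tables / aggregates, stored in Kafka or MySQL
- ADS: report tables served from MySQL/ClickHouse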
2. Implementing each layer
ODS layer: land the raw data
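CREATE TABLE ods_order (
  order_id    BIGINT,
  user_id     BIGINT,
  amount      DECIMAL(10,2),
  -- Kafka record timestamp, exposed by the connector as TIMESTAMP_LTZ(3) and cast to TIMESTAMP(3)
  `timestamp` TIMESTAMP(3) METADATA FROM 'timestamp',
  -- processing-time attribute, needed by the lookup join in the DWD layer
  proc_time AS PROCTIME(),
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
  -- no PRIMARY KEY: the 'kafka' connector rejects one for insert-only formats such as 'json'
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_orders',
  'properties.bootstrap.servers' = 'kafka:9092',  -- placeholder address
  'scan.startup.mode' = 'earliest-offset',        -- used only when the job starts without restored state
  'format' = 'json'
);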
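The architecture lists CDC next to raw logs as ODS input. A minimal sketch of a CDC-fed ODS table, assuming Debezium publishes change events of a MySQL user table to a hypothetical Kafka topic `ods_user_cdc` (topic name, columns, and broker address are placeholders). Unlike plain `json`, `debezium-json` is a changelog format, which is why this Kafka table may declare a primary key.

```sql
-- Hypothetical ODS table fed by Debezium change events (topic and columns assumed).
CREATE TABLE ods_user_cdc (
  user_id   BIGINT,
  user_name STRING,
  -- accepted here because debezium-json carries inserts, updates and deletes
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_user_cdc',
  'properties.bootstrap.servers' = 'kafka:9092',  -- placeholder address
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);
```

Queries over this table see inserts, updates, and deletes, so any aggregate built on it emits updates as well.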
DWD layer: data cleansing and dimension enrichment
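INSERT INTO dwd_order_detail
SELECT
  o.order_id,
  o.user_id,
  u.user_name,
  o.amount,
  o.`timestamp`
FROM ods_order AS o
-- lookup join: fetch each user's row as of the order's processing time;
-- requires ods_order to declare proc_time AS PROCTIME()
LEFT JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u
  ON o.user_id = u.user_id
-- example cleansing rules (assumed; adjust to the real data contract)
WHERE o.order_id IS NOT NULL
  AND o.amount >= 0;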
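The DWD statement relies on two tables the answer does not define: the dimension table `dim_user` and the sink `dwd_order_detail`. One possible shape, assuming the dimension lives in MySQL and DWD is another Kafka topic. All connection values and the MySQL table name are placeholders, and the cache option names follow recent flink-connector-jdbc releases (older releases used `lookup.cache.max-rows` / `lookup.cache.ttl`).

```sql
-- Dimension table for the lookup join, assumed to live in MySQL (placeholder values).
CREATE TABLE dim_user (
  user_id   BIGINT,
  user_name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql:3306/dim',
  'table-name' = 'user_info',
  'username' = 'flink',
  'password' = 'change-me',
  -- cache lookups to avoid one database query per order
  'lookup.cache' = 'PARTIAL',
  'lookup.partial-cache.max-rows' = '10000',
  'lookup.partial-cache.expire-after-write' = '10 min'
);

-- DWD sink: a Kafka topic holding the enriched detail rows (placeholder values).
CREATE TABLE dwd_order_detail (
  order_id    BIGINT,
  user_id     BIGINT,
  user_name   STRING,
  amount      DECIMAL(10,2),
  `timestamp` TIMESTAMP(3),
  -- only used when DWS reads this topic with event-time operators
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_order_detail',
  'properties.bootstrap.servers' = 'kafka:9092',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

A lookup join over an append-only stream produces append-only rows, which is why the plain `kafka` connector, rather than `upsert-kafka`, is enough for the DWD sink.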
DWS layer: light aggregation
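CREATE TABLE dws_order_daily (
  event_date   STRING,
  total_amount DECIMAL(20,2),
  order_count  BIGINT,
  PRIMARY KEY (event_date) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka', ...);
-- Unbounded (non-windowed) aggregation: every new order updates the row for its
-- day, so the sink must accept updates; upsert-kafka writes them keyed by event_date.
INSERT INTO dws_order_daily
SELECT
  DATE_FORMAT(`timestamp`, 'yyyy-MM-dd') AS event_date,
  SUM(amount)                            AS total_amount,
  COUNT(DISTINCT order_id)               AS order_count
FROM dwd_order_detail
GROUP BY DATE_FORMAT(`timestamp`, 'yyyy-MM-dd');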
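The unbounded GROUP BY keeps updating each day's row and holds per-day state until TTL removes it. If one final row per day is enough, a tumbling window table-valued function is an alternative; this sketch assumes `dwd_order_detail` declares a WATERMARK on `timestamp`.

```sql
-- Windowed variant (assumes a watermark on dwd_order_detail.`timestamp`).
-- Each day is emitted once, after the watermark passes the end of that day.
INSERT INTO dws_order_daily
SELECT
  DATE_FORMAT(window_start, 'yyyy-MM-dd') AS event_date,
  SUM(amount)                             AS total_amount,
  COUNT(DISTINCT order_id)                AS order_count
FROM TABLE(
  TUMBLE(TABLE dwd_order_detail, DESCRIPTOR(`timestamp`), INTERVAL '1' DAY))
GROUP BY window_start, window_end;
```

The trade-off: the window releases its state once the day closes and emits append-only rows, but totals appear only after the day ends, and records arriving after the watermark are dropped as late instead of correcting the total.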
ADS layer: output to MySQL/ClickHouse
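-- ads_order_report needs a PRIMARY KEY (event_date) so the sink can apply the
-- daily updates coming from dws_order_daily as upserts
INSERT INTO ads_order_report
SELECT * FROM dws_order_daily;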
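Reading `dws_order_daily` yields an upsert changelog, so the ADS sink must accept updates. A sketch for the MySQL case using the JDBC connector; connection values are placeholders, and the MySQL table itself is assumed to carry a matching primary key.

```sql
-- Hypothetical MySQL report table for the ADS layer (placeholder connection values).
CREATE TABLE ads_order_report (
  event_date   STRING,
  total_amount DECIMAL(20,2),
  order_count  BIGINT,
  -- a declared primary key switches the JDBC sink to upsert mode
  PRIMARY KEY (event_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql:3306/ads',
  'table-name' = 'ads_order_report',
  'username' = 'flink',
  'password' = 'change-me'
);
```

With the primary key, the JDBC sink writes each update as an upsert (on MySQL, `INSERT ... ON DUPLICATE KEY UPDATE`); without it the sink accepts only append-only input and the INSERT above would be rejected.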
3. Data backfill and repair
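-- Option 1: replay Kafka messages (reset the start offset); takes effect only for a job
-- started without restored state, because offsets in a checkpoint/savepoint win
ALTER TABLE ods_order SET ('scan.startup.mode' = 'earliest-offset');
-- Option 2: batch repair (read Kafka in batch mode; the source must be bounded,
-- e.g. via 'scan.bounded.mode')
SET 'execution.runtime-mode' = 'batch';
-- Option 3: Hive/bucketed-table backfill; the statement below instead replays Kafka
-- from a point in time (1704067200000 ms = 2024-01-01 00:00:00 UTC)
INSERT INTO dwd_order_detail
SELECT ...
FROM ods_order /*+ OPTIONS('scan.startup.mode'='timestamp',
  'scan.startup.timestamp-millis'='1704067200000') */ ...;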
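The three options, sketched as separate statements for one SQL client session. Assumptions: `ods_order` declares `proc_time AS PROCTIME()`, `dwd_order_detail` is a JSON Kafka table with a physical `timestamp` column, the Kafka connector version supports `scan.bounded.mode`, and the Hive catalog, database, table, partition column `dt`, and conf path are all hypothetical. The date '2024-01-01' is only an example.

```sql
-- Option 1 as a one-off job: override the start position with a table hint
-- instead of altering the shared table. Submit it WITHOUT a savepoint restore.
-- The lookup join reads dim_user as it is now, not as it was back then.
INSERT INTO dwd_order_detail
SELECT o.order_id, o.user_id, u.user_name, o.amount, o.`timestamp`
FROM ods_order /*+ OPTIONS('scan.startup.mode' = 'earliest-offset') */ AS o
LEFT JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u
  ON o.user_id = u.user_id;

-- Option 2: recompute one day in batch mode from a bounded read of the DWD topic;
-- the upsert sink overwrites the stored row for that event_date.
SET 'execution.runtime-mode' = 'batch';
INSERT INTO dws_order_daily
SELECT
  DATE_FORMAT(`timestamp`, 'yyyy-MM-dd') AS event_date,
  SUM(amount),
  COUNT(DISTINCT order_id)
FROM dwd_order_detail /*+ OPTIONS(
  'scan.startup.mode' = 'earliest-offset',
  'scan.bounded.mode' = 'latest-offset') */
WHERE DATE_FORMAT(`timestamp`, 'yyyy-MM-dd') = '2024-01-01'
GROUP BY DATE_FORMAT(`timestamp`, 'yyyy-MM-dd');

-- Option 3: the same recompute from an archived Hive copy (still in batch mode;
-- catalog, database, table and partition column dt are hypothetical).
CREATE CATALOG hive WITH ('type' = 'hive', 'hive-conf-dir' = '/opt/hive/conf');
INSERT INTO dws_order_daily
SELECT dt, SUM(amount), COUNT(DISTINCT order_id)
FROM hive.ods.ods_order_archive
WHERE dt = '2024-01-01'
GROUP BY dt;
```

Recomputing into the upsert-keyed `dws_order_daily` repairs a day in place, since the batch result overwrites the stored row for that `event_date`. Rows replayed into the append-only DWD topic, by contrast, are duplicated, so downstream aggregates must tolerate them: `COUNT(DISTINCT order_id)` does, `SUM(amount)` does not.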
4. Notes
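- Use upsert-kafka at the DWS layer: aggregate updates are written keyed by the primary key, so replays and recomputations overwrite a key's earlier value instead of adding duplicate rows
- Set the state retention time (`table.exec.state.ttl`) to cover the expected backfill and late-data window
- Use savepoints for job upgrades: stop the job with a savepoint and start the new version from it so state and Kafka offsets carry over; this works only if the new plan is state-compatible, which changing the SQL query often breaks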
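The state and savepoint notes map to statements along these lines. The two-day TTL is an assumed value, `<job-id>` and `<savepoint-path>` are placeholders, and `STOP JOB` requires a SQL client that supports job statements (Flink 1.17 or later).

```sql
-- Keep idle per-key state (e.g. a day's SUM / COUNT DISTINCT) for 2 days (assumed value).
SET 'table.exec.state.ttl' = '2 d';

-- Upgrade: stop the running job and take a savepoint (placeholder job id).
STOP JOB '<job-id>' WITH SAVEPOINT;
-- Statements submitted after this SET start from the given savepoint (placeholder path).
SET 'execution.savepoint.path' = '<savepoint-path>';
-- ... resubmit the upgraded INSERT here ...
-- Clear the setting so later jobs in this session do not restore from it.
RESET 'execution.savepoint.path';
```

A TTL that is too short lets a late record for an expired day restart that day's totals from zero, and the upsert sink then writes the partial value over the correct one; a TTL that is too long lets state grow with every key.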