Implementing a Layered Real-Time Data Warehouse with Flink SQL

Author: 我还是少年 · 2026-05-30 12:55

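How do you implement a layered real-time data warehouse (ODS→DWD→DWS→ADS) with Flink SQL? Please explain the role of each layer, give example Flink SQL CREATE TABLE statements, and describe how data flows between the layers. How do you handle data backfill and historical data repair in a Flink SQL real-time warehouse?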

Answer

我还是少年

1. Layered architecture design

ODS (operational data store) → DWD (detail layer) → DWS (summary layer) → ADS (application layer)
    ↓ Kafka                        ↓ Kafka              ↓ Kafka/MySQL
  raw logs + CDC                 cleansed detail      wide tables / aggregates

2. Implementation of each layer

ODS layer: ingest raw data

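-- Raw order events, read from Kafka as an append-only stream
CREATE TABLE ods_order (
  order_id BIGINT,
  user_id BIGINT,
  amount DECIMAL(10,2),
  -- Kafka record timestamp, used here as the event-time attribute
  `timestamp` TIMESTAMP(3) METADATA FROM 'timestamp',
  -- Processing-time attribute, required by the lookup join in the DWD layer
  proc_time AS PROCTIME(),
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
  -- No PRIMARY KEY: the plain 'kafka' connector with an insert-only format rejects one
) WITH (
  'connector' = 'kafka',
  'topic' = 'ods_orders',
  'properties.bootstrap.servers' = '...',
  'properties.group.id' = '...',
  'format' = 'json'
);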

DWD layer: data cleansing and dimension enrichment

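-- Lookup join against the user dimension. o.proc_time must be a processing-time
-- attribute of ods_order (a computed column `proc_time AS PROCTIME()`).
INSERT INTO dwd_order_detail
SELECT 
  o.order_id,
  o.user_id,
  u.user_name,
  o.amount,
  o.`timestamp`
FROM ods_order o
LEFT JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u
ON o.user_id = u.user_id;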
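
The INSERT above assumes that dim_user and dwd_order_detail already exist, but the answer does not show their definitions. The following is a minimal sketch of what they might look like. The JDBC-backed dimension table, the host names, database names, topic name, consumer group, and credentials are all placeholders chosen for illustration, not part of the original answer.

```sql
-- Hypothetical dimension table. The JDBC connector can serve lookup joins.
CREATE TABLE dim_user (
  user_id BIGINT,
  user_name STRING,
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql-host:3306/dim',  -- placeholder URL
  'table-name' = 'user',                        -- placeholder table
  'username' = 'flink',                         -- placeholder credentials
  'password' = '******'
);

-- Hypothetical DWD table: cleansed detail records written back to Kafka.
-- The lookup join with LEFT JOIN yields an append-only stream, so the plain
-- 'kafka' connector is sufficient here.
CREATE TABLE dwd_order_detail (
  order_id BIGINT,
  user_id BIGINT,
  user_name STRING,
  amount DECIMAL(10,2),
  `timestamp` TIMESTAMP(3),
  -- Re-declare the watermark so downstream jobs can use event time
  WATERMARK FOR `timestamp` AS `timestamp` - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'dwd_order_detail',                          -- placeholder topic
  'properties.bootstrap.servers' = 'kafka-host:9092',    -- placeholder broker
  'properties.group.id' = 'dws-order-daily',             -- placeholder group
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

In the DWD table the event time is stored as a physical column rather than taken from Kafka metadata, so it keeps the original order time even if the topic is rewritten during a backfill.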

DWS layer: light aggregation

CREATE TABLE dws_order_daily (
  event_date STRING,
  total_amount DECIMAL(20,2),
  order_count BIGINT,
  PRIMARY KEY (event_date) NOT ENFORCED
) WITH ('connector' = 'upsert-kafka', ...);

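-- Unbounded group aggregation: emits an updated row per day as orders arrive.
-- SUM counts replayed duplicates again while COUNT(DISTINCT) does not, so
-- deduplicate upstream if the DWD topic can contain replays.
INSERT INTO dws_order_daily
SELECT 
  DATE_FORMAT(`timestamp`, 'yyyy-MM-dd') AS event_date,
  CAST(SUM(amount) AS DECIMAL(20,2)) AS total_amount,
  COUNT(DISTINCT order_id) AS order_count
FROM dwd_order_detail
GROUP BY DATE_FORMAT(`timestamp`, 'yyyy-MM-dd');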

ADS layer: output to MySQL/ClickHouse

INSERT INTO ads_order_report
SELECT * FROM dws_order_daily;
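
The answer does not define ads_order_report. As a sketch, assuming the report lands in MySQL through the JDBC connector, the sink could be declared as below. The URL, database, table name, and credentials are placeholders. Declaring a primary key makes the JDBC sink write in upsert mode, which matches the updating rows that come out of dws_order_daily.

```sql
-- Hypothetical ADS sink in MySQL. The target MySQL table is assumed to have
-- event_date as its primary key (or a unique key) so that upserts overwrite
-- the previous value for the same day.
CREATE TABLE ads_order_report (
  event_date STRING,
  total_amount DECIMAL(20,2),
  order_count BIGINT,
  PRIMARY KEY (event_date) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://mysql-host:3306/report',  -- placeholder URL
  'table-name' = 'ads_order_report',
  'username' = 'flink',                           -- placeholder credentials
  'password' = '******'
);
```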

3. Data backfill and repair

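-- Option 1: replay Kafka messages (reset the start offset)
-- Use ALTER TABLE ... SET to change an option; RESET only removes option keys.
-- The new start offset takes effect only if the job is restarted without restoring state.
ALTER TABLE ods_order SET ('scan.startup.mode' = 'earliest-offset');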

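-- Option 2: batch repair (read Kafka in batch mode)
-- Batch mode needs a bounded source, e.g. 'scan.bounded.mode' on the Kafka table (Flink 1.17+)
SET 'execution.runtime-mode' = 'batch';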

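-- Option 3: Hive/bucket backfill (this example replays Kafka from a timestamp via a table hint)
-- 1704067200000 = 2024-01-01 00:00:00 UTC; the OPTIONS hint must follow the table name
INSERT INTO dwd_order_detail
SELECT ...
FROM ods_order /*+ OPTIONS('scan.startup.mode'='timestamp',
       'scan.startup.timestamp-millis'='1704067200000') */ ...;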
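
To make the batch option more concrete, here is a rough sketch that recomputes one day's aggregate from the DWD topic in batch mode, so the result can be compared with (or used to correct) the value in the DWS layer. It assumes Flink 1.17 or later, where the Kafka SQL connector supports 'scan.bounded.mode', that dynamic table options are enabled (table.dynamic-table-options.enabled), and that dwd_order_detail is a Kafka table with the columns used in section 2. The time range is an illustrative choice.

```sql
SET 'execution.runtime-mode' = 'batch';

-- Read only the records whose Kafka timestamps fall between
-- 2024-01-01 00:00:00 UTC (1704067200000) and 2024-01-02 00:00:00 UTC (1704153600000).
-- Offsets are selected by Kafka record timestamp, not by the `timestamp` column,
-- so a few rows belonging to adjacent days may show up at the edges.
SELECT
  DATE_FORMAT(`timestamp`, 'yyyy-MM-dd') AS event_date,
  CAST(SUM(amount) AS DECIMAL(20,2)) AS total_amount,
  COUNT(DISTINCT order_id) AS order_count
FROM dwd_order_detail /*+ OPTIONS(
  'scan.startup.mode' = 'timestamp',
  'scan.startup.timestamp-millis' = '1704067200000',
  'scan.bounded.mode' = 'timestamp',
  'scan.bounded.timestamp-millis' = '1704153600000') */
GROUP BY DATE_FORMAT(`timestamp`, 'yyyy-MM-dd');
```

Because the source is bounded, the job finishes on its own and produces one final row per day instead of a stream of updates.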

4. Notes

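  • The DWS layer writes to upsert-kafka: the aggregate emits updates, and keying them by primary key lets the latest value replace earlier ones instead of producing duplicate rows
  • Set the state retention time (table.exec.state.ttl) according to how far back backfills need to reach
  • Use savepoints to support job upgrades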
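
The last two notes could look like this in the Flink SQL client. This is a sketch only: the TTL value, the job ID, and the savepoint path are placeholders, and the STOP JOB statement assumes Flink 1.17 or later with a savepoint directory (state.savepoints.dir) already configured.

```sql
-- Keep idle aggregation state for 3 days (illustrative value; choose it based
-- on how late replayed or corrected data can arrive).
SET 'table.exec.state.ttl' = '3 d';

-- Before an upgrade: stop the running job and take a savepoint.
-- '<job_id>' is a placeholder; SHOW JOBS lists the running jobs.
SHOW JOBS;
STOP JOB '<job_id>' WITH SAVEPOINT;

-- After the upgrade: point the session at the savepoint, then resubmit the
-- INSERT statement of the job so that it restores its state.
SET 'execution.savepoint.path' = '/path/to/savepoint';  -- placeholder path

-- Once the job has been resubmitted, clear the setting so that later jobs
-- in the same session do not restore from the same savepoint.
RESET 'execution.savepoint.path';
```

Note that a job restored from a savepoint resumes from the Kafka offsets stored in that savepoint, so a full replay from an earlier offset requires starting the job without state.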