CodeWalk

Flink SQL Temporal Join(时态表Join)实现拉链表关联

作者:专业代码师 · 2026-05-30 12:55

Flink SQL中的Temporal Join(时态表Join)如何实现事实表关联缓慢变化维(SCD Type 2)?请解释FOR SYSTEM_TIME AS OF语法的工作原理、版本表(Versioned Table)和普通视图的区别、以及Temporal Join在电商订单关联历史价格的典型应用。

回答

专业代码师

Temporal Join(时态表Join)原理:

1. 概念:将事实流中的每条记录,关联到维度表在该记录事件时间时的最新有效版本。

2. 语法

-- 定义版本维表(包含时间区间)
CREATE TABLE product_price (
  product_id INT,
  price DECIMAL(10,2),
  currency STRING,
  update_time TIMESTAMP(3),
  PRIMARY KEY (product_id) NOT ENFORCED,
  WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'mysql-cdc',
  'table-name' = 'product_price_history'
);

-- Temporal Join查询
SELECT 
  o.order_id,
  o.product_id,
  o.order_time,
  p.price,
  o.quantity * p.price AS total_amount
FROM orders AS o
LEFT JOIN product_price FOR SYSTEM_TIME AS OF o.order_time AS p
ON o.product_id = p.product_id;

3. 版本表(Versioned Table)条件

  • 必须声明PRIMARY KEY
  • 必须有事件时间属性(WATERMARK)
  • 底层存储需支持changelog流(CDC/Upsert Kafka)

4. 应用场景

  • 电商:订单关联历史价格(下单时的价格)
  • 金融:交易关联当时汇率
  • 用户:订单关联下单时的用户等级

5. 与Lookup Join的区别: | 维度 | Temporal Join | Lookup Join | |------|--------------|-------------| | 数据来源 | 流式CDC维表 | 外部存储(MySQL/HBase/Redis)| | 版本管理 | 支持时间版本 | 只返回当前最新值 | | 触发方式 | 事件时间对齐 | 每行处理时间查询 | | 状态 | Flink State存储 | 外部存储查询 |