Flink SQL Temporal Join(时态表Join)实现拉链表关联
Flink SQL中的Temporal Join(时态表Join)如何实现事实表关联缓慢变化维(SCD Type 2)?请解释FOR SYSTEM_TIME AS OF语法的工作原理、版本表(Versioned Table)和普通视图的区别、以及Temporal Join在电商订单关联历史价格的典型应用。
回答
专业代码师
Temporal Join(时态表Join)原理:
1. 概念:将事实流中的每条记录,关联到维度表在该记录事件时间时的最新有效版本。
2. 语法:
-- 定义版本维表(包含时间区间)
CREATE TABLE product_price (
product_id INT,
price DECIMAL(10,2),
currency STRING,
update_time TIMESTAMP(3),
PRIMARY KEY (product_id) NOT ENFORCED,
WATERMARK FOR update_time AS update_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'mysql-cdc',
'table-name' = 'product_price_history'
);
-- Temporal Join查询
SELECT
o.order_id,
o.product_id,
o.order_time,
p.price,
o.quantity * p.price AS total_amount
FROM orders AS o
LEFT JOIN product_price FOR SYSTEM_TIME AS OF o.order_time AS p
ON o.product_id = p.product_id;
3. 版本表(Versioned Table)条件:
- 必须声明PRIMARY KEY
- 必须有事件时间属性(WATERMARK)
- 底层存储需支持changelog流(CDC/Upsert Kafka)
4. 应用场景:
- 电商:订单关联历史价格(下单时的价格)
- 金融:交易关联当时汇率
- 用户:订单关联下单时的用户等级
5. 与Lookup Join的区别: | 维度 | Temporal Join | Lookup Join | |------|--------------|-------------| | 数据来源 | 流式CDC维表 | 外部存储(MySQL/HBase/Redis)| | 版本管理 | 支持时间版本 | 只返回当前最新值 | | 触发方式 | 事件时间对齐 | 每行处理时间查询 | | 状态 | Flink State存储 | 外部存储查询 |