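Implementing CDC ingestion with the Delta Lake MERGE operation
How does Delta Lake's MERGE (also known as upsert) operation implement CDC (Change Data Capture) ingestion into a data lake? Please explain the syntax details of MERGE INTO (matched / not matched / WHEN MATCHED THEN UPDATE / INSERT / DELETE), and how Z-Order can improve query performance after a MERGE. Give an example of ingesting MySQL CDC data into Delta Lake in real time.
Answer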
屠龙少年
CDC ingestion with Delta MERGE:
- MERGE syntax:
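-- cdc_source must hold at most one row per id: MERGE raises an error when several source rows try to modify the same target row
MERGE INTO delta_table AS target
USING cdc_source AS source
ON target.id = source.id
-- Key exists in the target and the change is an update: overwrite the mutable columns
WHEN MATCHED AND source.op_type = 'UPDATE' THEN
  UPDATE SET target.name = source.name, target.version = source.version
-- Key exists in the target and the change is a delete: remove the row
WHEN MATCHED AND source.op_type = 'DELETE' THEN
  DELETE
-- Key is new: insert it (if cdc_source is collapsed to the latest event per id, source.op_type <> 'DELETE' may be the safer condition)
WHEN NOT MATCHED AND source.op_type = 'INSERT' THEN
  INSERT (id, name, version) VALUES (source.id, source.name, source.version)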
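To make the clause semantics concrete, here is a small pure-Python model of the statement above. It is only a sketch, not how Delta executes MERGE: the table is a plain dict keyed by id, `merge_cdc` is a made-up helper name, and the source is assumed to hold at most one change per id.

```python
from typing import Dict, List

Row = Dict[str, object]


def merge_cdc(target: Dict[int, Row], source: List[Row]) -> Dict[int, Row]:
    """Apply the three WHEN clauses to an in-memory 'table' keyed by id."""
    result = dict(target)  # shallow copy so the input table is left untouched
    for change in source:
        key = change["id"]
        op = change["op_type"]
        matched = key in target  # the ON condition: target.id = source.id
        if matched and op == "UPDATE":
            # WHEN MATCHED AND op_type = 'UPDATE' THEN UPDATE SET ...
            result[key] = {**target[key], "name": change["name"], "version": change["version"]}
        elif matched and op == "DELETE":
            # WHEN MATCHED AND op_type = 'DELETE' THEN DELETE
            del result[key]
        elif not matched and op == "INSERT":
            # WHEN NOT MATCHED AND op_type = 'INSERT' THEN INSERT ...
            result[key] = {"id": key, "name": change["name"], "version": change["version"]}
        # Any other combination matches no clause, so nothing happens.
    return result


table = {
    1: {"id": 1, "name": "a", "version": 1},
    2: {"id": 2, "name": "b", "version": 1},
}
changes = [
    {"id": 1, "op_type": "UPDATE", "name": "a2", "version": 2},
    {"id": 2, "op_type": "DELETE", "name": None, "version": 2},
    {"id": 3, "op_type": "INSERT", "name": "c", "version": 1},
    {"id": 4, "op_type": "UPDATE", "name": "d", "version": 5},  # unmatched UPDATE
]
merged = merge_cdc(table, changes)
print(merged)
```

Note that the UPDATE for id 4 is silently dropped, because no clause covers an unmatched UPDATE. Whether that is acceptable depends on how the CDC source is prepared.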
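- How it executes:
  - Read the Delta log to get the list of data files in the current table version
  - Use the ON condition to find the target files that contain matching rows
  - Join the rows of those files with the CDC batch to decide which rows to update, delete, or insert
  - Write new Parquet files with the merged result; the old files are not overwritten, only marked as removed in the log
  - Atomically commit a new Delta log entry that adds the new files and removes the old ones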
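The steps above can be sketched as a toy copy-on-write table. This is a simplified model under stated assumptions, not Delta's implementation: `ToyDeltaTable` is a hypothetical class, lists of dicts stand in for Parquet files, and the log is a Python list of add/remove entries.

```python
from typing import Dict, List

Row = Dict[str, object]


class ToyDeltaTable:
    def __init__(self) -> None:
        self.files: Dict[str, List[Row]] = {}       # file name -> rows (stand-in for Parquet)
        self.log: List[Dict[str, List[str]]] = []   # one entry per committed version
        self._next_id = 0

    def _write_file(self, rows: List[Row]) -> str:
        name = f"part-{self._next_id:05d}"
        self._next_id += 1
        self.files[name] = rows
        return name

    def append(self, rows: List[Row]) -> None:
        self.log.append({"add": [self._write_file(list(rows))], "remove": []})

    def live_files(self) -> List[str]:
        # Step 1: replay the log to get the file list of the current version.
        live: List[str] = []
        for entry in self.log:
            live = [f for f in live if f not in entry["remove"]] + entry["add"]
        return live

    def read(self) -> List[Row]:
        return [row for f in self.live_files() for row in self.files[f]]

    def merge(self, changes: Dict[int, Row]) -> None:
        """changes maps id -> change row; at most one change per id."""
        snapshot = self.live_files()
        # Step 2: only files containing a matching id need to be rewritten.
        touched = [f for f in snapshot if any(r["id"] in changes for r in self.files[f])]
        # Step 3: join the touched files with the change batch.
        rewritten: List[Row] = []
        matched_ids = set()
        for f in touched:
            for row in self.files[f]:
                change = changes.get(row["id"])
                if change is None:
                    rewritten.append(row)  # untouched row is copied as is
                    continue
                matched_ids.add(row["id"])
                if change["op_type"] == "DELETE":
                    continue  # row is dropped from the new file
                if change["op_type"] == "UPDATE":
                    row = {**row, "name": change["name"], "version": change["version"]}
                rewritten.append(row)
        for key, change in changes.items():
            if key not in matched_ids and change["op_type"] == "INSERT":
                rewritten.append({"id": key, "name": change["name"], "version": change["version"]})
        # Step 4: write a new file; the old ones stay on disk.
        added = [self._write_file(rewritten)] if rewritten else []
        # Step 5: a single log append plays the role of the atomic commit.
        self.log.append({"add": added, "remove": touched})
```

In this model a file that holds no matching id is never read past the lookup in step 2, which is why keeping matching keys concentrated in few files keeps a MERGE cheap. The removed file stays in `files` after the commit, much like old Parquet files remain on storage until they are cleaned up.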
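- Z-Order optimization:
OPTIMIZE delta_table ZORDER BY (id, version);
  - Rewrites the data files so that rows with nearby (id, version) values are stored together, following a Z-order curve; files written by later MERGEs are not clustered until OPTIMIZE runs again
  - A later MERGE can read fewer data files, because each file covers a narrower range of id values
  - Queries that filter on these columns benefit from more effective data skipping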
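A toy illustration of why Z-ordering helps data skipping on more than one column. This is a sketch under simplifying assumptions, not Delta's actual clustering code: `z_value`, `write_files` and `files_to_read` are made-up helpers, values are small non-negative integers, and a "file" is just a slice of 16 rows with min/max statistics computed on the fly.

```python
from typing import Callable, Dict, List

Row = Dict[str, int]


def z_value(x: int, y: int, bits: int = 16) -> int:
    """Interleave the low `bits` bits of x and y (a Morton code)."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)       # bits of x go to even positions
        z |= ((y >> i) & 1) << (2 * i + 1)   # bits of y go to odd positions
    return z


def write_files(rows: List[Row], rows_per_file: int, key: Callable[[Row], object]) -> List[List[Row]]:
    """Sort rows by `key` and cut the result into fixed-size 'files'."""
    ordered = sorted(rows, key=key)
    return [ordered[i:i + rows_per_file] for i in range(0, len(ordered), rows_per_file)]


def files_to_read(files: List[List[Row]], col: str, value: int) -> int:
    """Count files whose min/max range on `col` could contain `value`."""
    return sum(
        1 for f in files
        if min(r[col] for r in f) <= value <= max(r[col] for r in f)
    )


rows = [{"id": i, "version": v} for i in range(8) for v in range(8)]

# Plain sort on (id, version) versus Z-order on the same two columns.
linear = write_files(rows, 16, key=lambda r: (r["id"], r["version"]))
zordered = write_files(rows, 16, key=lambda r: z_value(r["id"], r["version"]))

for name, files in (("linear", linear), ("z-order", zordered)):
    print(name,
          "id=5 ->", files_to_read(files, "id", 5), "files;",
          "version=5 ->", files_to_read(files, "version", 5), "files")
```

With the plain sort, a filter on id touches 1 of the 4 files but a filter on version touches all 4. With the Z-order layout both filters touch 2 files. That is the trade-off: slightly worse pruning on the leading column in exchange for usable pruning on every Z-ordered column.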
- MySQL CDC ingestion example (Flink + Delta):
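-- Flink CDC source reading from MySQL
CREATE TABLE cdc_orders (
  id INT,
  amount DECIMAL(10, 2),  -- a bare DECIMAL in Flink means DECIMAL(10, 0); the precision and scale here are assumed
  op_type STRING,         -- assumed to be a physical column of the MySQL table
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql',
  'username' = '<user>',      -- placeholder; required by the connector
  'password' = '<password>',  -- placeholder; required by the connector
  'database-name' = 'mydb',
  'table-name' = 'orders'
);
-- Write into Delta Lake
INSERT INTO delta_lake.ods.orders
SELECT id, amount, op_type FROM cdc_orders;
-- Caveat: as far as I know, the open-source Flink Delta sink is append-only and does not run MERGE by itself;
-- the MERGE INTO shown earlier is typically executed by a separate engine such as Spark on the landed change rows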
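Before a batch of landed change rows is fed to MERGE, it usually has to be collapsed to one row per key, since MERGE rejects several source rows that modify the same target row. A minimal sketch of that step in plain Python follows. `latest_per_key` is a hypothetical helper, and `seq` is an assumed ordering column (for example a binlog position or commit timestamp) that is not part of the example table above.

```python
from typing import Dict, Iterable, List

Event = Dict[str, object]


def latest_per_key(events: Iterable[Event], key: str = "id", order: str = "seq") -> List[Event]:
    """Keep only the most recent change event for each key."""
    latest: Dict[object, Event] = {}
    for event in events:
        current = latest.get(event[key])
        # Compare on the ordering column, so late-arriving old events do not win.
        if current is None or event[order] >= current[order]:
            latest[event[key]] = event
    return list(latest.values())


batch = [
    {"id": 1, "op_type": "INSERT", "amount": 10, "seq": 1},
    {"id": 2, "op_type": "INSERT", "amount": 20, "seq": 2},
    {"id": 1, "op_type": "UPDATE", "amount": 15, "seq": 3},
    {"id": 1, "op_type": "DELETE", "amount": 15, "seq": 5},
    {"id": 2, "op_type": "UPDATE", "amount": 25, "seq": 4},
    {"id": 2, "op_type": "INSERT", "amount": 20, "seq": 2},  # duplicate delivered late
]
collapsed = latest_per_key(batch)
print(collapsed)
```

One consequence worth checking: after collapsing, a key whose last event is an UPDATE may not exist in the target yet (its INSERT was in the same batch), so the NOT MATCHED clause may need to accept any non-DELETE event rather than only INSERT.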