CodeWalk

Implementing CDC Ingestion with the Delta Lake MERGE Operation

Author: 屠龙少年 · 2026-05-30 12:55

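How does Delta Lake's MERGE (also known as upsert) operation implement CDC (Change Data Capture) ingestion into a data lake? Please explain the syntax details of MERGE INTO (matched / not matched / when matched then update / insert / delete), and how Z-Order can be used to optimize query performance after a MERGE. Give an example of ingesting MySQL CDC data into Delta Lake in real time.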

Answers

屠龙少年

Delta MERGE for CDC ingestion

  1. MERGE syntax
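-- Match each change row to the target row with the same key
MERGE INTO delta_table AS target
USING cdc_source AS source
ON target.id = source.id
-- The row exists in the target and the change is an update: overwrite its columns
WHEN MATCHED AND source.op_type = 'UPDATE' THEN
  UPDATE SET target.name = source.name, target.version = source.version
-- The row exists in the target and the change is a delete: remove it
WHEN MATCHED AND source.op_type = 'DELETE' THEN
  DELETE
-- The row is not in the target yet and the change is an insert: add it
WHEN NOT MATCHED AND source.op_type = 'INSERT' THEN
  INSERT (id, name, version) VALUES (source.id, source.name, source.version)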
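
To make the clause semantics concrete, here is a minimal pure-Python sketch of what the three WHEN clauses do to a table keyed by id. It is only a model, not how Delta executes the statement; `apply_merge` and the dict layout are my own illustration. One assumption to note: Delta raises an error when several source rows match the same target row, and this sketch is stricter and rejects any duplicate key in the source.

```python
def apply_merge(target, changes):
    """Apply CDC rows to `target` (a dict keyed by id) using the three clauses above.

    Matching is decided against the original `target` snapshot, like the ON
    condition. Returns a new dict and leaves `target` untouched.
    """
    result = {key: dict(row) for key, row in target.items()}
    seen = set()
    for row in changes:
        key = row["id"]
        if key in seen:
            # Assumption: stricter than Delta, which only fails when several
            # source rows match the same target row.
            raise ValueError(f"multiple source rows for id={key}")
        seen.add(key)

        matched = key in target
        op = row["op_type"]
        if matched and op == "UPDATE":
            result[key].update(name=row["name"], version=row["version"])
        elif matched and op == "DELETE":
            del result[key]
        elif not matched and op == "INSERT":
            result[key] = {"id": key, "name": row["name"], "version": row["version"]}
        # Any other combination satisfies no clause and is ignored.
    return result
```

Note the last comment: with the clauses as written, an UPDATE for a key that is not in the target, or an INSERT for a key that already exists, is silently dropped.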
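  2. How it executes

    • Read the Delta Log to get the list of data files in the current version
    • Use the ON condition to find the target files that contain matching rows
    • Read those target files and JOIN them in memory with the incremental CDC data
    • Write new Parquet files that replace the old ones (the old files are marked as removed in the log, not modified in place)
    • Atomically commit a new Delta Log entry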
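
The five steps above can be sketched as a toy copy-on-write model in plain Python. Everything here is an assumption made for illustration: lists of dicts stand in for Parquet files, a list of `{"add": [...], "remove": [...]}` commits stands in for the Delta Log, and `changes` maps each id to its new row or to `None` for a delete. The names `live_files` and `merge_copy_on_write` are hypothetical.

```python
def live_files(log):
    """Step 1: replay the commits in order to get the set of live data files."""
    live = set()
    for commit in log:
        live -= set(commit["remove"])
        live |= set(commit["add"])
    return live


def merge_copy_on_write(log, files, changes):
    """Rewrite only the files touched by `changes` and commit the result."""
    live = live_files(log)

    # Step 2: keep only files that contain at least one matching key.
    touched = sorted(
        name for name in live
        if any(row["id"] in changes for row in files[name])
    )

    # Step 3: join the rows of the touched files with the change set.
    matched_ids = set()
    output = []
    for name in touched:
        for row in files[name]:
            if row["id"] in changes:
                matched_ids.add(row["id"])
                new_row = changes[row["id"]]
                if new_row is not None:      # None means delete
                    output.append(new_row)
            else:
                output.append(row)           # untouched rows are copied over
    # Keys that matched nothing are inserts.
    output += [row for key, row in changes.items()
               if key not in matched_ids and row is not None]

    # Step 4: write a new file; the old files are never modified.
    new_name = f"part-v{len(log)}.parquet"
    files[new_name] = output

    # Step 5: one commit adds the new file and removes the rewritten ones.
    log.append({"add": [new_name], "remove": touched})
    return new_name
```

The point of the model is that a file without any matching key is neither read in step 3 nor rewritten in step 4, which is why narrowing the set of touched files matters so much for MERGE cost.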
  3. Z-Order optimization

    OPTIMIZE delta_table ZORDER BY (id, version);
    
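    • Sorts related data along a Z-order curve, so rows with nearby values in the chosen columns end up in the same files
    • MERGE reads fewer data files when the ON condition uses the Z-ordered columns
    • Data skipping at query time becomes more effective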
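
As a rough illustration of why this helps, the sketch below interleaves the bits of two integer columns into a Z-value (a Morton code), sorts rows by it, splits them into files, and then uses per-file min/max values to decide which files a filter has to open. This is a simplified model under my own assumptions, not Delta's actual implementation; `z_value`, `split_into_files`, and `files_to_read` are hypothetical helper names.

```python
def z_value(x, y, bits=16):
    """Interleave the low `bits` bits of x and y into one integer (Morton code)."""
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)        # bits of x go to even positions
        z |= ((y >> i) & 1) << (2 * i + 1)    # bits of y go to odd positions
    return z


def split_into_files(rows, rows_per_file):
    """Sort rows along the Z-order curve and cut the result into fixed-size files."""
    ordered = sorted(rows, key=lambda r: z_value(r["id"], r["version"]))
    return [ordered[i:i + rows_per_file]
            for i in range(0, len(ordered), rows_per_file)]


def files_to_read(files, column, value):
    """Data skipping: keep only files whose [min, max] range can contain `value`."""
    hits = []
    for index, rows in enumerate(files):
        lo = min(r[column] for r in rows)
        hi = max(r[column] for r in rows)
        if lo <= value <= hi:
            hits.append(index)
    return hits
```

On a 4x4 grid of (id, version) values split into four files of four rows, a filter on either column alone opens only two of the four files in this model. Sorting by id alone would give one file for an id filter but all four for a version filter, which is the trade-off Z-ordering balances.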
  4. MySQL CDC ingestion example (Flink + Delta)

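-- Read MySQL changes with Flink CDC
CREATE TABLE cdc_orders (
  id INT,
  amount DECIMAL(10, 2),  -- precision assumed; a bare DECIMAL is DECIMAL(10, 0) in Flink
  op_type STRING,         -- physical column: it must exist in the MySQL table
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql',
  'username' = '<user>',      -- placeholder
  'password' = '<password>',  -- placeholder
  'database-name' = 'mydb',
  'table-name' = 'orders'
);

-- Ingest into Delta Lake
INSERT INTO delta_lake.ods.orders
SELECT id, amount, op_type FROM cdc_orders;

-- Note: as far as I know, the open-source Flink Delta sink is append-only and
-- does not run MERGE by itself. To get upsert semantics, land the change rows
-- in a staging table and apply them with MERGE INTO (for example from a Spark job).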
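
Before a batch of change rows is applied with MERGE INTO, it usually has to be reduced to one row per key, because MERGE fails when several source rows match the same target row. Below is a minimal pure-Python sketch of that step. It assumes every change row carries an ordering field, called `seq` here (hypothetical, for example a binlog position or commit timestamp); the tables in this answer do not define one.

```python
def latest_change_per_key(events):
    """Keep only the most recent change for each id.

    `seq` is an assumed ordering field. On equal `seq`, the event that comes
    later in the input wins.
    """
    latest = {}
    for event in events:
        current = latest.get(event["id"])
        if current is None or event["seq"] >= current["seq"]:
            latest[event["id"]] = event
    # Sorted by id only to make the output deterministic.
    return sorted(latest.values(), key=lambda e: e["id"])
```

With this in place, an INSERT followed by an UPDATE of the same order inside one batch collapses to the UPDATE row only, so the MERGE clauses need to treat "latest op is UPDATE but the key is not in the target yet" as an insert.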