CodeWalk

Flink SQL CDC: Real-Time Ingestion into the Data Lake and Warehouse

Author: 小字辈 · 2026-05-30 12:55

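How can Flink SQL CDC (Change Data Capture) be used to build real-time MySQL → Kafka → Hive/Iceberg ingestion into a data lake and warehouse? Please explain how to use the Flink CDC connectors, how they integrate with Debezium, and how to guarantee consistency and exactly-once semantics for CDC data.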

Answer

小字辈

1. Flink CDC Architecture

MySQL Binlog → Flink CDC Connector → Kafka → Flink → Iceberg/Hive
                     ↓
              Debezium (embedded engine, used internally)
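Because the connector embeds Debezium, every binlog change first becomes a Debezium change event (op code `c`, `r`, `u` or `d`, plus `before`/`after` row images), which Flink then turns into its own changelog row kinds. The Python sketch below models that translation for a payload-only Debezium JSON message (the shape Flink expects with 'debezium-json.schema-include' = 'false'); the function name and the sample rows are illustrative assumptions, not part of any library.

```python
import json
from typing import Any, Dict, List, Tuple

# Flink changelog row kinds, in Flink's short-string notation.
INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE = "+I", "-U", "+U", "-D"


def debezium_to_changelog(message: str) -> List[Tuple[str, Dict[str, Any]]]:
    """Translate one payload-only Debezium JSON event into Flink-style changelog rows."""
    event = json.loads(message)
    op, before, after = event["op"], event.get("before"), event.get("after")
    if op in ("c", "r"):  # c = INSERT from the binlog, r = row read during the snapshot
        return [(INSERT, after)]
    if op == "u":  # an UPDATE carries both the old and the new row image
        return [(UPDATE_BEFORE, before), (UPDATE_AFTER, after)]
    if op == "d":  # a DELETE carries only the old row image
        return [(DELETE, before)]
    raise ValueError(f"unsupported Debezium op: {op!r}")


update = json.dumps({
    "before": {"order_id": 1, "status": "CREATED"},
    "after": {"order_id": 1, "status": "PAID"},
    "op": "u",
})
print(debezium_to_changelog(update))
# [('-U', {'order_id': 1, 'status': 'CREATED'}), ('+U', {'order_id': 1, 'status': 'PAID'})]
```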

2. Using the CDC Connector

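-- MySQL CDC source table
CREATE TABLE mysql_orders (
  order_id BIGINT PRIMARY KEY NOT ENFORCED,
  user_id BIGINT,
  amount DECIMAL(10,2),
  status STRING,
  -- time the change was made in MySQL (0 for rows read during the snapshot phase)
  op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql-host',
  'port' = '3306',
  'username' = 'flink_cdc',   -- placeholder account with replication privileges
  'password' = '******',
  'database-name' = 'shop',
  'table-name' = 'orders',
  'server-id' = '5401-5404',  -- one unique ID per source subtask (range size >= parallelism)
  'scan.startup.mode' = 'initial'  -- take a full snapshot first, then continue from the binlog
);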
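Each parallel source subtask connects to MySQL as a separate replication client, so each one needs its own server id. The sketch below expands a range such as '5401-5404', checks it against the source parallelism, and checks two jobs' ranges for overlap. Mapping subtask i to the i-th id is an assumption made for illustration, and the function names are hypothetical.

```python
from typing import List


def parse_server_id_range(spec: str) -> List[int]:
    """Expand a 'server-id' value such as '5401-5404' (or a single '5401') into ids."""
    start_text, _, end_text = spec.partition("-")
    start = int(start_text)
    end = int(end_text) if end_text else start
    if end < start:
        raise ValueError(f"invalid server-id range: {spec!r}")
    return list(range(start, end + 1))


def assign_server_ids(spec: str, parallelism: int) -> List[int]:
    """Give each source subtask its own id (assumption: subtask i takes the i-th id)."""
    ids = parse_server_id_range(spec)
    if parallelism > len(ids):
        raise ValueError(
            f"server-id range {spec!r} has {len(ids)} ids but parallelism is {parallelism}"
        )
    return ids[:parallelism]


def overlaps(spec_a: str, spec_b: str) -> bool:
    """Two jobs reading the same MySQL cluster must not share any server id."""
    return bool(set(parse_server_id_range(spec_a)) & set(parse_server_id_range(spec_b)))


print(assign_server_ids("5401-5404", 4))  # [5401, 5402, 5403, 5404]
```

A second job against the same MySQL cluster would then use a disjoint range such as '5405-5408'.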

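-- Sync to Kafka (intermediate buffer). upsert-kafka writes INSERT/UPDATE_AFTER rows as
-- messages keyed by the primary key and DELETE rows as tombstones (null values).
CREATE TABLE kafka_orders (
  order_id BIGINT,
  user_id BIGINT,
  amount DECIMAL(10,2),
  status STRING,
  op_ts TIMESTAMP_LTZ(3),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'cdc_orders',
  'properties.bootstrap.servers' = 'kafka:9092',  -- placeholder broker address
  'key.format' = 'json',
  'value.format' = 'json'  -- must be insert-only; upsert-kafka rejects 'debezium-json' here
);

INSERT INTO kafka_orders
SELECT order_id, user_id, amount, status, op_ts FROM mysql_orders;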
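A small Python model of what upsert-kafka does with the changelog coming from mysql_orders: +I and +U rows become ordinary messages keyed by order_id, -D becomes a tombstone (null value), and -U is not needed because the next +U overwrites the key. Reading the topic back keeps only the last value per key, which is also why replaying records after a failure is harmless. The helper names and row shapes are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

Row = Dict[str, object]
KafkaRecord = Tuple[int, Optional[Row]]  # (message key, message value); None = tombstone


def to_upsert_records(changelog: List[Tuple[str, Row]], key_field: str) -> List[KafkaRecord]:
    """Encode a Flink changelog as upsert records keyed by the primary key."""
    records: List[KafkaRecord] = []
    for kind, row in changelog:
        if kind in ("+I", "+U"):
            records.append((row[key_field], row))   # latest full row for the key
        elif kind == "-D":
            records.append((row[key_field], None))  # tombstone deletes the key
        # "-U" is dropped: the following "+U" already overwrites the key
    return records


def materialize(records: List[KafkaRecord]) -> Dict[int, Row]:
    """Read the topic back: the last record per key wins, tombstones remove the key."""
    table: Dict[int, Row] = {}
    for key, value in records:
        if value is None:
            table.pop(key, None)
        else:
            table[key] = value
    return table


log = [
    ("+I", {"order_id": 1, "status": "CREATED"}),
    ("-U", {"order_id": 1, "status": "CREATED"}),
    ("+U", {"order_id": 1, "status": "PAID"}),
    ("+I", {"order_id": 2, "status": "CREATED"}),
    ("-D", {"order_id": 2, "status": "CREATED"}),
]
records = to_upsert_records(log, "order_id")
print(materialize(records))  # {1: {'order_id': 1, 'status': 'PAID'}}
```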

3. Exactly-Once Guarantees

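# Two-phase commit, driven by Flink checkpoints:
# 1. The CDC source stores its binlog offset in Flink state on every checkpoint.
# 2. A transactional Kafka sink ('connector' = 'kafka' with 'sink.delivery-guarantee' = 'exactly-once'
#    and a 'sink.transactional-id-prefix') pre-commits its open Kafka transaction at the checkpoint barrier.
# 3. Once the checkpoint completes, the transactions are committed; after a failure they are aborted
#    and the source rewinds to the checkpointed offset. Downstream consumers must read with
#    isolation.level=read_committed.
# upsert-kafka delivers at-least-once by default, but replays overwrite the same keys, so it is idempotent.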
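These steps can be simulated in a few lines of Python. This is a toy model under simplifying assumptions (one source, one sink, the checkpoint completes immediately after the pre-commit, a single injected crash); TransactionalSink and run are hypothetical names, not Flink or Kafka APIs. It shows that records written after the last completed checkpoint are discarded with the aborted transaction and re-emitted once, so the committed output contains every binlog event exactly once.

```python
from typing import List, Optional


class TransactionalSink:
    """Toy 2PC sink: records become visible only after their checkpoint commits."""

    def __init__(self) -> None:
        self.committed: List[str] = []  # what a read_committed consumer sees
        self.open_txn: List[str] = []   # records written since the last checkpoint
        self.pending: List[str] = []    # pre-committed, waiting for checkpoint completion

    def write(self, record: str) -> None:
        self.open_txn.append(record)

    def pre_commit(self) -> None:
        """Phase 1 (checkpoint barrier): flush and close the open transaction."""
        self.pending.extend(self.open_txn)
        self.open_txn = []

    def commit(self) -> None:
        """Phase 2 (checkpoint completed): make pre-committed records visible."""
        self.committed.extend(self.pending)
        self.pending = []

    def abort(self) -> None:
        """Failure: discard everything that was not committed."""
        self.open_txn, self.pending = [], []


def run(binlog: List[str], checkpoint_every: int, fail_at: Optional[int] = None) -> List[str]:
    """Replay a binlog through source -> sink, crashing once at offset `fail_at`."""
    sink = TransactionalSink()
    offset = 0               # current binlog offset of the source
    checkpointed_offset = 0  # offset stored in the last completed checkpoint
    crashed = False
    while offset < len(binlog):
        if offset == fail_at and not crashed:
            crashed = True
            sink.abort()
            offset = checkpointed_offset  # restore source state from the checkpoint
            continue
        sink.write(binlog[offset])
        offset += 1
        if offset % checkpoint_every == 0 or offset == len(binlog):
            sink.pre_commit()
            checkpointed_offset = offset  # source offset snapshotted in the same checkpoint
            sink.commit()                 # assume the checkpoint completes immediately
    return sink.committed


events = [f"binlog-{i}" for i in range(10)]
print(run(events, checkpoint_every=4, fail_at=6) == events)  # True
```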

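execution.checkpointing.interval: 60s         # sinks commit only when a checkpoint completes
table.exec.source.cdc-events-duplicate: true  # enable deduplication of CDC events

table.exec.source.cdc-events-duplicate matters when Flink reads a Kafka topic in debezium-json format produced by an external, at-least-once Debezium deployment (for example Debezium on Kafka Connect, read with 'connector' = 'kafka' and 'format' = 'debezium-json'): Flink then deduplicates change events by the source table's declared primary key, at the cost of an extra stateful operator. The mysql-cdc source reads exactly-once on its own and does not need it.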
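Conceptually, this deduplication keeps the last row seen per primary key and drops events that would not change that state. The sketch below is a simplified model of the idea, not Flink's actual operator code; the row layout (order_id, status) and the choice to ignore -U events (the matching +U carries the new image) are assumptions of this example.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[int, str]  # (order_id, status); order_id is the primary key


def normalize(events: List[Tuple[str, Row]]) -> List[Tuple[str, Row]]:
    """Simplified keyed keep-last-row normalization of a possibly duplicated changelog."""
    state: Dict[int, Row] = {}  # last row per primary key (Flink keeps this in keyed state)
    out: List[Tuple[str, Row]] = []
    for kind, row in events:
        key = row[0]
        prev: Optional[Row] = state.get(key)
        if kind in ("+I", "+U"):
            if prev is None:
                out.append(("+I", row))
            elif prev == row:
                continue  # duplicate insert/update: already applied
            else:
                out.extend([("-U", prev), ("+U", row)])
            state[key] = row
        elif kind == "-D":
            if prev is not None:
                out.append(("-D", prev))
                del state[key]
            # a delete for an unknown key is a duplicate and is dropped
        # "-U" is ignored here: the new image arrives in the matching "+U"
    return out


dup = [("+I", (1, "a")), ("+I", (1, "a")), ("-U", (1, "a")), ("+U", (1, "b")),
       ("-U", (1, "a")), ("+U", (1, "b")), ("-D", (1, "b")), ("-D", (1, "b"))]
print(normalize(dup))  # [('+I', (1, 'a')), ('-U', (1, 'a')), ('+U', (1, 'b')), ('-D', (1, 'b'))]
```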

4. Writing to Iceberg

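CREATE TABLE iceberg_orders (
  order_id BIGINT,
  user_id BIGINT,
  amount DECIMAL(10,2),
  status STRING,
  `_event_time` TIMESTAMP_LTZ(3),      -- CDC event time
  PRIMARY KEY (order_id) NOT ENFORCED  -- equality key used to apply updates and deletes
) WITH (
  'connector' = 'iceberg',
  'catalog-name' = 'hive_catalog',     -- required by the Iceberg connector
  'catalog-type' = 'hive',
  'uri' = 'thrift://hms:9083',
  'format-version' = '2',              -- v2 tables support row-level deletes
  'write.upsert.enabled' = 'true',
  'write.format.default' = 'parquet'
);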

INSERT INTO iceberg_orders
SELECT order_id, user_id, amount, status, op_ts
FROM kafka_orders;
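To see why the Iceberg table needs a primary key and format version 2, here is a simplified Python model of merge-on-read with equality deletes: each Flink checkpoint becomes one Iceberg commit with a higher sequence number, every key written in upsert mode also gets an equality delete, and a scan hides any data row whose key has an equality delete with a strictly higher sequence number. ToyIcebergV2Table is hypothetical and ignores real details such as files, partitions, and position deletes.

```python
from typing import Dict, List, Optional, Tuple

Change = Tuple[str, int, Optional[str]]  # (row kind, order_id, status)


class ToyIcebergV2Table:
    """Simplified model of upsert writes to a v2 table with equality deletes on order_id."""

    def __init__(self) -> None:
        self.sequence = 0
        self.data: List[Tuple[int, int, str]] = []         # (sequence number, key, status)
        self.equality_deletes: List[Tuple[int, int]] = []  # (sequence number, key)

    def commit_checkpoint(self, changes: List[Change]) -> None:
        """One Flink checkpoint = one Iceberg commit with a new sequence number."""
        latest: Dict[int, Optional[str]] = {}
        for kind, key, status in changes:
            # changes to the same key within one checkpoint collapse to the last one
            latest[key] = status if kind in ("+I", "+U") else None
        self.sequence += 1
        for key, status in latest.items():
            self.equality_deletes.append((self.sequence, key))  # hide older versions
            if status is not None:
                self.data.append((self.sequence, key, status))

    def scan(self) -> Dict[int, str]:
        """Merge-on-read: an equality delete hides data with a strictly lower sequence number."""
        result: Dict[int, str] = {}
        for data_seq, key, status in self.data:
            deleted = any(k == key and del_seq > data_seq for del_seq, k in self.equality_deletes)
            if not deleted:
                result[key] = status
        return result


table = ToyIcebergV2Table()
table.commit_checkpoint([("+I", 1, "CREATED"), ("+I", 2, "CREATED")])
table.commit_checkpoint([("-U", 1, "CREATED"), ("+U", 1, "PAID"), ("-D", 2, "CREATED")])
print(table.scan())  # {1: 'PAID'}
```

Without a primary key the sink would have no equality field to delete by, so updates and deletes from the CDC stream could not be applied.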

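5. Best Practices

  • Give every CDC job its own server-id range, with at least as many IDs as the source parallelism, so it does not clash with MySQL replicas or other CDC jobs
  • Use upsert-kafka so the Kafka topic always holds the latest row per primary key
  • Keep scan.incremental.snapshot.enabled=true on the CDC source (the default in Flink CDC 2.x and later; requires a primary key): the snapshot is read in parallel, lock-free chunks and can resume from a checkpoint
  • Monitor the currentFetchEventTimeLag metric (delay between a change's time in MySQL and when the source fetches it)
  • Use binlog_format=ROW and retain binlogs for at least 3 days so a job can be restarted from an older checkpoint or savepoint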
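With incremental snapshot enabled, Flink CDC (inspired by Netflix's DBLog) splits the table into primary-key chunks and makes each chunk consistent without a global lock: it records a low watermark (binlog position), runs a plain SELECT, records a high watermark, and replays that chunk's binlog events between the two watermarks on top of the query result. The function below is a minimal model of that merge step; the data shapes and the name read_chunk are assumptions of this sketch.

```python
from typing import Dict, List, Tuple

Row = Tuple[int, str]                    # (order_id, status)
BinlogEvent = Tuple[int, str, int, str]  # (binlog position, op "c"/"u"/"d", order_id, status)


def read_chunk(
    seen_by_select: Dict[int, str],
    binlog: List[BinlogEvent],
    chunk: Tuple[int, int],
    low_watermark: int,
    high_watermark: int,
) -> List[Row]:
    """Return a consistent image of primary-key chunk [lo, hi) as of the high watermark.

    `seen_by_select` is whatever the lock-free SELECT happened to observe; it may
    include some, but not all, changes made while the query was running.
    """
    lo, hi = chunk
    snapshot = {k: v for k, v in seen_by_select.items() if lo <= k < hi}
    # Replay binlog events in (low, high] that belong to this chunk.
    for pos, op, key, status in binlog:
        if low_watermark < pos <= high_watermark and lo <= key < hi:
            if op == "d":
                snapshot.pop(key, None)
            else:  # "c" or "u" behave as an upsert
                snapshot[key] = status
    return sorted(snapshot.items())


# The SELECT saw the update at position 11 but not the delete at position 12.
seen = {1: "A", 2: "B2", 3: "C"}
log = [(11, "u", 2, "B2"), (12, "d", 3, "C"), (13, "c", 4, "D")]
print(read_chunk(seen, log, (1, 4), low_watermark=10, high_watermark=12))  # [(1, 'A'), (2, 'B2')]
```

Once all chunks are finished, the binlog phase only has to skip, per chunk, the events at or before that chunk's high watermark, which is how the switch from snapshot to incremental reading avoids both gaps and duplicates.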