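Flink SQL CDC: Real-Time Ingestion into the Lake and Warehouse
How can Flink SQL CDC (Change Data Capture) be used to ingest data in real time along MySQL → Kafka → Hive/Iceberg? Please explain how to use the Flink CDC connectors, how they integrate with Debezium, and how to guarantee consistency and Exactly-Once semantics for CDC data.
Answer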
小字辈
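1. Flink CDC Architecture
MySQL Binlog → Flink CDC Connector → Kafka → Flink → Iceberg/Hive
The Flink CDC connector embeds the Debezium engine internally to read and parse the binlog, so no separate Debezium or Kafka Connect deployment is required.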
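To make the Debezium integration concrete: Debezium describes each row change as an envelope with `op`, `before` and `after` fields, and Flink turns it into changelog rows tagged with a RowKind. The plain-Python sketch below is not Flink's deserializer, and the function name is hypothetical; it only mirrors the mapping the `debezium-json` format applies: `c`/`r` become `+I`, `u` becomes `-U` followed by `+U`, and `d` becomes `-D`.

```python
import json


def debezium_to_changelog(message):
    """Map one Debezium JSON envelope to (RowKind, row) pairs, Flink-style."""
    event = json.loads(message)
    payload = event.get("payload", event)   # envelope is wrapped when schemas are enabled
    op = payload["op"]
    before, after = payload.get("before"), payload.get("after")
    if op in ("c", "r"):                    # create, or row read during the snapshot
        return [("+I", after)]
    if op == "u":                           # update = retract old image, then add new image
        return [("-U", before), ("+U", after)]
    if op == "d":                           # delete carries the old image
        return [("-D", before)]
    raise ValueError(f"unknown Debezium op: {op!r}")


msg = ('{"op": "u", "before": {"order_id": 1, "status": "NEW"},'
       ' "after": {"order_id": 1, "status": "PAID"}}')
for kind, row in debezium_to_changelog(msg):
    print(kind, row)
# -U {'order_id': 1, 'status': 'NEW'}
# +U {'order_id': 1, 'status': 'PAID'}
```

Splitting an update into `-U`/`+U` is what lets downstream aggregations retract the old value before adding the new one.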
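2. Using the CDC Connector
-- MySQL CDC source table
CREATE TABLE mysql_orders (
order_id BIGINT PRIMARY KEY NOT ENFORCED,
user_id BIGINT,
amount DECIMAL(10,2),
status STRING,
op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL -- change time in MySQL (0 for rows read in the snapshot phase)
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql-host',
'port' = '3306',
'username' = 'flink_cdc', -- placeholder credentials
'password' = '******',
'database-name' = 'shop',
'table-name' = 'orders',
'server-id' = '5401-5404', -- one ID per parallel source subtask (parallelism up to 4)
'scan.startup.mode' = 'initial' -- full snapshot first, then incremental binlog reading
);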
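Why a range: every MySQL replication client needs a distinct server ID, and with parallel reading each source subtask connects as its own client. A minimal sketch of how a value like '5401-5404' can be handed out, assuming subtask i takes the i-th ID in the range (our reading of the connector's behavior; both helper names are hypothetical):

```python
def parse_server_id_range(spec):
    """Parse a 'server-id' value such as '5401' or '5401-5404' (inclusive)."""
    start, _, end = spec.partition("-")
    first, last = int(start), int(end or start)
    if first > last:
        raise ValueError(f"invalid server-id range: {spec}")
    return range(first, last + 1)


def assign_server_ids(spec, parallelism):
    """Hypothetical helper: give each parallel source subtask its own server ID."""
    ids = parse_server_id_range(spec)
    if parallelism > len(ids):
        raise ValueError(
            f"server-id range {spec} has {len(ids)} ids but parallelism is {parallelism}")
    return [ids[subtask] for subtask in range(parallelism)]


print(assign_server_ids("5401-5404", 4))   # [5401, 5402, 5403, 5404]
```

The practical rule it encodes: the range must contain at least as many IDs as the source parallelism, and must not overlap with IDs used by other jobs or replicas.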
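-- Sync to Kafka (intermediate buffer)
CREATE TABLE kafka_orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10,2),
status STRING,
op_ts TIMESTAMP_LTZ(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'cdc_orders',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'json',
'value.format' = 'json' -- upsert-kafka needs an insert-only format (not debezium-json); deletes are written as tombstones
);
INSERT INTO kafka_orders SELECT * FROM mysql_orders;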
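How the upsert-kafka topic keeps "latest state per key": on write, `+I`/`+U` rows become values keyed by the primary key, `-D` becomes a tombstone (null value) and `-U` is dropped; on read, each message is treated as an upsert on its key, and Flink adds a stateful normalization step to rebuild `-U`/`+U`. The toy sketch below (function names hypothetical, simplified from Flink's actual operator) shows why an at-least-once duplicate write is harmless:

```python
def to_upsert_messages(changelog, key="order_id"):
    """Sink side: +I/+U become keyed values, -D becomes a tombstone, -U is dropped."""
    messages = []
    for kind, row in changelog:
        if kind in ("+I", "+U"):
            messages.append((row[key], row))
        elif kind == "-D":
            messages.append((row[key], None))   # tombstone (null value)
    return messages


def normalize(messages):
    """Source side: rebuild a full changelog from upsert messages using keyed state."""
    state, out = {}, []
    for k, value in messages:
        old = state.get(k)
        if value is None:
            if old is not None:
                out.append(("-D", old))
                del state[k]
        elif old is None:
            out.append(("+I", value))
            state[k] = value
        elif old != value:
            out += [("-U", old), ("+U", value)]
            state[k] = value
        # else: identical re-delivery (e.g. an at-least-once retry) changes nothing
    return out


changelog = [("+I", {"order_id": 1, "status": "NEW"}),
             ("-U", {"order_id": 1, "status": "NEW"}),
             ("+U", {"order_id": 1, "status": "PAID"}),
             ("-D", {"order_id": 1, "status": "PAID"})]
msgs = to_upsert_messages(changelog)
msgs.insert(2, msgs[1])                    # simulate a duplicated write of PAID
print(normalize(msgs) == changelog)        # True
```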
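3. Exactly-Once Guarantees
# Two-phase commit
# 1. The CDC source stores its binlog offset in Flink state (part of every checkpoint)
# 2. The Kafka sink writes inside a transaction and pre-commits it when the checkpoint barrier arrives
# 3. The transaction is committed only after the checkpoint completes (consumers read with isolation.level=read_committed)
execution.checkpointing.interval: 1min # example value; Kafka transactions and Iceberg commits only happen on checkpoints
table.exec.source.cdc-events-duplicate: true # enable primary-key dedup for CDC sources that may deliver duplicate events (e.g. Kafka + debezium-json)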
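The three steps above can be sketched as a toy model: the source offset lives in state, sink writes go into an open transaction, the barrier pre-commits it, and only the checkpoint-complete notification makes it visible. On failure, uncommitted work is aborted and the source rewinds to the last completed checkpoint, so replayed events are not duplicated in the committed output. The class and method names are hypothetical, not Flink's API.

```python
class TwoPhaseCommitSketch:
    """Toy model of exactly-once: source offset in state, sink committed on checkpoint."""

    def __init__(self, binlog):
        self.binlog = binlog            # change events; list index = binlog offset
        self.offset = 0                 # source read position (operator state)
        self.open_txn = []              # records written in the current, open transaction
        self.pending = {}               # checkpoint id -> (offset, pre-committed records)
        self.committed = []             # what a read_committed consumer can see
        self.restore_offset = 0         # offset of the last completed checkpoint

    def process(self, n):
        """Read up to n events and write them into the open transaction."""
        for _ in range(n):
            if self.offset >= len(self.binlog):
                break
            self.open_txn.append(self.binlog[self.offset])
            self.offset += 1

    def checkpoint(self, cp_id):
        """Phase 1 (barrier): snapshot the offset and pre-commit the open transaction."""
        self.pending[cp_id] = (self.offset, self.open_txn)
        self.open_txn = []

    def notify_checkpoint_complete(self, cp_id):
        """Phase 2: the checkpoint is durable, so commit every transaction up to cp_id."""
        for done_id in sorted(k for k in self.pending if k <= cp_id):
            offset, records = self.pending.pop(done_id)
            self.committed.extend(records)
            self.restore_offset = offset

    def fail_and_recover(self):
        """Crash before the next checkpoint completes: abort and rewind."""
        self.open_txn = []
        self.pending.clear()            # pre-committed but unconfirmed: aborted in this toy
        self.offset = self.restore_offset


job = TwoPhaseCommitSketch(["e0", "e1", "e2", "e3", "e4"])
job.process(2)
job.checkpoint(1)
job.notify_checkpoint_complete(1)       # e0, e1 committed
job.process(2)                          # e2, e3 written but still uncommitted
job.fail_and_recover()                  # crash: e2, e3 aborted, offset back to 2
job.process(3)
job.checkpoint(2)
job.notify_checkpoint_complete(2)
print(job.committed)                    # ['e0', 'e1', 'e2', 'e3', 'e4']
```

The toy simplifies one case: in real Flink, if a checkpoint completed but the commit notification was lost, the restored job re-commits that checkpoint's transaction instead of aborting it.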
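4. Writing to Iceberg
CREATE TABLE iceberg_orders (
order_id BIGINT,
user_id BIGINT,
amount DECIMAL(10,2),
status STRING,
`_event_time` TIMESTAMP_LTZ(3), -- CDC event time
PRIMARY KEY (order_id) NOT ENFORCED -- equality field used for updates and deletes
) WITH (
'connector' = 'iceberg',
'catalog-name' = 'hive_catalog',
'catalog-type' = 'hive',
'uri' = 'thrift://hms:9083',
'format-version' = '2', -- row-level updates/deletes require Iceberg table format v2
'write.upsert.enabled' = 'true',
'write.format.default' = 'parquet'
);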
INSERT INTO iceberg_orders
SELECT order_id, user_id, amount, status, op_ts
FROM kafka_orders;
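How an upsert into an Iceberg v2 table works, sketched as a toy model: each checkpoint produces one commit with a sequence number; for every changed key the writer adds an equality delete on the primary key plus the new data row, and readers hide a data row when a delete on the same key has a strictly higher sequence number. This is a simplification under stated assumptions: the real Flink writer uses position deletes for repeated keys within one checkpoint, which the sketch collapses to "last row per key"; all names are hypothetical.

```python
def commit(table, batch, seq):
    """Append one checkpoint's changes as an Iceberg-v2-style commit (toy model)."""
    latest = {}                                  # key -> last row in this commit (None = deleted)
    for kind, row in batch:
        if kind in ("+I", "+U"):
            latest[row["order_id"]] = row
        elif kind == "-D":
            latest[row["order_id"]] = None
        # -U is redundant once the key gets an equality delete
    for key, row in latest.items():
        table["eq_deletes"].append((seq, key))   # hides older versions of this key
        if row is not None:
            table["data"].append((seq, row))


def scan(table):
    """Merge-on-read: a delete hides rows of the same key from older commits only."""
    return [row for data_seq, row in table["data"]
            if not any(key == row["order_id"] and del_seq > data_seq
                       for del_seq, key in table["eq_deletes"])]


table = {"data": [], "eq_deletes": []}
commit(table, [("+I", {"order_id": 1, "status": "NEW"}),
               ("+I", {"order_id": 2, "status": "NEW"})], seq=1)
commit(table, [("-U", {"order_id": 1, "status": "NEW"}),
               ("+U", {"order_id": 1, "status": "PAID"}),
               ("-D", {"order_id": 2, "status": "NEW"})], seq=2)
print(scan(table))                               # [{'order_id': 1, 'status': 'PAID'}]
```

Because a commit only happens when a checkpoint completes, a failed job never leaves half-applied deletes visible to readers.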
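5. Best Practices
- Set a server-id range (at least one ID per parallel source subtask, unique among all replication clients of the MySQL server) to avoid conflicts
- Use Upsert-Kafka so that Kafka keeps the latest state per key
- Set scan.incremental.snapshot.enabled=true on the CDC source (lock-free, parallel, checkpointable snapshot)
- Monitor the currentFetchEventTimeLag metric
- Keep binlog retention at 3 days or more so jobs can be replayed from an older position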
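Why the incremental snapshot can read chunks without a global lock: in the Flink CDC design (inspired by Netflix's DBLog), each chunk's SELECT is bracketed by a low and a high binlog watermark, and binlog events between the two that touch the chunk's key range are merged into the SELECT result, yielding the chunk's state as of the high watermark. A toy sketch of that merge step; the function name and data shapes are hypothetical.

```python
def normalize_chunk(snapshot_rows, binlog, key_range, low, high):
    """Apply binlog events in (low, high] that touch this chunk to its SELECT result."""
    start, end = key_range                       # half-open primary-key range of the chunk
    rows = dict(snapshot_rows)                   # key -> row read by the chunk SELECT
    for pos, op, key, row in binlog:
        if low < pos <= high and start <= key < end:
            if op == "d":
                rows.pop(key, None)
            else:                                # 'c' or 'u': keep the newer image
                rows[key] = row
    return rows                                  # chunk state as of the high watermark


snapshot = {1: "NEW", 2: "NEW"}                  # SELECT ran between binlog positions 100 and 105
binlog = [(101, "u", 1, "PAID"), (103, "d", 2, None),
          (104, "c", 3, "NEW"), (107, "u", 1, "SHIPPED")]
print(normalize_chunk(snapshot, binlog, (1, 3), low=100, high=105))   # {1: 'PAID'}
```

Key 3 falls outside this chunk and position 107 lies past the high watermark; both are left for other chunks or the later binlog phase. Replaying the window is safe because the SELECT ran inside it, so applying events in order always ends at the last image before the high watermark.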