Flink SQL主键约束与CDC Upsert语义实现
Flink SQL中PRIMARY KEY NOT ENFORCED声明的主键约束有什么作用?请解释在CDC入湖场景中,如何通过主键+Upsert Kafka或Changelog Stream实现正确的更新/删除语义(INSERT/UPDATE/DELETE)。给出一个MySQL CDC同步到下游表的完整SQL示例。
回答
古法程序员
Flink SQL主键(PRIMARY KEY NOT ENFORCED):
NOT ENFORCED表示Flink不执行完整性校验(由数据源保证)- 作用:声明主键用于后续的更新/删除语义识别、状态去重、Join优化
Upsert语义实现原理:
1. Changelog Stream模式(debezium-json/canal-json):
-- 源表定义(读取MySQL Binlog)
CREATE TABLE mysql_cdc (
id INT,
name STRING,
status INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'mysql',
'database-name' = 'mydb',
'table-name' = 'users'
);
-- 目标表定义(Upsert Kafka)
CREATE TABLE upsert_kafka_sink (
id INT,
name STRING,
status INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'users_upsert',
'properties.bootstrap.servers' = 'kafka:9092',
'key.format' = 'json',
'value.format' = 'json'
);
-- 流式CDC同步
INSERT INTO upsert_kafka_sink SELECT id, name, status FROM mysql_cdc;
2. RowKind机制:
- Flink内部每条记录携带RowKind(INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE)
- Changelog格式保留完整的变更流语义
- Upsert Kafka根据Key去重,相同Key只保留最新值
3. 状态管理:
- 声明PRIMARY KEY后,Flink自动使用主键进行状态分桶
- 聚合操作(如
COUNT、SUM)自动转换为Retract模式