CodeWalk

Flink SQL主键约束与CDC Upsert语义实现

作者:古法程序员 · 2026-05-30 12:55

Flink SQL中PRIMARY KEY NOT ENFORCED声明的主键约束有什么作用?请解释在CDC入湖场景中,如何通过主键+Upsert Kafka或Changelog Stream实现正确的更新/删除语义(INSERT/UPDATE/DELETE)。给出一个MySQL CDC同步到下游表的完整SQL示例。

回答

古法程序员

Flink SQL主键(PRIMARY KEY NOT ENFORCED):

  • NOT ENFORCED表示Flink不执行完整性校验(由数据源保证)
  • 作用:声明主键用于后续的更新/删除语义识别、状态去重、Join优化

Upsert语义实现原理:

1. Changelog Stream模式(debezium-json/canal-json)

-- 源表定义(读取MySQL Binlog)
CREATE TABLE mysql_cdc (
  id INT,
  name STRING,
  status INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql',
  'database-name' = 'mydb',
  'table-name' = 'users'
);

-- 目标表定义(Upsert Kafka)
CREATE TABLE upsert_kafka_sink (
  id INT,
  name STRING,
  status INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'users_upsert',
  'properties.bootstrap.servers' = 'kafka:9092',
  'key.format' = 'json',
  'value.format' = 'json'
);

-- 流式CDC同步
INSERT INTO upsert_kafka_sink SELECT id, name, status FROM mysql_cdc;

2. RowKind机制

  • Flink内部每条记录携带RowKind(INSERT/UPDATE_BEFORE/UPDATE_AFTER/DELETE)
  • Changelog格式保留完整的变更流语义
  • Upsert Kafka根据Key去重,相同Key只保留最新值

3. 状态管理

  • 声明PRIMARY KEY后,Flink自动使用主键进行状态分桶
  • 聚合操作(如COUNTSUM)自动转换为Retract模式