CodeWalk

INSERT/UPDATE/DELETE/MERGE Operations and Idempotency in Flink SQL DML

Author: 孤独的心 · 2026-05-30 12:55

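How do DML operations in Flink SQL (INSERT INTO/INSERT OVERWRITE/CTAS/CREATE TABLE LIKE/ALTER TABLE) differ from those in traditional databases? Please introduce the MERGE INTO syntax supported in Flink 1.17+ and how it is used for CDC ingestion into a data lake. Also explain how idempotent writes guarantee end-to-end Exactly-Once semantics.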

Answer

孤独的心

Flink SQL DML operations in detail:

1. INSERT INTO

INSERT INTO target_table SELECT ... FROM source_stream;
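  • Appends by default (append-only writes)
  • If the target table declares a PRIMARY KEY and its connector supports upserts (e.g. upsert-kafka, JDBC), the write automatically becomes an upsert keyed on that primary key. A traditional database behaves differently: there, an INSERT with a duplicate primary key fails.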
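The append-vs-upsert difference is easiest to see by feeding one changelog into both kinds of sink. The following is a small Python model, not Flink code. The function names and the (row_kind, key, value) tuple layout are hypothetical. The row-kind labels follow Flink's +I/-U/+U/-D notation.

```python
from typing import Dict, List, Tuple

# A changelog row: (row_kind, key, value). The kinds mirror Flink's RowKind
# short strings: "+I" insert, "-U" update-before, "+U" update-after, "-D" delete.
Change = Tuple[str, str, int]


def apply_append(changes: List[Change]) -> List[Tuple[str, int]]:
    """Append-only sink: each +I becomes a new row; updates cannot be expressed."""
    rows: List[Tuple[str, int]] = []
    for kind, key, value in changes:
        if kind != "+I":
            raise ValueError(f"append-only sink cannot consume {kind}")
        rows.append((key, value))
    return rows


def apply_upsert(changes: List[Change], table: Dict[str, int]) -> Dict[str, int]:
    """Upsert sink keyed by the primary key: +I/+U overwrite the row for that key,
    -D removes it, and -U is skipped because the following +U carries the new row."""
    for kind, key, value in changes:
        if kind in ("+I", "+U"):
            table[key] = value
        elif kind == "-D":
            table.pop(key, None)
    return table


if __name__ == "__main__":
    # Changelog of a running SUM(amount) per user: u1 goes 10 -> 25, u2 appears.
    changes = [("+I", "u1", 10), ("-U", "u1", 10), ("+U", "u1", 25), ("+I", "u2", 5)]
    state = apply_upsert(changes, {})
    print(state)  # {'u1': 25, 'u2': 5}
    # Replaying the same changelog (e.g. after a restart) leaves the table unchanged.
    print(apply_upsert(changes, dict(state)) == state)  # True
```

The keyed overwrite explains why an upsert sink tolerates replays: writing the same key twice converges to the same row. An append-only sink would have to reject the update rows, or it would accumulate duplicates.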

2. INSERT OVERWRITE (batch mode only)

INSERT OVERWRITE dws.daily_stats
PARTITION (dt = '2025-05-25')
SELECT ... FROM dwd.fact_orders;
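  • Overwrites at partition level: the old data in the target partition is deleted and replaced by the query result, and other partitions are left untouched. Re-running the same job for the same dt therefore produces the same result.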
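To illustrate why partition overwrite makes reruns safe, here is a minimal Python model that contrasts it with INSERT INTO. A dict of partition value to rows stands in for a partitioned table. This in-memory table and the helper names are hypothetical and are not a Flink API.

```python
from typing import Dict, List

# Hypothetical in-memory stand-in for a table partitioned by dt:
# partition value -> list of rows.
Table = Dict[str, List[dict]]


def insert_into(table: Table, dt: str, rows: List[dict]) -> Table:
    """INSERT INTO ... PARTITION (dt = ...): append to the partition."""
    table.setdefault(dt, []).extend(rows)
    return table


def insert_overwrite(table: Table, dt: str, rows: List[dict]) -> Table:
    """INSERT OVERWRITE ... PARTITION (dt = ...): drop the partition's old rows,
    then write the new ones; other partitions are not touched."""
    table[dt] = list(rows)
    return table


if __name__ == "__main__":
    batch = [{"shop": "s1", "gmv": 100}]
    appended: Table = {"2025-05-24": [{"shop": "s1", "gmv": 90}]}
    overwritten: Table = {"2025-05-24": [{"shop": "s1", "gmv": 90}]}
    for _ in range(2):  # the same daily job is run twice, e.g. a manual rerun
        insert_into(appended, "2025-05-25", batch)
        insert_overwrite(overwritten, "2025-05-25", batch)
    print(len(appended["2025-05-25"]))     # 2 -> duplicated
    print(len(overwritten["2025-05-25"]))  # 1 -> rerun is idempotent
```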

3. CREATE TABLE AS SELECT (CTAS)

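CREATE TABLE daily_sales
WITH (
  'connector' = 'filesystem',
  'path' = '/tmp/daily_sales',
  'format' = 'csv'
)
AS SELECT user_id, SUM(amount) AS total
FROM orders GROUP BY user_id;
  • A single statement creates the table, with its schema derived from the query, and writes the query result into it. The filesystem connector requires 'path' and 'format'; the values above are placeholders.
  • The filesystem sink is append-only, so this GROUP BY aggregate should run in batch mode. In streaming mode the aggregate emits updates that this sink cannot accept.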
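You can see CTAS's create-plus-write semantics without a Flink cluster by running the same statement in SQLite through Python's sqlite3 module. This is an assumption for illustration only: SQLite stands in for Flink here. In Flink the schema is likewise derived from the query, but the table is registered in the catalog and written through its connector. The sample orders are made up.

```python
import sqlite3
from typing import List, Tuple


def run_ctas() -> List[Tuple[str, int]]:
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE orders (user_id TEXT, amount INTEGER)")
    conn.executemany(
        "INSERT INTO orders VALUES (?, ?)",
        [("u1", 10), ("u1", 15), ("u2", 5)],
    )
    # One statement creates daily_sales with columns (user_id, total) taken from
    # the query and fills it with the query result.
    conn.execute(
        "CREATE TABLE daily_sales AS "
        "SELECT user_id, SUM(amount) AS total FROM orders GROUP BY user_id"
    )
    rows = conn.execute(
        "SELECT user_id, total FROM daily_sales ORDER BY user_id"
    ).fetchall()
    conn.close()
    return rows


if __name__ == "__main__":
    print(run_ctas())  # [('u1', 25), ('u2', 5)]
```

The table ends up holding the final aggregate, which is the batch-mode result. A streaming run of the same query would instead produce a changelog in which u1 is first 10 and then 25.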

4. MERGE INTO and row-level UPDATE/DELETE (Flink 1.17+)

MERGE INTO target AS T
USING source AS S
ON T.id = S.id
WHEN MATCHED THEN UPDATE SET T.name = S.name
WHEN NOT MATCHED THEN INSERT (id, name) VALUES (S.id, S.name);
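  • Where supported, it runs in batch mode or on bounded input
  • Caveat: Flink 1.17 itself introduced row-level UPDATE and DELETE statements in batch mode, for sinks that implement SupportsRowLevelUpdate/SupportsRowLevelDelete. Native MERGE INTO is not part of open-source Flink SQL. It is usually provided by the lake format, e.g. Apache Paimon's merge_into action, so check the connector documentation.
  • For streaming CDC ingestion into a lake, the same effect is usually achieved with a plain INSERT INTO a primary-key table (e.g. a Paimon primary-key table). That table applies the changelog's inserts, updates and deletes by key.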
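The MERGE statement above boils down to "update by key if present, insert otherwise". The following is a minimal Python model of that statement over a target keyed by id. The helper name is hypothetical, and this is not how any particular engine implements MERGE. For simplicity, the model rejects duplicate source keys. Standard SQL MERGE likewise raises an error when one target row is matched by more than one source row.

```python
from typing import Dict, Iterable, Tuple

Row = Tuple[int, str]  # (id, name), the columns used in the MERGE example


def merge_into(target: Dict[int, str], source: Iterable[Row]) -> Tuple[int, int]:
    """Model of the MERGE statement on a target keyed by id.

    Returns (rows_updated, rows_inserted) and mutates target in place.
    """
    seen = set()
    updated = inserted = 0
    for sid, sname in source:
        if sid in seen:
            raise ValueError(f"more than one source row matches id={sid}")
        seen.add(sid)
        if sid in target:  # WHEN MATCHED THEN UPDATE SET T.name = S.name
            target[sid] = sname
            updated += 1
        else:  # WHEN NOT MATCHED THEN INSERT (id, name) VALUES (S.id, S.name)
            target[sid] = sname
            inserted += 1
    return updated, inserted


if __name__ == "__main__":
    target = {1: "alice", 2: "bob"}
    batch = [(2, "bobby"), (3, "carol")]
    print(merge_into(target, batch))  # (1, 1)
    print(merge_into(target, batch))  # (2, 0): re-applying the batch changes nothing
    print(target)  # {1: 'alice', 2: 'bobby', 3: 'carol'}
```

The second call shows why key-based merging suits CDC ingestion. If a batch of changes is applied twice, for example after a retry, the final table is the same as after applying it once.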

5. Exactly-Once through transactional or idempotent writes

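  • Transactional (two-phase commit) sinks: for example, Kafka with 'sink.delivery-guarantee' = 'exactly-once' and a 'sink.transactional-id-prefix'. Each checkpoint's data is written in a Kafka transaction that is committed only after the checkpoint completes. Downstream consumers must read with isolation.level=read_committed.
  • Each batch/bucket of writes carries a transaction ID or batch ID, typically tied to the checkpoint
  • On failure and retry, uncommitted transactions are aborted and their data is replayed from the last checkpoint. Transactions that were already committed are recognized by their ID and are not applied twice.
  • Idempotent (upsert) sinks such as JDBC and HBase rely on a primary key or rowkey instead. Buffered rows are flushed on every checkpoint; 'sink.buffer-flush.max-rows' = '1000' only controls the flush batch size. Replayed rows overwrite the same keys, so the end result is Exactly-Once even though delivery is at-least-once.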
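The transaction-ID bookkeeping described in these bullets can be sketched as a toy Python model of a checkpoint-aligned two-phase-commit sink. This is an assumption-laden illustration, not Flink's TwoPhaseCommitSinkFunction or Kafka's protocol. The class, the in-memory "store", and the committed-ID set are all hypothetical stand-ins for the external system.

```python
from typing import Dict, List, Optional, Set


class TwoPhaseCommitSink:
    """Toy model of a checkpoint-aligned two-phase-commit sink (not Flink's API).

    Rows go into an open transaction; pre_commit() seals it under the checkpoint
    id when the checkpoint is taken, and commit() makes it visible once the
    checkpoint completes. The external store records committed transaction ids,
    so a commit retried after a restart is skipped instead of duplicating rows.
    """

    def __init__(self, store: List[str], committed: Set[int],
                 restored: Optional[Dict[int, List[str]]] = None) -> None:
        self.store = store              # hypothetical external system
        self.committed = committed      # transaction ids already made visible
        self.pending: Dict[int, List[str]] = dict(restored or {})
        self.open_txn: List[str] = []   # rows since the last checkpoint

    def write(self, row: str) -> None:
        self.open_txn.append(row)

    def pre_commit(self, checkpoint_id: int) -> Dict[int, List[str]]:
        self.pending[checkpoint_id] = self.open_txn
        self.open_txn = []
        return dict(self.pending)       # what the checkpoint would persist

    def commit(self, checkpoint_id: int) -> None:
        rows = self.pending.pop(checkpoint_id, [])
        if checkpoint_id in self.committed:
            return                      # duplicate commit: deduplicated by id
        self.store.extend(rows)
        self.committed.add(checkpoint_id)


def simulate_failure_and_recovery() -> List[str]:
    store: List[str] = []
    committed: Set[int] = set()

    sink = TwoPhaseCommitSink(store, committed)
    sink.write("a")
    sink.write("b")
    snapshot = sink.pre_commit(1)       # checkpoint 1 persists {1: ["a", "b"]}
    sink.commit(1)                      # commit reaches the store...
    sink.write("c")                     # ...then the job fails before checkpoint 2

    # Restart from checkpoint 1: the open transaction holding "c" is lost
    # (aborted), pending transaction 1 is committed again, and the source
    # replays "c" from its checkpointed offset.
    sink = TwoPhaseCommitSink(store, committed, restored=snapshot)
    sink.commit(1)                      # skipped: id 1 is already committed
    sink.write("c")
    sink.pre_commit(2)
    sink.commit(2)
    return store


if __name__ == "__main__":
    print(simulate_failure_and_recovery())  # ['a', 'b', 'c']
```

Two properties together give exactly-once output: rows become visible only through commit, and commit is idempotent per transaction ID. The restart both re-commits transaction 1 and replays "c", yet every row appears exactly once.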