INSERT/UPDATE/DELETE/MERGE Operations and Idempotency in Flink SQL DML
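How do DML and related statements in Flink SQL (INSERT INTO / INSERT OVERWRITE / CTAS / CREATE TABLE LIKE / ALTER TABLE) differ from those in traditional databases? Please explain the MERGE INTO syntax supported in Flink 1.17+ and its use in CDC ingestion into a data lake, and how idempotent writes guarantee end-to-end exactly-once semantics.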
Answer
孤独的心
Flink SQL DML operations in detail:
1. INSERT INTO:
INSERT INTO target_table SELECT ... FROM source_stream;
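- Appends rows by default (append mode)
- If the target table declares a PRIMARY KEY (NOT ENFORCED) and its connector supports upserts (e.g. JDBC, upsert-kafka), an updating query such as a GROUP BY aggregation is written in upsert mode; writing an updating query into an append-only sink fails at planning time
- Unlike an INSERT in a traditional database, in streaming mode the statement submits a continuously running job rather than executing a single transaction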
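A minimal sketch of the upsert case, assuming a hypothetical `orders` source (generated by the built-in datagen connector) and a hypothetical MySQL table `user_totals`. The JDBC URL and credentials are placeholders, and the JDBC connector and MySQL driver jars must be on the classpath. The declared key is what switches the sink from append to upsert:

```sql
-- Hypothetical source: random orders from the built-in datagen connector.
CREATE TABLE orders (
  order_id BIGINT,
  user_id  BIGINT,
  amount   DECIMAL(10, 2)
) WITH (
  'connector'          = 'datagen',
  'rows-per-second'    = '10',
  'fields.user_id.min' = '1',
  'fields.user_id.max' = '100'
);

-- Hypothetical sink. The PRIMARY KEY puts the JDBC sink into upsert mode.
-- NOT ENFORCED is mandatory because Flink does not validate the key itself.
CREATE TABLE user_totals (
  user_id BIGINT,
  total   DECIMAL(38, 2),
  PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
  'connector'  = 'jdbc',
  'url'        = 'jdbc:mysql://localhost:3306/demo',
  'table-name' = 'user_totals',
  'username'   = 'flink',
  'password'   = '******'
);

-- GROUP BY emits an updating changelog. With the key above, every new
-- total for a user_id overwrites that row instead of appending a new one.
INSERT INTO user_totals
SELECT user_id, SUM(amount) AS total
FROM orders
GROUP BY user_id;
```

Removing the PRIMARY KEY line makes the same INSERT fail at planning time, because the aggregation produces update changes that an append-only JDBC sink cannot consume.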
2. INSERT OVERWRITE (batch mode only):
INSERT OVERWRITE dws.daily_stats
PARTITION (dt = '2025-05-25')
SELECT ... FROM dwd.fact_orders;
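- Overwrites at partition level: the existing data of partition dt = '2025-05-25' is deleted and replaced by the query result, while other partitions are left untouched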
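For comparison, a sketch of the dynamic-partition form. It assumes (hypothetically) that `dws.daily_stats` has the columns `(user_id, total, dt)` and is partitioned by `dt`, and that `dwd.fact_orders` has `user_id`, `amount`, and `dt`. Without a PARTITION clause, the filesystem and Hive sinks take the partition value from the query column in `dt`'s position and, as far as I know, replace only the partitions that actually receive rows:

```sql
-- INSERT OVERWRITE is rejected in streaming mode, so switch the session to batch first.
SET 'execution.runtime-mode' = 'batch';

-- Dynamic-partition form: no PARTITION clause. The last SELECT column fills dt,
-- and only the partitions that receive rows (here 2025-05-24 and 2025-05-25) are replaced.
INSERT OVERWRITE dws.daily_stats
SELECT user_id, SUM(amount) AS total, dt
FROM dwd.fact_orders
WHERE dt IN ('2025-05-24', '2025-05-25')
GROUP BY user_id, dt;
```

The static form is safer for daily reruns because its blast radius is fixed by the PARTITION clause. The dynamic form is convenient for backfilling several days in one statement.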
3. CREATE TABLE AS SELECT (CTAS, Flink 1.16+):
CREATE TABLE daily_sales
-- the path below is illustrative
WITH (
  'connector' = 'filesystem',
  'path' = 'file:///tmp/daily_sales',
  'format' = 'csv'
)
AS SELECT user_id, SUM(amount) AS total
FROM orders GROUP BY user_id;
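- Creates the table in the catalog and writes the query result in a single statement
- The filesystem sink is append-only, so this GROUP BY example must run in batch mode: in streaming mode the aggregation emits updates that the sink cannot consume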
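The question also lists CREATE TABLE LIKE and ALTER TABLE. Strictly speaking, these are DDL: they change table definitions in the catalog and, unlike INSERT or CTAS, do not launch a job. Here is a short sketch, assuming an existing `orders` table. The name `orders_archive` and the path are hypothetical:

```sql
-- Copy the column definitions (and watermark, if any) of orders,
-- but not its connector options, which are replaced by the WITH clause.
CREATE TABLE orders_archive
WITH (
  'connector' = 'filesystem',
  'path'      = 'file:///tmp/orders_archive',
  'format'    = 'csv'
)
LIKE orders (EXCLUDING OPTIONS);

-- Metadata-only change. No job is started and existing files are not rewritten.
ALTER TABLE orders_archive SET ('sink.rolling-policy.file-size' = '256MB');
```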
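4. MERGE INTO (standard SQL, not a native Flink SQL statement):
MERGE INTO target AS T
USING source AS S
ON T.id = S.id
WHEN MATCHED THEN UPDATE SET T.name = S.name
WHEN NOT MATCHED THEN INSERT (id, name) VALUES (S.id, S.name);
- The statement above uses standard SQL MERGE syntax. Engines such as Spark support it on lake tables in Delta Lake, Iceberg, or Hudi
- Open-source Flink SQL has no MERGE INTO statement. Flink 1.17 instead added batch-mode row-level UPDATE and DELETE, which work only for sinks that implement SupportsRowLevelUpdate / SupportsRowLevelDelete
- For CDC ingestion into a lake, Flink usually achieves the same effect in streaming mode with INSERT INTO from the CDC changelog into a primary-key table of an upsert-capable lake format (e.g. Apache Paimon or Hudi): inserts and updates are merged by key, and delete events remove the row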
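Where MERGE INTO is not available, CDC-to-lake pipelines in Flink usually get matched-update, unmatched-insert, and delete behavior from a primary-key sink table instead. The sketch below assumes the Flink CDC `mysql-cdc` connector and Apache Paimon, both shipped as separate jars rather than as part of core Flink. The host, database, credentials, and warehouse path are placeholders:

```sql
-- Source: MySQL binlog read through the Flink CDC connector (separate jar).
CREATE TABLE orders_cdc (
  order_id BIGINT,
  user_id  BIGINT,
  amount   DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector'     = 'mysql-cdc',
  'hostname'      = 'localhost',
  'port'          = '3306',
  'username'      = 'flink',
  'password'      = '******',
  'database-name' = 'shop',
  'table-name'    = 'orders'
);

-- Sink: an Apache Paimon primary-key table (separate jar). Warehouse path is illustrative.
CREATE CATALOG lake WITH ('type' = 'paimon', 'warehouse' = 'file:///tmp/paimon');

CREATE TABLE lake.`default`.orders (
  order_id BIGINT,
  user_id  BIGINT,
  amount   DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
);

-- The changelog carries +I, -U, +U and -D rows. The sink merges them by order_id,
-- which acts like MATCHED UPDATE, NOT MATCHED INSERT and MATCHED DELETE.
-- Paimon commits on checkpoints, so checkpointing must be on.
SET 'execution.checkpointing.interval' = '1min';
INSERT INTO lake.`default`.orders SELECT * FROM orders_cdc;

-- Flink 1.17+ batch-mode row-level statements. Paimon documents support
-- for these on primary-key tables.
SET 'execution.runtime-mode' = 'batch';
UPDATE lake.`default`.orders SET amount = 0 WHERE user_id = 42;
DELETE FROM lake.`default`.orders WHERE amount = 0;
```

The streaming INSERT covers continuous ingestion. The batch UPDATE and DELETE cover one-off corrections, which is where a MERGE statement would otherwise be used.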
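5. End-to-end exactly-once through transactional or idempotent writes:
- Prerequisites: checkpointing is enabled in EXACTLY_ONCE mode, and the source is replayable (e.g. Kafka offsets or CDC binlog positions stored in the checkpoint), so that after a failure the job rewinds to the last completed checkpoint
- Transactional (two-phase commit) sinks, e.g. Kafka with 'sink.delivery-guarantee' = 'exactly-once': each checkpoint pre-commits a transaction, which is committed only after the checkpoint completes. On failure, uncommitted transactions are aborted, so replayed records never become visible twice. Consumers must read with isolation.level=read_committed
- Idempotent (upsert) sinks, e.g. JDBC with a PRIMARY KEY or HBase keyed by rowkey: records replayed from the last checkpoint overwrite the same keys. Delivery itself is at-least-once, but the final table state is the same as if each record had been written once
- 'sink.buffer-flush.max-rows' = '1000' only sets the write batch size of the JDBC/HBase sink and does not give exactly-once on its own. The guarantee comes from the primary key (idempotency) combined with Flink checkpointing, which flushes the buffer on every checkpoint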
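Here is a sketch of both sink styles in Flink SQL, assuming a local Kafka broker and a MySQL database. The topic, table, and credential values are placeholders. The Kafka options come from the Kafka SQL connector. The value of `transaction.timeout.ms` must be larger than the checkpoint interval and no larger than the broker's `transaction.max.timeout.ms`, which defaults to 15 minutes:

```sql
-- Nothing is committed between checkpoints, so the interval bounds end-to-end latency.
SET 'execution.checkpointing.interval' = '1min';
SET 'execution.checkpointing.mode' = 'EXACTLY_ONCE';

-- (a) Transactional sink: Kafka with two-phase commit.
-- The transactional id prefix must be unique per job so that different jobs never share transactions.
CREATE TABLE kafka_sink (
  order_id BIGINT,
  amount   DECIMAL(10, 2)
) WITH (
  'connector'                         = 'kafka',
  'topic'                             = 'orders_out',
  'properties.bootstrap.servers'      = 'localhost:9092',
  'format'                            = 'json',
  'sink.delivery-guarantee'           = 'exactly-once',
  'sink.transactional-id-prefix'      = 'orders-out-job',
  'properties.transaction.timeout.ms' = '900000'
);

-- (b) Idempotent sink: JDBC upsert keyed by order_id, so replays overwrite the same rows.
-- The buffer-flush options only control batching.
CREATE TABLE jdbc_sink (
  order_id BIGINT,
  amount   DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector'                  = 'jdbc',
  'url'                        = 'jdbc:mysql://localhost:3306/demo',
  'table-name'                 = 'orders_out',
  'username'                   = 'flink',
  'password'                   = '******',
  'sink.buffer-flush.max-rows' = '1000',
  'sink.buffer-flush.interval' = '1s'
);
```

Option (a) hides duplicates from downstream readers at the cost of commit latency. Option (b) has no commit delay, but readers may briefly observe replayed values before they converge to the final state.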