Flink Table API与SQL优化技巧
在Flink中使用Table API和SQL时,有哪些常见的性能优化技巧?请从MiniBatch聚合、Local-Global聚合、Split Distinct、Top-N优化、以及Join优化等角度说明如何优化Flink SQL作业的性能。
回答
编译有声
1. MiniBatch聚合
# 缓冲多个输入再聚合,减少状态访问
# 适用于高吞吐聚合场景
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s
table.exec.mini-batch.size: 5000
原理:
普通聚合:每条数据更新一次状态
MiniBatch:缓冲5秒或5000条数据,批量更新状态
状态读写减少90%+
2. Local-Global聚合
# 两阶段聚合:Local预聚合 + Global最终聚合
# 解决数据倾斜
table.optimizer.agg-phase-strategy: TWO_PHASE
# 自动启用(默认开启)
原理:
Local阶段:每个SubTask本地预聚合
key=1, count=100
key=2, count=200
↓ Shuffle
Global阶段:汇总所有Local结果
key=1, count=10,000
3. Split Distinct(去重优化)
# 热点去重Key拆分+合并
table.optimizer.distinct-agg.split.enabled: true
# 原理:COUNT(DISTINCT user_id)
# → COUNT(DISTINCT user_id_0) + ... + COUNT(DISTINCT user_id_N)
4. Top-N优化
-- 使用ROW_NUMBER避免全量排序
SELECT * FROM (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY category ORDER BY sales DESC
) AS rn
FROM orders
) WHERE rn <= 100;
-- 启用Top-N优化
table.optimizer.topn-max-retain-threshold: 100
5. Join优化
-- 使用MiniBatch Join
SET 'table.exec.mini-batch.enabled' = 'true';
-- Lookup Join优化
CREATE TABLE dim_table (
... PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'lookup.cache.max-rows' = '10000',
'lookup.cache.ttl' = '10min'
);
6. 状态优化
table.exec.state.ttl: 3600000 # 状态TTL=1小时
# 默认状态永久保留,设置TTL减少状态大小
7. 配置优化组合
# 高吞吐聚合场景推荐
pipeline.name: HighPerfAgg
# Table配置
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s
table.exec.mini-batch.size: 5000
table.optimizer.agg-phase-strategy: TWO_PHASE
table.optimizer.distinct-agg.split.enabled: true
table.exec.state.ttl: 3600000