CodeWalk

Flink Table API与SQL优化技巧

作者:编译有声 · 2026-05-30 12:55

在Flink中使用Table API和SQL时,有哪些常见的性能优化技巧?请从MiniBatch聚合、Local-Global聚合、Split Distinct、Top-N优化、以及Join优化等角度说明如何优化Flink SQL作业的性能。

回答

编译有声

1. MiniBatch聚合

# 缓冲多个输入再聚合,减少状态访问
# 适用于高吞吐聚合场景
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s
table.exec.mini-batch.size: 5000

原理

普通聚合:每条数据更新一次状态
MiniBatch:缓冲5秒或5000条数据,批量更新状态
  状态读写减少90%+

2. Local-Global聚合

# 两阶段聚合:Local预聚合 + Global最终聚合
# 解决数据倾斜
table.optimizer.agg-phase-strategy: TWO_PHASE

# 自动启用(默认开启)

原理

Local阶段:每个SubTask本地预聚合
  key=1, count=100
  key=2, count=200
↓ Shuffle
Global阶段:汇总所有Local结果
  key=1, count=10,000

3. Split Distinct(去重优化)

# 热点去重Key拆分+合并
table.optimizer.distinct-agg.split.enabled: true

# 原理:COUNT(DISTINCT user_id) 
# → COUNT(DISTINCT user_id_0) + ... + COUNT(DISTINCT user_id_N)

4. Top-N优化

-- 使用ROW_NUMBER避免全量排序
SELECT * FROM (
  SELECT *, ROW_NUMBER() OVER (
    PARTITION BY category ORDER BY sales DESC
  ) AS rn
  FROM orders
) WHERE rn <= 100;

-- 启用Top-N优化
table.optimizer.topn-max-retain-threshold: 100

5. Join优化

-- 使用MiniBatch Join
SET 'table.exec.mini-batch.enabled' = 'true';

-- Lookup Join优化
CREATE TABLE dim_table (
  ... PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'lookup.cache.max-rows' = '10000',
  'lookup.cache.ttl' = '10min'
);

6. 状态优化

table.exec.state.ttl: 3600000  # 状态TTL=1小时
# 默认状态永久保留,设置TTL减少状态大小

7. 配置优化组合

# 高吞吐聚合场景推荐
pipeline.name: HighPerfAgg

# Table配置
table.exec.mini-batch.enabled: true
table.exec.mini-batch.allow-latency: 5s
table.exec.mini-batch.size: 5000
table.optimizer.agg-phase-strategy: TWO_PHASE
table.optimizer.distinct-agg.split.enabled: true
table.exec.state.ttl: 3600000