Kafka消息积压排查与解决方案
线上Kafka出现消息积压(Consumer Lag)如何排查原因和解决?常见导致消息积压的原因有哪些?如何扩容消费者提升消费能力?如果消费者下游处理慢(如DB写入慢)怎么办?
回答
屠龙少年
排查消息积压:
-
监控Consumer Lag:
# 查看消费者组积压情况 kafka-consumer-groups --bootstrap-server localhost:9092 \ --group my-group --describe # 输出:TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG -
查看消费者日志:确认消费者是否正常拉取消息
-
查看下游组件:DB、Redis等是否出现慢查询
常见原因:
- 消费者处理速度 < 生产者生产速度
- 消费者宕机或Rebalance频繁
- 下游(DB/Redis)写入慢或连接池满
- 单个消息处理时间过长(如大消息反序列化耗时)
- 消费者数量 > 分区数(多余消费者闲置)
解决方案:
方案1:增加分区和消费者
- 扩容Topic的分区数(需选择合适的分区策略)
- 增加消费者数量(消费者 ≤ 分区数)
- 注意:分区只能增加不能减少
方案2:优化消费逻辑
// 批量拉取 + 批量处理(默认500条/次)
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
// 批量写入DB(批量INSERT/批处理)
// 使用多线程消费(注意顺序性和幂等性)
方案3:异步处理 + 缓冲
- 消费者快速拉取消息写入本地队列/内存
- 后台线程批量处理(如批量写入DB)
- 注意:需处理消费者宕机导致的内存中未处理消息丢失
方案4:下游加速
- DB加索引、批量写入、连接池优化
- 使用消息压缩(减小消息体积)
方案5:限流生产者
- 临时限制生产者速度(降级或限流)
- 等待积压消费完再恢复
紧急止损:新建Topic + 跳过部分积压消息(丢失数据),通常作为最后手段。