一句话总结
消息积压的本质:生产速度 > 消费速度。原因:消费者处理慢(下游瓶颈)、消费者数量不足、消费者宕机、突发流量。紧急处理:1)扩容消费者(增加消费者实例/线程)、2)临时消费者转储(新起消费者快速消费到另一个 Topic 或数据库)、3)降级处理(跳过非核心逻辑)。预防措施:监控告警(积压数量、消费延迟)、批量消费(一次拉取多条)、异步处理(消费后异步写库)。
初级理解
消息积压的原因
# 积压原因
# 1. 生产速度 > 消费速度(根本原因)
# 2. 消费者处理慢
# - 消费逻辑中有慢 SQL
# - 消费逻辑中有外部调用(HTTP/RPC)
# - 消费逻辑中有复杂计算
# 3. 消费者数量不足
# - 实例数不够
# - 线程数不够
# 4. 消费者宕机
# - 服务挂了,没有消费者
# 5. 突发流量
# - 秒杀/大促,瞬时大量消息
中级深入
紧急处理方案
# 方案1:扩容消费者(最快)
# 增加消费者实例数量
# RocketMQ:增加 Consumer Group 中的实例
# Kafka:增加分区数 + 消费者数(分区数 ≥ 消费者数)
# 方案2:临时消费者转储(积压严重时)
# 写一个临时消费者,不做业务处理,只快速转储
# 原 Topic → 临时消费者 → 新 Topic / 数据库
# 等积压消除后,再慢慢处理转储的数据
# 方案3:降级处理
# 跳过非核心逻辑(如日志、统计)
# 只处理核心业务(如扣库存、扣余额)
# 方案4:提高消费并行度
# 增加消费线程数
# RocketMQ:setConsumeThreadMax(64)
# Kafka:增加分区数 + 消费者线程数
批量消费优化
# RocketMQ 批量消费
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-group",
consumeMessageBatchMaxSize = 32 // 每次拉取最多 32 条
)
@Component
public class OrderConsumer implements RocketMQListener<List<Order>> {
@Override
public void onMessage(List<Order> orders) {
// 批量处理
orderService.batchProcess(orders);
}
}
# Kafka 批量消费
props.put("max.poll.records", 500); // 每次拉取最多 500 条
# 批量处理优化
# 1. 批量写数据库(batch insert)
# 2. 批量调外部接口(合并请求)
# 3. 减少 IO 次数
高级拓展
消息过期处理
# RocketMQ 消息过期
# 默认消息保存 72 小时(3 天)
# 过期后自动删除
# 问题:积压超过 72 小时,消息被删除
# 解决:
# 1. 增加消息保存时间
# messageDelayLevel 或自定义
# 2. 监控积压,提前告警
# 3. 消息被删除后,通过其他方式补偿
# - 查数据库补偿
# - 人工处理
# Kafka 消息过期
# log.retention.hours:默认 168 小时(7 天)
# 按时间或大小删除
监控告警
# 关键监控指标
# 1. 消息积压数量(Lag)
# RocketMQ:consumerOffset - maxOffset
# Kafka:consumer lag
# 告警阈值:积压 > 10 万
# 2. 消费延迟
# 消息产生时间到消费时间的差值
# 告警阈值:延迟 > 1 分钟
# 3. 消费 TPS
# 每秒消费消息数
# 告警阈值:TPS 下降 > 50%
# 4. 消费失败率
# 消费失败次数 / 总消费次数
# 告警阈值:失败率 > 1%
# Prometheus + Grafana 监控
# RocketMQ Exporter 暴露指标
# Kafka Exporter 暴露指标
实战场景
场景:大促期间消息积压处理
# 大促前预防
# 1. 预估流量,提前扩容消费者
# 2. 压测消费能力,确保 TPS 足够
# 3. 配置监控告警
# 大促中应急
# 1. 发现积压 → 立即扩容消费者(加机器)
# 2. 积压严重 → 启动临时消费者转储
# rocketMQTemplate.consumer("order-topic", msg -> {
# // 不做业务处理,直接存到新 Topic
# rocketMQTemplate.send("order-backup-topic", msg);
# });
# 3. 降级非核心逻辑
# if (isPeakTime()) {
# // 跳过日志、统计
# processCore(order);
# } else {
# processFull(order);
# }
# 大促后恢复
# 1. 处理转储的消息
# 2. 缩容消费者
# 3. 恢复完整处理逻辑
面试模拟
面试官:消息积压了怎么办?
你:先紧急扩容消费者(加机器/加线程),如果还不行就启动临时消费者快速转储到新 Topic,同时降级非核心逻辑。事后分析积压原因(消费慢/突发流量),优化消费性能(批量消费/异步处理)。
面试官:如何预防消息积压?
你:监控告警(积压数量、消费延迟)、批量消费(一次拉取多条)、异步处理(消费后异步写库)、提前压测消费能力、大促前扩容消费者。