消息积压与处理?

2025年 阅读约 15 分钟 面试指南 · 消息中间件

深入解析消息积压:积压原因分析(生产>消费)、紧急处理方案(扩容消费者/临时消费者转储)、批量消费优化、消息过期处理、监控告警体系,附面试模拟问答。

一句话总结

消息积压的本质:生产速度 > 消费速度。原因:消费者处理慢(下游瓶颈)、消费者数量不足、消费者宕机、突发流量。紧急处理:1)扩容消费者(增加消费者实例/线程)、2)临时消费者转储(新起消费者快速消费到另一个 Topic 或数据库)、3)降级处理(跳过非核心逻辑)。预防措施:监控告警(积压数量、消费延迟)、批量消费(一次拉取多条)、异步处理(消费后异步写库)。

初级理解

消息积压的原因

# 积压原因 # 1. 生产速度 > 消费速度(根本原因) # 2. 消费者处理慢 # - 消费逻辑中有慢 SQL # - 消费逻辑中有外部调用(HTTP/RPC) # - 消费逻辑中有复杂计算 # 3. 消费者数量不足 # - 实例数不够 # - 线程数不够 # 4. 消费者宕机 # - 服务挂了,没有消费者 # 5. 突发流量 # - 秒杀/大促,瞬时大量消息

中级深入

紧急处理方案

# 方案1:扩容消费者(最快) # 增加消费者实例数量 # RocketMQ:增加 Consumer Group 中的实例 # Kafka:增加分区数 + 消费者数(分区数 ≥ 消费者数) # 方案2:临时消费者转储(积压严重时) # 写一个临时消费者,不做业务处理,只快速转储 # 原 Topic → 临时消费者 → 新 Topic / 数据库 # 等积压消除后,再慢慢处理转储的数据 # 方案3:降级处理 # 跳过非核心逻辑(如日志、统计) # 只处理核心业务(如扣库存、扣余额) # 方案4:提高消费并行度 # 增加消费线程数 # RocketMQ:setConsumeThreadMax(64) # Kafka:增加分区数 + 消费者线程数

批量消费优化

# RocketMQ 批量消费 @RocketMQMessageListener( topic = "order-topic", consumerGroup = "order-group", consumeMessageBatchMaxSize = 32 // 每次拉取最多 32 条 ) @Component public class OrderConsumer implements RocketMQListener<List<Order>> { @Override public void onMessage(List<Order> orders) { // 批量处理 orderService.batchProcess(orders); } } # Kafka 批量消费 props.put("max.poll.records", 500); // 每次拉取最多 500 条 # 批量处理优化 # 1. 批量写数据库(batch insert) # 2. 批量调外部接口(合并请求) # 3. 减少 IO 次数

高级拓展

消息过期处理

# RocketMQ 消息过期 # 默认消息保存 72 小时(3 天) # 过期后自动删除 # 问题:积压超过 72 小时,消息被删除 # 解决: # 1. 增加消息保存时间 # messageDelayLevel 或自定义 # 2. 监控积压,提前告警 # 3. 消息被删除后,通过其他方式补偿 # - 查数据库补偿 # - 人工处理 # Kafka 消息过期 # log.retention.hours:默认 168 小时(7 天) # 按时间或大小删除

监控告警

# 关键监控指标 # 1. 消息积压数量(Lag) # RocketMQ:consumerOffset - maxOffset # Kafka:consumer lag # 告警阈值:积压 > 10 万 # 2. 消费延迟 # 消息产生时间到消费时间的差值 # 告警阈值:延迟 > 1 分钟 # 3. 消费 TPS # 每秒消费消息数 # 告警阈值:TPS 下降 > 50% # 4. 消费失败率 # 消费失败次数 / 总消费次数 # 告警阈值:失败率 > 1% # Prometheus + Grafana 监控 # RocketMQ Exporter 暴露指标 # Kafka Exporter 暴露指标

实战场景

场景:大促期间消息积压处理

# 大促前预防 # 1. 预估流量,提前扩容消费者 # 2. 压测消费能力,确保 TPS 足够 # 3. 配置监控告警 # 大促中应急 # 1. 发现积压 → 立即扩容消费者(加机器) # 2. 积压严重 → 启动临时消费者转储 # rocketMQTemplate.consumer("order-topic", msg -> { # // 不做业务处理,直接存到新 Topic # rocketMQTemplate.send("order-backup-topic", msg); # }); # 3. 降级非核心逻辑 # if (isPeakTime()) { # // 跳过日志、统计 # processCore(order); # } else { # processFull(order); # } # 大促后恢复 # 1. 处理转储的消息 # 2. 缩容消费者 # 3. 恢复完整处理逻辑

面试模拟

面试官:消息积压了怎么办?

你:先紧急扩容消费者(加机器/加线程),如果还不行就启动临时消费者快速转储到新 Topic,同时降级非核心逻辑。事后分析积压原因(消费慢/突发流量),优化消费性能(批量消费/异步处理)。

面试官:如何预防消息积压?

你:监控告警(积压数量、消费延迟)、批量消费(一次拉取多条)、异步处理(消费后异步写库)、提前压测消费能力、大促前扩容消费者。