一句话总结
消息可靠性保证贯穿生产 → 存储 → 消费全链路:生产端用同步发送+确认机制(RocketMQ 的 SendStatus、Kafka 的 acks=all)、Broker 端用同步刷盘+主从复制(多副本)、消费端用手动 ACK+消费完再确认。消息丢失三种场景:生产丢(异步发送失败未处理)、Broker 丢(异步刷盘宕机)、消费丢(自动 ACK 后处理失败)。消息重复不可避免,必须通过幂等消费(唯一键去重)解决。
初级理解
消息丢失的三种场景
# 场景1:生产端丢失
# 生产者发送消息,网络超时,以为失败重发
# 实际 Broker 已收到,导致消息重复
# 或:异步发送,缓冲区满了,消息丢弃
# 场景2:Broker 端丢失
# Broker 收到消息,先存内存(异步刷盘)
# 还没来得及刷到磁盘,宕机了
# 内存中的消息丢失
# 场景3:消费端丢失
# 消费者拉到消息,自动 ACK
# 然后处理消息时异常,消息已 ACK 无法重试
# 或:消费者 offset 提交了,但消息还没处理完
中级深入
生产端可靠性
# RocketMQ 发送可靠性
# 同步发送(推荐)
SendResult result = producer.send(msg);
if (result.getSendStatus() == SendStatus.SEND_OK) {
// 发送成功
} else {
// 发送失败,重试或记录
}
# Kafka 发送可靠性
# acks=0:不等待确认(可能丢消息)
# acks=1:Leader 确认即可(Leader 宕机可能丢)
# acks=all(或-1):所有副本确认(最可靠)
props.put("acks", "all");
props.put("retries", 3); // 重试次数
# 同步发送 + 失败重试 + 落库兜底
# 发送失败 → 重试 3 次 → 仍失败 → 写入数据库
# 定时任务扫描数据库,补偿发送
Broker 端可靠性
# RocketMQ 持久化
# 同步刷盘:消息写入磁盘后才返回成功
# 可靠性高,性能低(约 1/10)
flushDiskType: SYNC_FLUSH
# 异步刷盘:消息写入内存即返回,定期刷盘
# 性能高,宕机可能丢消息
flushDiskType: ASYNC_FLUSH # 默认
# 主从复制
# 同步复制:Master 等 Slave 确认后才返回
# 异步复制:Master 不等 Slave(默认)
# Kafka 持久化
# 多副本机制:每个分区有多个副本
# min.insync.replicas:最少同步副本数
# acks=all + min.insync.replicas=2 → 至少两个副本确认
高级拓展
消费端可靠性
# RocketMQ 消费确认
# 集群模式(默认)
# 消费成功 → 返回 CONSUME_SUCCESS → 更新 offset
# 消费失败 → 返回 RECONSUME_LATER → 稍后重试
@RocketMQMessageListener(topic = "order-topic", consumerGroup = "order-group")
@Component
public class OrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
try {
// 处理业务
processOrder(order);
// 成功,自动返回 CONSUME_SUCCESS
} catch (Exception e) {
// 失败,抛出异常 → 自动重试
throw new RuntimeException("处理失败", e);
}
}
}
# Kafka 消费确认
# 手动提交 offset(推荐)
props.put("enable.auto.commit", "false");
// 处理完消息后手动提交
consumer.commitSync();
幂等消费
# 为什么需要幂等?
# 消息重复不可避免(网络重试、Rebalance)
# 消费者必须保证幂等:同一条消息处理多次结果相同
# 方案1:数据库唯一键
# 用消息 ID 作为唯一键
INSERT INTO order_consume_record (msg_id, order_id) VALUES (?, ?);
# 重复消息 → 唯一键冲突 → 跳过
# 方案2:Redis 去重
String msgId = order.getMsgId();
Boolean success = redisTemplate.opsForValue()
.setIfAbsent("msg:" + msgId, "1", 24, TimeUnit.HOURS);
if (Boolean.TRUE.equals(success)) {
// 第一次处理
processOrder(order);
} else {
// 重复消息,跳过
}
# 方案3:业务状态判断
# 订单已支付 → 不再处理支付消息
if (order.getStatus() == OrderStatus.PAID) {
return; // 已处理,跳过
}
实战场景
场景:可靠消息完整方案
# 生产端:同步发送 + 失败重试 + 落库兜底
public void sendReliable(Message msg) {
for (int i = 0; i < 3; i++) {
try {
SendResult result = producer.send(msg);
if (result.getSendStatus() == SendStatus.SEND_OK) {
return; // 成功
}
} catch (Exception e) {
log.error("发送失败,重试 {}/3", i + 1, e);
}
}
// 3 次重试都失败,落库
saveToDb(msg);
}
# Broker 端:同步刷盘 + 主从同步复制
# 消费端:手动 ACK + 幂等处理
# 监控:消息积压告警、消费延迟告警
面试模拟
面试官:如何保证消息不丢失?
你:全链路保证:生产端同步发送+失败重试+落库兜底,Broker 端同步刷盘+多副本,消费端手动 ACK+消费完再确认。三者缺一不可。
面试官:如何保证消息不重复消费?
你:消息重复不可避免,只能通过幂等消费解决。三种方案:数据库唯一键(消息 ID 去重)、Redis setnx(分布式去重)、业务状态判断(已处理则跳过)。