一句话总结
RocketMQ 架构四大组件:NameServer(路由中心,无状态,类似注册中心)、Broker(消息存储,主从架构)、Producer(生产者)、Consumer(消费者)。存储核心:CommitLog(所有消息顺序写入一个文件,顺序写性能极高)+ ConsumeQueue(按 Topic+Queue 建立索引,指向 CommitLog 偏移量)+ IndexFile(按 Key 查询消息)。两大特色:事务消息(半消息+回查,解决分布式事务)、延时消息(18 个级别,如 1s/5s/10s/30s/1m...)。
初级理解
RocketMQ 架构
# 四大组件
# NameServer:路由中心
# - 无状态,可集群部署
# - 每个 Broker 启动时注册到所有 NameServer
# - Producer/Consumer 从 NameServer 获取路由信息
# Broker:消息存储
# - 主从架构(Master-Slave)
# - Master 负责读写,Slave 负责备份
# - 支持同步/异步复制
# Producer:生产者
# - 从 NameServer 获取 Broker 信息
# - 选择 MessageQueue 发送消息
# Consumer:消费者
# - 从 NameServer 获取 Broker 信息
# - 从 Broker 拉取消息(Pull 模式)
中级深入
存储架构
# CommitLog:消息存储文件
# - 所有 Topic 的消息都写入同一个 CommitLog
# - 顺序写入,性能极高(接近磁盘顺序写极限)
# - 每个文件 1GB,满了新建
# ConsumeQueue:消费队列(索引)
# - 按 Topic + Queue 维度建立索引
# - 存储:CommitLog Offset + Size + Tag HashCode
# - 消费者先读 ConsumeQueue,再读 CommitLog
# - 相当于 CommitLog 的"目录"
# IndexFile:索引文件
# - 按 Key 查询消息
# - 哈希索引结构
# - 支持按 Message Key 快速查找
# 存储流程
# 消息 → CommitLog(顺序写)→ 异步构建 ConsumeQueue → 异步构建 IndexFile
NameServer 原理
# NameServer 特点
# 1. 无状态:节点之间不通信
# 2. 最终一致:Broker 定时上报(30s)
# 3. 心跳检测:Broker 120s 无心跳则剔除
# 为什么不用 Zookeeper?
# 1. NameServer 更轻量,部署简单
# 2. 无状态,不需要选举
# 3. CAP 理论:NameServer 选择 AP(可用性优先)
# Zookeeper 选择 CP(一致性优先)
# 路由发现流程
# Producer → NameServer → 获取 Topic 路由信息
# → 包含:Broker 地址、Queue 数量、读写权限
# → 缓存到本地,定时更新(30s)
高级拓展
事务消息
# 事务消息:解决分布式事务(如订单+扣库存)
# 流程:
# 1. 发送半消息(Half Message)
# - 消息发送到 Broker,但对消费者不可见
# 2. 执行本地事务
# - 如:创建订单
# 3. 根据本地事务结果,提交或回滚半消息
# - 成功 → 提交(消息对消费者可见)
# - 失败 → 回滚(消息删除)
# 4. 如果 Broker 没收到提交/回滚(超时)
# - 回查本地事务状态
# - 根据回查结果决定提交或回滚
# 代码示例
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"tx-producer-group", "order-topic",
MessageBuilder.withPayload(order).build(),
order.getId() // 事务参数
);
# 事务监听器
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
orderService.createOrder((Long) arg);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 回查本地事务状态
Long orderId = (Long) msg.getHeaders().get("orderId");
Order order = orderService.getById(orderId);
return order != null ? COMMIT : ROLLBACK;
}
}
延时消息
# 延时消息:消息发送后,延迟一段时间才投递
# 18 个延迟级别(不能自定义)
# 1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h
# 使用场景
# 1. 订单超时取消:下单后 30 分钟未支付 → 取消
# 2. 延迟重试:失败后延迟一段时间再重试
# 发送延时消息
Message msg = new Message("order-topic", orderJson.getBytes());
msg.setDelayTimeLevel(3); // 延迟级别 3 = 10s
producer.send(msg);
# 实现原理
# 延时消息先存在 SCHEDULE_TOPIC_XXXX 中
# 定时任务扫描到期消息 → 投递到目标 Topic
实战场景
场景:订单超时取消
# 使用延时消息实现订单超时取消
# 1. 下单时发送延时消息(延迟 30 分钟)
Message msg = new Message("order-cancel-topic", orderId.getBytes());
msg.setDelayTimeLevel(16); // 级别 16 = 30m
producer.send(msg);
# 2. 30 分钟后消费者收到消息
@RocketMQMessageListener(topic = "order-cancel-topic", consumerGroup = "order-cancel-group")
@Component
public class OrderCancelConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String orderId) {
Order order = orderService.getById(orderId);
if (order.getStatus() == OrderStatus.UNPAID) {
// 未支付,取消订单
orderService.cancel(orderId);
}
// 已支付,忽略
}
}
面试模拟
面试官:RocketMQ 的存储架构是怎样的?
你:CommitLog + ConsumeQueue。所有消息顺序写入 CommitLog(顺序写性能极高),ConsumeQueue 按 Topic+Queue 建立索引指向 CommitLog 偏移量。消费者先读 ConsumeQueue 再读 CommitLog,相当于"目录+内容"的设计。
面试官:RocketMQ 事务消息的原理?
你:半消息+回查。先发半消息(消费者不可见),执行本地事务,成功则提交半消息(消费者可见),失败则回滚。如果 Broker 超时未收到确认,回查本地事务状态决定提交或回滚。