RocketMQ 核心原理?

2025年 阅读约 15 分钟 面试指南 · 消息中间件

深入解析RocketMQ核心原理:NameServer(路由中心)、Broker集群(主从)、存储架构(CommitLog + ConsumeQueue + IndexFile)、事务消息(半消息+回查)、延时消息,附面试模拟问答。

一句话总结

RocketMQ 架构四大组件:NameServer(路由中心,无状态,类似注册中心)、Broker(消息存储,主从架构)、Producer(生产者)、Consumer(消费者)。存储核心:CommitLog(所有消息顺序写入一个文件,顺序写性能极高)+ ConsumeQueue(按 Topic+Queue 建立索引,指向 CommitLog 偏移量)+ IndexFile(按 Key 查询消息)。两大特色:事务消息(半消息+回查,解决分布式事务)、延时消息(18 个级别,如 1s/5s/10s/30s/1m...)。

初级理解

RocketMQ 架构

# 四大组件 # NameServer:路由中心 # - 无状态,可集群部署 # - 每个 Broker 启动时注册到所有 NameServer # - Producer/Consumer 从 NameServer 获取路由信息 # Broker:消息存储 # - 主从架构(Master-Slave) # - Master 负责读写,Slave 负责备份 # - 支持同步/异步复制 # Producer:生产者 # - 从 NameServer 获取 Broker 信息 # - 选择 MessageQueue 发送消息 # Consumer:消费者 # - 从 NameServer 获取 Broker 信息 # - 从 Broker 拉取消息(Pull 模式)

中级深入

存储架构

# CommitLog:消息存储文件 # - 所有 Topic 的消息都写入同一个 CommitLog # - 顺序写入,性能极高(接近磁盘顺序写极限) # - 每个文件 1GB,满了新建 # ConsumeQueue:消费队列(索引) # - 按 Topic + Queue 维度建立索引 # - 存储:CommitLog Offset + Size + Tag HashCode # - 消费者先读 ConsumeQueue,再读 CommitLog # - 相当于 CommitLog 的"目录" # IndexFile:索引文件 # - 按 Key 查询消息 # - 哈希索引结构 # - 支持按 Message Key 快速查找 # 存储流程 # 消息 → CommitLog(顺序写)→ 异步构建 ConsumeQueue → 异步构建 IndexFile

NameServer 原理

# NameServer 特点 # 1. 无状态:节点之间不通信 # 2. 最终一致:Broker 定时上报(30s) # 3. 心跳检测:Broker 120s 无心跳则剔除 # 为什么不用 Zookeeper? # 1. NameServer 更轻量,部署简单 # 2. 无状态,不需要选举 # 3. CAP 理论:NameServer 选择 AP(可用性优先) # Zookeeper 选择 CP(一致性优先) # 路由发现流程 # Producer → NameServer → 获取 Topic 路由信息 # → 包含:Broker 地址、Queue 数量、读写权限 # → 缓存到本地,定时更新(30s)

高级拓展

事务消息

# 事务消息:解决分布式事务(如订单+扣库存) # 流程: # 1. 发送半消息(Half Message) # - 消息发送到 Broker,但对消费者不可见 # 2. 执行本地事务 # - 如:创建订单 # 3. 根据本地事务结果,提交或回滚半消息 # - 成功 → 提交(消息对消费者可见) # - 失败 → 回滚(消息删除) # 4. 如果 Broker 没收到提交/回滚(超时) # - 回查本地事务状态 # - 根据回查结果决定提交或回滚 # 代码示例 TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( "tx-producer-group", "order-topic", MessageBuilder.withPayload(order).build(), order.getId() // 事务参数 ); # 事务监听器 @RocketMQTransactionListener public class OrderTransactionListener implements RocketMQLocalTransactionListener { @Override public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) { try { orderService.createOrder((Long) arg); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { return RocketMQLocalTransactionState.ROLLBACK; } } @Override public RocketMQLocalTransactionState checkLocalTransaction(Message msg) { // 回查本地事务状态 Long orderId = (Long) msg.getHeaders().get("orderId"); Order order = orderService.getById(orderId); return order != null ? COMMIT : ROLLBACK; } }

延时消息

# 延时消息:消息发送后,延迟一段时间才投递 # 18 个延迟级别(不能自定义) # 1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h # 使用场景 # 1. 订单超时取消:下单后 30 分钟未支付 → 取消 # 2. 延迟重试:失败后延迟一段时间再重试 # 发送延时消息 Message msg = new Message("order-topic", orderJson.getBytes()); msg.setDelayTimeLevel(3); // 延迟级别 3 = 10s producer.send(msg); # 实现原理 # 延时消息先存在 SCHEDULE_TOPIC_XXXX 中 # 定时任务扫描到期消息 → 投递到目标 Topic

实战场景

场景:订单超时取消

# 使用延时消息实现订单超时取消 # 1. 下单时发送延时消息(延迟 30 分钟) Message msg = new Message("order-cancel-topic", orderId.getBytes()); msg.setDelayTimeLevel(16); // 级别 16 = 30m producer.send(msg); # 2. 30 分钟后消费者收到消息 @RocketMQMessageListener(topic = "order-cancel-topic", consumerGroup = "order-cancel-group") @Component public class OrderCancelConsumer implements RocketMQListener<String> { @Override public void onMessage(String orderId) { Order order = orderService.getById(orderId); if (order.getStatus() == OrderStatus.UNPAID) { // 未支付,取消订单 orderService.cancel(orderId); } // 已支付,忽略 } }

面试模拟

面试官:RocketMQ 的存储架构是怎样的?

你:CommitLog + ConsumeQueue。所有消息顺序写入 CommitLog(顺序写性能极高),ConsumeQueue 按 Topic+Queue 建立索引指向 CommitLog 偏移量。消费者先读 ConsumeQueue 再读 CommitLog,相当于"目录+内容"的设计。

面试官:RocketMQ 事务消息的原理?

你:半消息+回查。先发半消息(消费者不可见),执行本地事务,成功则提交半消息(消费者可见),失败则回滚。如果 Broker 超时未收到确认,回查本地事务状态决定提交或回滚。