一句话总结
RabbitMQ 基于 AMQP 协议,核心模型:Producer → Exchange → Binding → Queue → Consumer。Exchange 四种类型:Direct(精确匹配 Routing Key)、Topic(通配符匹配,* 匹配一个词,# 匹配零或多个词)、Fanout(广播,忽略 Routing Key)、Headers(按 Header 匹配,很少用)。两大特色:死信队列(DLX,处理失败消息)、延迟队列(TTL + DLX 实现,或插件)。消息确认:Publisher Confirm(生产者确认)、Consumer ACK(消费者手动确认)。
初级理解
RabbitMQ 核心模型
# AMQP 模型
# Producer:生产者,发送消息到 Exchange
# Exchange:交换机,接收消息并路由到 Queue
# Binding:绑定,Exchange 和 Queue 之间的路由规则
# Queue:队列,存储消息
# Consumer:消费者,从 Queue 获取消息
# 消息流转
# Producer → Exchange → [Binding 规则] → Queue → Consumer
# 与 Kafka/RocketMQ 的区别
# RabbitMQ:Exchange 路由到 Queue,消费者从 Queue 消费
# Kafka/RocketMQ:Producer 直接发到 Topic,消费者从 Topic 消费
中级深入
Exchange 四种类型
| 类型 | 路由规则 | 示例 | 场景 |
|---|---|---|---|
| Direct | Routing Key 精确匹配 | key="order" → Queue A | 点对点 |
| Topic | 通配符匹配(* 和 #) | key="order.create" 匹配 order.* | 灵活路由 |
| Fanout | 广播(忽略 Routing Key) | 所有绑定的 Queue 都收到 | 广播通知 |
| Headers | 按 Header 匹配 | Header x-match=all | 复杂匹配(少用) |
# Topic Exchange 通配符
# *(星号):匹配一个单词
# #(井号):匹配零个或多个单词
# 示例
# order.* → 匹配 order.create、order.pay
# order.# → 匹配 order.create、order.pay.success
# *.pay → 匹配 order.pay、user.pay
# 配置示例
@Configuration
public class RabbitConfig {
@Bean
public TopicExchange orderExchange() {
return new TopicExchange("order-exchange");
}
@Bean
public Queue orderCreateQueue() {
return new Queue("order-create-queue");
}
@Bean
public Binding bindingOrderCreate() {
return BindingBuilder
.bind(orderCreateQueue())
.to(orderExchange())
.with("order.create"); // Routing Key
}
}
高级拓展
死信队列(DLX)
# 死信(Dead Letter):无法被消费的消息
# 三种情况变成死信:
# 1. 消息被拒绝(basic.reject/basic.nack)且 requeue=false
# 2. 消息 TTL 过期
# 3. 队列达到最大长度
# 死信队列配置
@Bean
public Queue orderQueue() {
return QueueBuilder.durable("order-queue")
.deadLetterExchange("dlx-exchange") // 死信交换机
.deadLetterRoutingKey("dlx.order") // 死信路由键
.ttl(30000) // 消息 TTL 30 秒
.maxLength(10000) // 队列最大长度
.build();
}
# 死信队列处理
@RabbitListener(queues = "dlx-queue")
public void handleDlx(Message message) {
// 处理死信:记录日志、告警、人工处理
log.error("死信消息: {}", new String(message.getBody()));
}
延迟队列
# 方案1:TTL + DLX(推荐)
# 消息发到带 TTL 的队列 → 过期 → 转发到死信队列
# 死信队列的消费者收到延迟后的消息
# 方案2:rabbitmq_delayed_message_exchange 插件
# 安装插件后,直接发送延迟消息
# 更灵活,但需要额外安装
# TTL + DLX 实现延迟队列
@Bean
public Queue delayQueue() {
return QueueBuilder.durable("delay-queue")
.deadLetterExchange("order-exchange")
.deadLetterRoutingKey("order.process")
.ttl(60000) // 60 秒延迟
.build();
}
# 发送延迟消息
rabbitTemplate.convertAndSend("delay-exchange", "delay.order", order);
# 60 秒后,消息出现在 order.process 队列
@RabbitListener(queues = "order-process-queue")
public void processOrder(Order order) {
// 延迟处理
}
实战场景
场景:订单支付超时取消
# 使用 RabbitMQ 延迟队列实现
# 1. 配置延迟队列
@Bean
public Queue orderDelayQueue() {
return QueueBuilder.durable("order-delay-queue")
.deadLetterExchange("order-exchange")
.deadLetterRoutingKey("order.cancel")
.ttl(30 * 60 * 1000) // 30 分钟
.build();
}
# 2. 下单时发送延迟消息
rabbitTemplate.convertAndSend("delay-exchange", "delay.order", orderId);
# 3. 30 分钟后处理
@RabbitListener(queues = "order-cancel-queue")
public void cancelOrder(String orderId) {
Order order = orderService.getById(orderId);
if (order.getStatus() == OrderStatus.UNPAID) {
orderService.cancel(orderId);
}
}
面试模拟
面试官:RabbitMQ 的 Exchange 有哪几种?
你:四种:Direct(精确匹配 Routing Key)、Topic(通配符匹配,* 和 #)、Fanout(广播,忽略 Routing Key)、Headers(按 Header 匹配,很少用)。Topic 最灵活,Fanout 适合广播。
面试官:RabbitMQ 如何实现延迟队列?
你:两种方案:TTL + DLX(消息发到带 TTL 的队列,过期后转发到死信队列,死信队列消费者处理延迟消息),或安装 rabbitmq_delayed_message_exchange 插件直接发送延迟消息。TTL+DLX 是标准方案,无需额外插件。