TL;DR

  • 场景: 微服务解耦、异步削峰、广播通知,需要把消息”投递到对的队列”
  • 结论: Exchange 决定路由策略;direct 精确、topic 模糊、fanout 广播、headers 多条件但更吃 CPU
  • 产出: 一张版本矩阵 + 一张错误速查卡,直接对照排障与选型落地

版本矩阵

特性RabbitMQ 3.xRabbitMQ 4.x
Classic Mirrored Queues✅ 兼容❌ 已移除
Quorum Queues✅ 推荐
AMQP 1.0插件原生支持

验证说明: 经典镜像队列(Classic Mirrored Queues)已弃用且在 RabbitMQ 4.0 起移除;HA 推荐 Quorum Queues/Streams


RabbitMQ 架构

基本介绍

RabbitMQ 是一款开源的消息中间件(Message Broker),由 Rabbit Technologies Ltd 开发,采用 Mozilla Public License 开源协议。

行业应用

  • 互联网行业:用于微服务架构中的服务解耦、异步通信等场景
  • 电信行业:最初就是为解决电信系统间的可靠通信而设计的
  • 金融行业:处理交易消息、支付通知等
  • 物联网(IoT):与传感器设备通信
  • 传统企业IT系统:系统集成、数据同步等

核心特性

  1. 高可靠性:

    • 支持消息持久化
    • 提供发布确认机制
    • 支持事务(Transaction)和发送方确认(Publisher Confirm)
  2. 易扩展性:

    • 支持集群部署
    • 可以方便地添加节点
    • 支持镜像队列实现高可用
  3. 丰富的功能:

    • 多种交换机类型:直连(Direct)、主题(Topic)、扇出(Fanout)、头部(Headers)
    • 消息TTL(存活时间)
    • 死信队列
    • 优先级队列
    • 消息追踪

技术实现

  1. AMQP协议:

    • RabbitMQ 实现了 AMQP 0-9-1 协议
    • AMQP(Advanced Message Queuing Protocol)是一个提供统一消息服务的应用层标准协议
  2. Erlang语言:

    • 采用 Erlang OTP 平台开发
    • Erlang 天生支持高并发和分布式
    • 提供了良好的容错机制
  3. 多协议支持:

    • 除了原生支持 AMQP 协议外
    • 还支持 MQTT(通过插件)
    • STOMP(通过插件)
    • HTTP(通过管理插件)

典型应用场景

  1. 异步处理:如用户注册后发送邮件/短信通知
  2. 应用解耦:如订单系统与库存系统分离
  3. 流量削峰:应对突发高并发场景
  4. 日志处理:集中收集分布式系统日志
  5. 消息广播:如系统配置变更通知

Exchange

RabbitMQ 常用的交换器类型及其应用场景:

fanout(广播模式)

  • 特点: 将消息路由到所有绑定的队列,忽略路由键(Routing Key)
  • 应用场景: 适用于需要广播消息的场景,如系统通知、日志分发
  • 示例: 一个订单创建事件需要同时通知库存系统、物流系统和积分系统

工作原理:

  1. 消息生产者将消息发送到fanout交换器
  2. 交换器不检查消息的路由键(routing key)
  3. 交换器会将消息的副本发送到每个绑定的队列
  4. 每个绑定的队列都会收到完全相同的消息

实现示例:

# 声明fanout交换器
channel.exchange_declare(exchange='logs', exchange_type='fanout')

# 绑定队列
channel.queue_bind(exchange='logs', queue='queue1')
channel.queue_bind(exchange='logs', queue='queue2')

# 发布消息
channel.basic_publish(exchange='logs', routing_key='', body='Message')

注意事项:

  • 消息会被复制多份,可能影响系统性能
  • 不适合对消息处理顺序有严格要求的场景
  • 消费者需要处理可能重复的消息

direct(直接模式)

  • 特点: 根据精确匹配的路由键将消息路由到队列
  • 应用场景: 适用于需要精确路由的场景,如特定业务处理
  • 示例: 将支付成功的订单路由到”order.payment.success”队列,支付失败的订单路由到”order.payment.failed”队列

路由规则:

  1. 精确匹配机制: 只有当消息的Routing Key和队列的Binding Key完全一致时,消息才会被路由到该队列。

    • 如果Routing Key是”order.created”,那么只有Binding Key也是”order.created”的队列才会收到这条消息
    • “order”不会匹配”order.created”,“order.*“也不会匹配
  2. 多队列绑定: 允许多个队列使用相同的Binding Key绑定到同一个direct交换器。这种情况下,消息会被同时路由到所有匹配的队列,实现简单的广播功能。

典型应用场景:

  • 订单处理系统:可以使用”order.created”、“order.paid”等不同的Routing Key来区分不同类型的订单消息
  • 日志收集系统:用”log.error”、“log.warning”等Routing Key来分类不同级别的日志
  • 任务分发系统:根据任务类型使用不同的Routing Key来分发到特定的处理队列

性能优势: 由于采用精确匹配算法,direct交换器的路由效率很高,适合对性能要求较高的场景。

与其他交换器类型的区别:

  • 相比fanout交换器(广播所有队列),direct是有选择性的路由
  • 相比topic交换器(支持模式匹配),direct只支持完全匹配
  • 相比headers交换器(基于消息头匹配),direct基于Routing Key匹配

注意: 如果没有任何队列的Binding Key与消息的Routing Key匹配,那么该消息将会被直接丢弃,除非设置了备用交换器(alternate exchange)。


topic(主题模式)

  • 特点: 支持模糊匹配的路由键(支持*和#通配符),比direct更灵活
  • 应用场景: 适用于需要分类处理消息的场景

键名结构:

  • RoutingKey(路由键)和 BindingKey(绑定键)都采用”.”分割的多段式命名方式
  • 例如:“stock.usd.nyse” 或 “weather.asia.china.shanghai”

特殊通配符:

  1. 星号”*”:

    • 精确匹配一个单词段
    • 示例:BindingKey “stock.*.nyse” 可以匹配:
      • “stock.usd.nyse”
      • “stock.eur.nyse”
    • 但不能匹配:
      • “stock.nyse”(缺少一个段)
      • “stock.usd.future.nyse”(多出一个段)
  2. 井号”#”:

    • 匹配零个或多个单词段
    • 示例:BindingKey “weather.#” 可以匹配:
      • “weather”(零个后续段)
      • “weather.asia”
      • “weather.asia.china”
      • “weather.asia.china.shanghai”
    • 是最宽松的匹配模式

典型应用场景:

  • 日志分级处理:
    • BindingKey “syslog.*.error” 可接收所有子系统错误日志
    • BindingKey “syslog.auth.#” 可接收认证相关的所有级别日志
  • 地理位置消息路由:
    • “location.#.temperature” 可接收任意区域层级的温度数据
    • “location.country.*.humidity” 可接收国家下任意城市的湿度数据

使用注意事项:

  • 通配符必须作为独立的段存在,不能出现在单词中间
  • 合法的BindingKey示例:“.stock”、“market.#”、“order..completed”
  • 非法的BindingKey示例:“stck”(通配符不在段首)、“market..#“(连续使用通配符)

headers(头模式)

  • 特点: 根据消息头(headers)属性进行路由匹配,忽略路由键
  • 应用场景: 适用于需要基于消息属性路由的特殊场景
  • 匹配规则:
    • x-match=all:需匹配所有headers键值对
    • x-match=any:只需匹配任意一个headers键值对

工作流程:

  1. 绑定阶段:

    • 在创建队列并将其绑定到headers交换器时,需要指定一组键值对作为匹配条件
    • 可以设置匹配模式为”all”(所有条件都必须满足)或”any”(满足任一条件即可)
  2. 消息路由阶段:

    • 生产者发送消息时,需要在消息头中设置相应的属性
    • 交换器会提取消息头中的所有属性
    • 将消息头的属性与各队列绑定时指定的键值对进行比对
    • 当匹配条件满足时,消息就会被路由到对应的队列

实现示例:

# 绑定队列时指定匹配条件
channel.queue_bind(
    queue='high_priority_queue',
    exchange='headers_exchange',
    arguments={'x-match':'all', 'priority':'high', 'region':'north'}
)

# 发送消息时设置headers
properties = pika.BasicProperties(
    headers={'priority': 'high', 'region': 'north'}
)
channel.basic_publish(
    exchange='headers_exchange',
    routing_key='',  # headers交换器忽略routing key
    body=message,
    properties=properties
)

性能注意事项:

  • headers交换器需要解析每条消息的headers属性并进行复杂匹配
  • 匹配操作需要消耗较多CPU资源
  • 当队列数量较多时,性能下降明显
  • 在实际生产环境中,除非有特殊需求,否则建议使用direct或topic交换器

适用场景:

  • 需要基于多个条件进行复杂路由的情况
  • 路由条件可能经常变化的场景
  • 消息属性比路由键更能准确描述路由需求的情况

错误速查

症状根因定位修复
生产者发了消息但队列里没有direct/topic 路由键与 binding key 不匹配;或根本没绑定队列管理台看 exchange→bindings;核对 routing_key 与 binding_key补绑定;统一命名规范;必要时加 alternate-exchange/mandatory+return
消息”凭空消失”且无报错未设置 mandatory;无匹配绑定时消息被丢弃(默认行为)生产端开启 publisher confirms/returns;看 return 回调mandatory=true 并处理 return;或配置备用交换器
fanout 一开吞吐掉得厉害广播导致消息复制 N 份,队列/磁盘/网络放大看入队速率与队列堆积是否按订阅者数线性放大降低订阅者数;拆主题;改 topic 精准分发;限流与批处理
headers 路由延迟上升、CPU 高每条消息需要解析 headers 并做多条件匹配perf/CPU 火焰图;对比 direct/topic 同吞吐下 CPU能用 routing key 就不用 headers;简化条件;减少绑定数量
HA 配置后升级失败或队列不可用仍在使用 Classic Mirrored Queues,4.0 起已移除看队列类型/策略;检查是否 classic mirrored迁移到 Quorum Queues/Streams(按官方迁移路径)
消费端出现重复消费至少一次投递语义 + ack 时机不当/超时重投看消费端 ack 与重试日志;检查 redelivered 标记业务幂等;正确 ack;用重试队列/死信队列治理