TL;DR

  • 场景: 用 Java(amqp-client)跑通 Hello World,并把生产者/消费者从建连到 ACK 的链路写清楚
  • 结论: 默认交换器 "" 会把”路由键=队列名”的消息直接投到队列;mandatory 可回退,immediate 在 RabbitMQ 不支持
  • 产出: 一份可对照的端到端流程清单 + 常见故障定位/修复速查卡

版本矩阵

项目已验证说明
RabbitMQ Server 4.2.2(2025)文档确认官方”latest release”口径;概念与默认交换器机制仍适用
RabbitMQ Server 3.x(AMQP 0-9-1 常见部署)文档确认正文与代码属于 AMQP 0-9-1 思路
Java 客户端 com.rabbitmq:amqp-client 5.28.0文档确认 RabbitMQ 官方 Java Client 当前版本说明
默认交换器 "" 行为文档确认队列声明后自动绑定到默认交换器,routingKey=队列名即路由
immediate 标志文档确认 RabbitMQ 不支持 immediate

工作流程

生产者流程

  1. 生产者连接 RabbitMQ: 生产者首先与 RabbitMQ 服务器建立 TCP 连接(Connection),这是网络通信的基础。在连接建立后,生产者会在该连接上开启一个信道(Channel),用于实际的消息传输。信道是轻量级的,多个信道可以共享同一个 TCP 连接。

  2. 声明交换器(Exchange): 生产者通过信道声明一个交换器,设置交换器类型(direct/topic/fanout/headers)、持久化等属性。

  3. 声明队列(Queue): 生产者声明队列并配置属性:持久化(durable)、排他性(exclusive)、自动删除(auto-delete)等。

  4. 绑定交换器与队列: 生产者通过 bindingKey 将交换器和队列绑定。对于 direct 类型,bindingKey 与 RoutingKey 完全匹配;对于 topic 类型,可以使用通配符。

  5. 发送消息至 RabbitMQ Broker: 消息包含 RoutingKey、交换器名称、消息属性(持久化、优先级、过期时间等)和消息体。

  6. 交换器路由消息: 交换器根据 RoutingKey 和绑定关系查找匹配的队列:

    • 找到匹配队列:消息存入队列等待消费者处理
    • 未找到匹配队列:根据 mandatory 标志决定处理方式
      • mandatory=true:消息通过 Basic.Return 返回给生产者
      • mandatory=false:消息被直接丢弃
  7. 关闭信道和连接: 依次调用 channel.close()connection.close() 释放资源。

应用场景示例:

  • 订单系统使用 RabbitMQ 处理订单消息
  • 生产者(订单服务)声明 direct 类型交换器 order.exchange
  • 声明持久化队列 order.queue,绑定 key 为 order.create
  • 发送消息时 RoutingKey 设为 order.create

消费者流程

  1. 建立连接: 消费者通过 TCP 协议连接到 RabbitMQ Broker。这是一个长期存在的 TCP 连接。

  2. 创建信道: 在已建立的连接中创建新的信道(Channel),用于隔离不同的消息流。

  3. 声明队列: 消费者确保目标队列存在,通过 queue.declare 方法声明队列。

  4. 绑定消费: 使用 basic.consume 方法向 Broker 注册消费请求,设置消息处理回调函数。

  5. 消息投递: RabbitMQ Broker 按照 FIFO 顺序将队列中的消息投递给消费者。

  6. 处理消息: 消费者接收到消息后执行业务逻辑处理。处理过程应设计为幂等。

  7. 发送确认: 处理完成后,通过 basic.ack 方法显式发送确认(ACK),携带消息的投递标签。

  8. 消息删除: RabbitMQ 收到 ACK 后,会将该消息从队列中永久删除。

  9. 关闭信道和连接: 依次调用 channel.close()connection.close()

注意事项:

  • 生产环境中建议实现连接断线重连机制
  • 消息处理应放在 try-catch 块中,处理异常时合理使用 NACK
  • 对于重要消息,建议实现手动确认模式而非自动确认
  • 合理设置 QoS(prefetch count) 避免消费者过载

案例测试

Hello World 一对一的简单模式,生产者直接发送消息给 RabbitMQ,另一端进行消费。未定义和指定 Exchange 的情况下,使用的是 AMQP Default 这个内置的 Exchange。

HelloSender

package icu.wzk.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * RabbitMQ:消息 Broker(接收消息并转发给下游应用)
 *
 * 术语
 * - Producer / Producing:发送消息的应用/行为
 * - Queue:RabbitMQ 内部组件,消息存储在队列中(占用宿主机内存/磁盘,受资源限制)
 *          可理解为"消息缓冲区":多个 Producer 可写同一队列,多个 Consumer 可读同一队列
 * - Consumer / Consuming:接收(消费)消息的应用/行为
 *
 * 说明
 * - Producer、Consumer、Queue 不要求在同一主机;通常分布在不同主机的不同应用
 * - 同一个应用也可以同时扮演 Producer 与 Consumer
 */
public class HelloSender {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("secret");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // 队列声明:非持久化、非独占、自动删除
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            String message = "hello wzk.icu !";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] Sent '" + message + "'");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

HelloReceiver

package icu.wzk.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * RabbitMQ:消息 Broker(接收消息并转发给下游应用)
 *
 * 术语
 * - Producer / Producing:发送消息的应用/行为
 * - Queue:RabbitMQ 内部组件,消息存储在队列中(占用宿主机内存/磁盘,受资源限制)
 *          可理解为"消息缓冲区":多个 Producer 可写同一队列,多个 Consumer 可读同一队列
 * - Consumer / Consuming:接收(消费)消息的应用/行为
 *
 * 说明
 * - Producer、Consumer、Queue 不要求在同一主机;通常分布在不同主机的不同应用
 * - 同一个应用也可以同时扮演 Producer 与 Consumer
 */
public class HelloSender {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("secret");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            // 队列声明:非持久化、非独占、自动删除
            channel.queueDeclare(QUEUE_NAME, false, false, true, null);
            String message = "hello wzk.icu !";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
            System.out.println("[x] Sent '" + message + "'");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

注意: 上述 HelloReceiver 代码与 HelloSender 相同,实际消费者代码应包含 DeliverCallback 和 basicConsume 方法。


错误速查

症状根因定位修复
生产端打印”Sent”,但队列无消息发送到非预期 exchange/routingKey;或队列未按预期绑定管控台看 Exchange/Binding;确认是否用默认交换器 "" 且 routingKey=队列名
消息被”吞掉”,生产端无异常mandatory=false 且路由不到任何队列时消息直接丢弃复现时打开 mandatory 并监听 Basic.Return;关键链路设 mandatory=true
文中提到 immediate 但运行效果对不上RabbitMQ 不支持 immediate 语义查官方规格说明;删除 immediate 相关表述
消费端启动后不消费/无回调缺少 basicConsume 与回调;或示例代码粘贴成了生产者看 HelloReceiver 是否包含 DeliverCallback / basicConsume
PRECONDITION_FAILED - inequivalent arg…重复声明同名队列但参数不一致统一队列声明参数;生产者/消费者对同一队列保持一致
消息”消费一次后又回来/重复处理”手动 ACK 前抛异常、连接中断;或 NACK requeue业务幂等;try/catch 内部决定 ACK/NACK;必要时关闭 requeue 或转 DLX
消费者被压垮,延迟飙升prefetch 过大/无 QoS;单条处理慢导致堆积设置合理 prefetch;拆分消费者实例;把慢操作异步化
队列”自己消失”声明为 autoDelete 或 exclusive,连接断开即删除生产环境用 durable=true、autoDelete=false、exclusive=false