概述

本文介绍 Spring 5.x 使用 Spring AMQP (spring-rabbit) 接入 RabbitMQ,通过 XML 配置声明队列/交换机/绑定,并完成收发闭环。

核心要点:

  • RabbitAdmin 负责资源自动声明
  • RabbitTemplate/AmqpTemplate 负责收发
  • 直连交换机需显式指定 exchange + routingKey 避免默认交换机误投

核心组件

1. RabbitAdmin

用于声明和管理 RabbitMQ 资源(队列、交换器、绑定等)。应用启动时,RabbitAdmin 会自动检测 Spring 容器中所有的 Queue、Exchange 和 Binding 对象,并在 RabbitMQ 服务器上创建相应资源。

2. RabbitTemplate

发送和接收消息的核心工具类。提供多种消息发送方法(如 convertAndSend、send)和接收方法(如 receive、receiveAndConvert)。

3. SimpleMessageListenerContainer

消息监听容器,用于异步消费消息。支持并发消费、消息确认等特性。


POM 配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>icu.wzk</groupId>
    <artifactId>rabbitmq-demo-spring</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>5.2.6.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
            <version>2.2.7.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>5.2.6.RELEASE</version>
        </dependency>
    </dependencies>
</project>

XML 配置 (rabbit-context.xml)

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans.xsd
         http://www.springframework.org/schema/rabbit
         http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <rabbit:connection-factory id="connectionFactory"
                               host="localhost"
                               port="5672"
                               virtual-host="/"
                               username="admin"
                               password="secret" />

    <!-- RabbitTemplate:用于发送/接收消息 -->
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>

    <!-- RabbitAdmin:自动声明 Queue / Exchange / Binding -->
    <rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>

    <!-- 队列 -->
    <rabbit:queue name="myqueue"/>

    <!-- 直连交换机 + 绑定 -->
    <rabbit:direct-exchange name="direct.biz.ex"
                            auto-declare="true"
                            auto-delete="false"
                            durable="false">
        <rabbit:bindings>
            <rabbit:binding queue="myqueue" key="dir.ex"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

</beans>

Java 代码 (StartApp.java)

package icu.wzk;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 最小闭环示例:启动 Spring 容器 -> 发送消息 -> 从队列取一条消息 -> 关闭容器
 *
 * 依赖前提(与 rabbit-context.xml 保持一致):
 * - 存在队列:myqueue
 * - 存在绑定:myqueue <- (direct.biz.ex, routingKey=dir.ex)
 *
 * 注意:
 * 1) convertAndSend(String routingKey, Object message) 这个重载使用的是"默认交换机"(amq.default)。
 *    默认交换机的路由规则:routingKey 必须等于队列名,否则投递不到队列。
 * 2) 你现在 convertAndSend("dir.ex", ...) 会把 "dir.ex" 当成队列名路由;
 *    如果你的队列不是叫 dir.ex,而是 myqueue,那么应该显式指定 exchange + routingKey:
 *    convertAndSend("direct.biz.ex", "dir.ex", ...)
 * 3) receiveAndConvert("myqueue") 是同步拉取(basicGet),空队列会返回 null。
 */
public class StartApp {

    // 与 XML 中定义保持一致:交换机 / 路由键 / 队列
    private static final String EXCHANGE = "direct.biz.ex";
    private static final String ROUTING_KEY = "dir.ex";
    private static final String QUEUE = "myqueue";

    public static void main(String[] args) {
        // 1) 加载 Spring XML,初始化连接工厂、RabbitTemplate、RabbitAdmin、队列/交换机/绑定
        ClassPathXmlApplicationContext context =
                new ClassPathXmlApplicationContext("rabbit-context.xml");

        // 2) 获取发送/接收入口(底层通常是 RabbitTemplate)
        AmqpTemplate amqp = context.getBean(AmqpTemplate.class);

        // 3) 批量发送 1000 条消息到指定 exchange + routingKey
        //    这样不依赖默认交换机的"routingKey==队列名"规则,路由更清晰
        for (int i = 0; i < 1000; i++) {
            amqp.convertAndSend(EXCHANGE, ROUTING_KEY, "wzk" + i);
        }

        // 4) 同步拉取队列中的一条消息(可能为 null:队列为空 / 尚未投递完成 / 被其他消费者取走)
        Object msg = amqp.receiveAndConvert(QUEUE);
        System.out.println(msg);

        // 5) 关闭容器:释放连接等资源
        context.close();
    }
}

错误速查

症状根因定位修复
启动报 Connection refused / 连接超时RabbitMQ 未启动、端口/Host 不通、防火墙拦截telnet host 5672 / RabbitMQ 日志 / 容器端口映射
启动报 ACCESS_REFUSED - Login was refused用户名/密码错误或无 vhost 权限RabbitMQ 管理台 Users / vhost 权限
启动报 NOT_FOUND - no exchange ‘direct.biz.ex’交换机未声明成功或声明顺序/权限问题确保 <rabbit:admin …/> 存在且连接正常
发送无异常但队列无消息使用了 convertAndSend(routingKey, msg) 走默认交换机,routingKey≠队列名导致无法路由使用 convertAndSend(exchange, routingKey, msg)
receiveAndConvert(“myqueue”) 返回 null队列为空/尚未投递完成/被其他消费者取走确认路由是否命中
启动报 PRECONDITION_FAILED - inequivalent arg ‘durable’同名 exchange/queue 已存在但属性不同统一 durable/auto-delete 配置
Spring XML 解析报 schema/DTD 相关错误无法加载 spring-rabbit.xsd保证可访问 schema
direct 交换机绑定了但仍不路由routingKey 不精确匹配;绑定 key 写错direct 必须精确匹配