概述
本文介绍 Spring 5.x 使用 Spring AMQP (spring-rabbit) 接入 RabbitMQ,通过 XML 配置声明队列/交换机/绑定,并完成收发闭环。
核心要点:
- RabbitAdmin 负责资源自动声明
- RabbitTemplate/AmqpTemplate 负责收发
- 直连交换机需显式指定 exchange + routingKey 避免默认交换机误投
核心组件
1. RabbitAdmin
用于声明和管理 RabbitMQ 资源(队列、交换器、绑定等)。应用启动时,RabbitAdmin 会自动检测 Spring 容器中所有的 Queue、Exchange 和 Binding 对象,并在 RabbitMQ 服务器上创建相应资源。
2. RabbitTemplate
发送和接收消息的核心工具类。提供多种消息发送方法(如 convertAndSend、send)和接收方法(如 receive、receiveAndConvert)。
3. SimpleMessageListenerContainer
消息监听容器,用于异步消费消息。支持并发消费、消息确认等特性。
POM 配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>icu.wzk</groupId>
<artifactId>rabbitmq-demo-spring</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>5.2.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>5.2.6.RELEASE</version>
</dependency>
</dependencies>
</project>
XML 配置 (rabbit-context.xml)
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<rabbit:connection-factory id="connectionFactory"
host="localhost"
port="5672"
virtual-host="/"
username="admin"
password="secret" />
<!-- RabbitTemplate:用于发送/接收消息 -->
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
<!-- RabbitAdmin:自动声明 Queue / Exchange / Binding -->
<rabbit:admin id="rabbitAdmin" connection-factory="connectionFactory"/>
<!-- 队列 -->
<rabbit:queue name="myqueue"/>
<!-- 直连交换机 + 绑定 -->
<rabbit:direct-exchange name="direct.biz.ex"
auto-declare="true"
auto-delete="false"
durable="false">
<rabbit:bindings>
<rabbit:binding queue="myqueue" key="dir.ex"/>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
Java 代码 (StartApp.java)
package icu.wzk;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
* 最小闭环示例:启动 Spring 容器 -> 发送消息 -> 从队列取一条消息 -> 关闭容器
*
* 依赖前提(与 rabbit-context.xml 保持一致):
* - 存在队列:myqueue
* - 存在绑定:myqueue <- (direct.biz.ex, routingKey=dir.ex)
*
* 注意:
* 1) convertAndSend(String routingKey, Object message) 这个重载使用的是"默认交换机"(amq.default)。
* 默认交换机的路由规则:routingKey 必须等于队列名,否则投递不到队列。
* 2) 你现在 convertAndSend("dir.ex", ...) 会把 "dir.ex" 当成队列名路由;
* 如果你的队列不是叫 dir.ex,而是 myqueue,那么应该显式指定 exchange + routingKey:
* convertAndSend("direct.biz.ex", "dir.ex", ...)
* 3) receiveAndConvert("myqueue") 是同步拉取(basicGet),空队列会返回 null。
*/
public class StartApp {
// 与 XML 中定义保持一致:交换机 / 路由键 / 队列
private static final String EXCHANGE = "direct.biz.ex";
private static final String ROUTING_KEY = "dir.ex";
private static final String QUEUE = "myqueue";
public static void main(String[] args) {
// 1) 加载 Spring XML,初始化连接工厂、RabbitTemplate、RabbitAdmin、队列/交换机/绑定
ClassPathXmlApplicationContext context =
new ClassPathXmlApplicationContext("rabbit-context.xml");
// 2) 获取发送/接收入口(底层通常是 RabbitTemplate)
AmqpTemplate amqp = context.getBean(AmqpTemplate.class);
// 3) 批量发送 1000 条消息到指定 exchange + routingKey
// 这样不依赖默认交换机的"routingKey==队列名"规则,路由更清晰
for (int i = 0; i < 1000; i++) {
amqp.convertAndSend(EXCHANGE, ROUTING_KEY, "wzk" + i);
}
// 4) 同步拉取队列中的一条消息(可能为 null:队列为空 / 尚未投递完成 / 被其他消费者取走)
Object msg = amqp.receiveAndConvert(QUEUE);
System.out.println(msg);
// 5) 关闭容器:释放连接等资源
context.close();
}
}
错误速查
| 症状 | 根因定位 | 修复 |
|---|---|---|
| 启动报 Connection refused / 连接超时 | RabbitMQ 未启动、端口/Host 不通、防火墙拦截 | telnet host 5672 / RabbitMQ 日志 / 容器端口映射 |
| 启动报 ACCESS_REFUSED - Login was refused | 用户名/密码错误或无 vhost 权限 | RabbitMQ 管理台 Users / vhost 权限 |
| 启动报 NOT_FOUND - no exchange ‘direct.biz.ex’ | 交换机未声明成功或声明顺序/权限问题 | 确保 <rabbit:admin …/> 存在且连接正常 |
| 发送无异常但队列无消息 | 使用了 convertAndSend(routingKey, msg) 走默认交换机,routingKey≠队列名导致无法路由 | 使用 convertAndSend(exchange, routingKey, msg) |
| receiveAndConvert(“myqueue”) 返回 null | 队列为空/尚未投递完成/被其他消费者取走 | 确认路由是否命中 |
| 启动报 PRECONDITION_FAILED - inequivalent arg ‘durable’ | 同名 exchange/queue 已存在但属性不同 | 统一 durable/auto-delete 配置 |
| Spring XML 解析报 schema/DTD 相关错误 | 无法加载 spring-rabbit.xsd | 保证可访问 schema |
| direct 交换机绑定了但仍不路由 | routingKey 不精确匹配;绑定 key 写错 | direct 必须精确匹配 |