TL;DR
- 场景:Spring Boot 快速接入 RabbitMQ,实现 HTTP 触发投递、消费者监听消费
- 结论:用 Spring AMQP 声明拓扑(Queue/DirectExchange/Binding)+ AmqpTemplate 投递 + @RabbitListener 消费即可闭环
- 产出:可复用的最小可跑 Demo(POM + yml + 配置类 + Controller + Consumer)
Spring Boot 整合 RabbitMQ
基本介绍
Spring AMQP 是 Spring 框架对 AMQP (Advanced Message Queuing Protocol) 协议的抽象实现,它提供了统一的编程模型来操作 AMQP。而 Spring Rabbit 是 Spring AMQP 的具体实现,专门针对 RabbitMQ 消息中间件进行了封装和扩展。
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>icu.wzk</groupId>
<artifactId>rabbitmq-springboot</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.1</version>
<relativePath/>
</parent>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
</project>
application.yml
spring:
application:
name: rabbit-mq
rabbitmq:
host: localhost
port: 5672
username: admin
password: secret
virtual-host: /
StartApp
package icu.wzk;
@SpringBootApplication
public class StartApp {
public static void main(String[] args) {
SpringApplication.run(StartApp.class, args);
}
}
RabbitConfig
package icu.wzk.config;
/**
* RabbitMQ 拓扑声明:
* - Queue: myqueue
* - Exchange: myex(direct)
* - Binding: myqueue <- myex,routingKey = direct.biz.ex
*
* 说明:
* 1) durable=true:broker 重启后依然存在(生产环境默认应为 true)
* 2) autoDelete=false:没有消费者也不自动删除
* 3) 这里用 DirectExchange,路由规则是 routingKey 精确匹配
*/
@Configuration
public class RabbitConfig {
public static final String QUEUE = "myqueue";
public static final String EXCHANGE = "myex";
public static final String ROUTING_KEY = "direct.biz.ex";
/**
* 队列:持久化队列
*/
@Bean
public Queue myQueue() {
return new Queue(QUEUE, true, false, false);
}
/**
* 交换机:DirectExchange(精确路由)
*/
@Bean
public Exchange myExchange() {
return new DirectExchange(EXCHANGE, true, false);
}
/**
* 绑定:把队列绑定到交换机,并指定 routingKey
*
* 推荐写法:用 BindingBuilder,避免手写 Binding 构造参数时填错。
*/
@Bean
public Binding myBinding(Queue myQueue, DirectExchange myExchange) {
return org.springframework.amqp.core.BindingBuilder
.bind(myQueue)
.to(myExchange)
.with(ROUTING_KEY);
}
/*
* 其它 Exchange 的等价写法(按需替换 myExchange()):
*
* TopicExchange:
* return new TopicExchange("topic.biz.ex", true, false);
*
* FanoutExchange:
* return new FanoutExchange("fanout.biz.ex", true, false);
*
* HeadersExchange:
* return new HeadersExchange("header.biz.ex", true, false);
*
* CustomExchange:
* return new CustomExchange("custom.biz.ex", ExchangeTypes.DIRECT, true, false, null);
*/
}
HelloController
package icu.wzk.controlle;
@RestController
public class HelloController {
@Autowired
private AmqpTemplate rabbitTemplate;
@RequestMapping("/send/{message}")
public String sendMessage(@PathVariable String message) {
rabbitTemplate.convertAndSend("myex", "direct.biz.ex",
message);
return "ok";
}
}
HelloConsumer
使用监听器,用于推消息
package icu.wzk.consumer;
@Component
public class HelloConsumer {
@RabbitListener(queues = "myqueue")
public void service(String message) {
System.out.println("消息队列推送来的消息:" + message);
}
}
错误速查
| 症状 | 根因 | 定位 | 修复 |
|---|---|---|---|
| 启动报错:Parameter 2 of method myBinding required a bean of type ‘DirectExchange’ that could not be found | myExchange() 返回类型写成 Exchange,而 myBinding 形参要求 DirectExchange | 看启动日志的 Bean 注入异常 | 把 myExchange() 返回类型改为 DirectExchange |
| 发送成功但队列无消息/消费者无输出 | exchange/routingKey/queue/binding 任一不一致 | RabbitMQ Management UI | 统一常量引用:Controller 使用 RabbitConfig.EXCHANGE/ROUTING_KEY |
| 请求 /send/{message} 404 | Controller 包名拼写错误 | 看启动日志是否有映射 | 修正包名为 icu.wzk.controller |
| 启动时报 ACCESS_REFUSED - Login was refused | 用户名/密码/virtual-host 不正确 | RabbitMQ 日志 + 应用启动日志 | 校对 application.yml |
| 启动时报 Connection refused / Timeout | RabbitMQ 未启动、端口不通 | telnet host 5672/nc | 启动 RabbitMQ;改正确 host |
| 消费者不消费但无报错 | @RabbitListener 未生效 | 启动日志是否打印 listener 容器信息 | 确保 HelloConsumer 在扫描路径 |
| 消息中文乱码/显示异常 | 生产者/消费者编码与消息转换器不一致 | 查看消息体是否为 byte[]/String | 明确使用 String |