TL;DR

  • 场景:Spring Boot 快速接入 RabbitMQ,实现 HTTP 触发投递、消费者监听消费
  • 结论:用 Spring AMQP 声明拓扑(Queue/DirectExchange/Binding)+ AmqpTemplate 投递 + @RabbitListener 消费即可闭环
  • 产出:可复用的最小可跑 Demo(POM + yml + 配置类 + Controller + Consumer)

Spring Boot 整合 RabbitMQ

基本介绍

Spring AMQP 是 Spring 框架对 AMQP (Advanced Message Queuing Protocol) 协议的抽象实现,它提供了统一的编程模型来操作 AMQP。而 Spring Rabbit 是 Spring AMQP 的具体实现,专门针对 RabbitMQ 消息中间件进行了封装和扩展。


POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>icu.wzk</groupId>
    <artifactId>rabbitmq-springboot</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
</project>

application.yml

spring:
  application:
    name: rabbit-mq
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: secret
    virtual-host: /

StartApp

package icu.wzk;

@SpringBootApplication
public class StartApp {
    public static void main(String[] args) {
        SpringApplication.run(StartApp.class, args);
    }
}

RabbitConfig

package icu.wzk.config;

/**
 * RabbitMQ 拓扑声明:
 * - Queue: myqueue
 * - Exchange: myex(direct)
 * - Binding: myqueue <- myex,routingKey = direct.biz.ex
 *
 * 说明:
 * 1) durable=true:broker 重启后依然存在(生产环境默认应为 true)
 * 2) autoDelete=false:没有消费者也不自动删除
 * 3) 这里用 DirectExchange,路由规则是 routingKey 精确匹配
 */
@Configuration
public class RabbitConfig {

    public static final String QUEUE = "myqueue";
    public static final String EXCHANGE = "myex";
    public static final String ROUTING_KEY = "direct.biz.ex";

    /**
     * 队列:持久化队列
     */
    @Bean
    public Queue myQueue() {
        return new Queue(QUEUE, true, false, false);
    }

    /**
     * 交换机:DirectExchange(精确路由)
     */
    @Bean
    public Exchange myExchange() {
        return new DirectExchange(EXCHANGE, true, false);
    }

    /**
     * 绑定:把队列绑定到交换机,并指定 routingKey
     *
     * 推荐写法:用 BindingBuilder,避免手写 Binding 构造参数时填错。
     */
    @Bean
    public Binding myBinding(Queue myQueue, DirectExchange myExchange) {
        return org.springframework.amqp.core.BindingBuilder
                .bind(myQueue)
                .to(myExchange)
                .with(ROUTING_KEY);
    }

    /*
     * 其它 Exchange 的等价写法(按需替换 myExchange()):
     *
     * TopicExchange:
     *   return new TopicExchange("topic.biz.ex", true, false);
     *
     * FanoutExchange:
     *   return new FanoutExchange("fanout.biz.ex", true, false);
     *
     * HeadersExchange:
     *   return new HeadersExchange("header.biz.ex", true, false);
     *
     * CustomExchange:
     *   return new CustomExchange("custom.biz.ex", ExchangeTypes.DIRECT, true, false, null);
     */
}

HelloController

package icu.wzk.controlle;

@RestController
public class HelloController {
    @Autowired
    private AmqpTemplate rabbitTemplate;
    @RequestMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) {
        rabbitTemplate.convertAndSend("myex", "direct.biz.ex",
                message);
        return "ok";
    }
}

HelloConsumer

使用监听器,用于推消息

package icu.wzk.consumer;

@Component
public class HelloConsumer {
    @RabbitListener(queues = "myqueue")
    public void service(String message) {
        System.out.println("消息队列推送来的消息:" + message);
    }
}

错误速查

症状根因定位修复
启动报错:Parameter 2 of method myBinding required a bean of type ‘DirectExchange’ that could not be foundmyExchange() 返回类型写成 Exchange,而 myBinding 形参要求 DirectExchange看启动日志的 Bean 注入异常把 myExchange() 返回类型改为 DirectExchange
发送成功但队列无消息/消费者无输出exchange/routingKey/queue/binding 任一不一致RabbitMQ Management UI统一常量引用:Controller 使用 RabbitConfig.EXCHANGE/ROUTING_KEY
请求 /send/{message} 404Controller 包名拼写错误看启动日志是否有映射修正包名为 icu.wzk.controller
启动时报 ACCESS_REFUSED - Login was refused用户名/密码/virtual-host 不正确RabbitMQ 日志 + 应用启动日志校对 application.yml
启动时报 Connection refused / TimeoutRabbitMQ 未启动、端口不通telnet host 5672/nc启动 RabbitMQ;改正确 host
消费者不消费但无报错@RabbitListener 未生效启动日志是否打印 listener 容器信息确保 HelloConsumer 在扫描路径
消息中文乱码/显示异常生产者/消费者编码与消息转换器不一致查看消息体是否为 byte[]/String明确使用 String