TL;DR

  • 场景: Spring(JavaConfig/注解)快速接入 RabbitMQ,完成最小可跑的发消息与同步拉取。
  • 结论: 用 CachingConnectionFactory + RabbitAdmin + RabbitTemplate + Queue Bean 即可完成基础链路;默认交换机直投队列名最省事。
  • 产出: 可复用的配置骨架 + 常见错误定位与修复速查卡。

Spring 整合 RabbitMQ

基本介绍

Spring AMQP 是 Spring 框架对 AMQP (Advanced Message Queuing Protocol) 协议的抽象实现,它提供了统一的编程模型来操作 AMQP。而 Spring Rabbit 是 Spring AMQP 的具体实现,专门针对 RabbitMQ 消息中间件进行了封装和扩展。


基于注解

编写代码

RabbitConfiguration,这里注意,ConnectionFactory 的包不要导入错了!

package icu.wzk.config;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitConfiguration {

    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String VHOST = "/";
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "secret";

    /**
     * Spring AMQP 的连接工厂(生产/消费都用它)
     * - 内部会维护连接与 Channel 缓存
     * - 直接配置 host/port/vhost/user/pass 即可
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory ccf = new CachingConnectionFactory(HOST, PORT);
        ccf.setVirtualHost(VHOST);
        ccf.setUsername(USERNAME);
        ccf.setPassword(PASSWORD);

        // 可选:限制缓存 channel 数量
        ccf.setChannelCacheSize(25);

        return ccf;
    }

    /** 自动声明 Queue/Exchange/Binding(只要你把它们声明成 Bean) */
    @Bean
    public AmqpAdmin amqpAdmin(ConnectionFactory factory) {
        return new RabbitAdmin(factory);
    }

    /** 发送/接收入口 */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
        return new RabbitTemplate(factory);
    }

    /** 队列:建议 durable=true,避免 broker 重启丢队列 */
    @Bean
    public Queue queue() {
        return new Queue("myqueue", true, false, false);
    }
}

注意: ConnectionFactory的包一定是:org.springframework.amqp.rabbit.connection.ConnectionFactory


demo程序讲解

使用 Spring 注解(JavaConfig) 方式实现 RabbitMQ 消息发送和同步接收的完整示例如下:

@Configuration
public class RabbitConfiguration {

    // 1. 声明连接工厂
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        return factory;
    }

    // 2. 声明 RabbitTemplate
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // 设置消息转换器(可选)
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    // 3. 声明队列
    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }

    // 4. 声明 RabbitAdmin(自动创建队列)
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
}

@Service
public class MessageService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    // 发送消息
    public void sendMessage(String message) {
        // 使用默认交换机(amq.default)发送消息
        // routingKey="myqueue" 将消息直接路由到同名队列
        amqpTemplate.convertAndSend("myqueue", message);
        System.out.println("发送消息: " + message);
    }

    // 同步接收消息
    public String receiveMessage() {
        // 同步拉取消息,队列为空返回null
        String message = (String) amqpTemplate.receiveAndConvert("myqueue");
        System.out.println("接收消息: " + message);
        return message;
    }
}

启动程序

package icu.wzk;

import icu.wzk.config.RabbitConfiguration;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class StartApp2 {

    private static final String QUEUE = "myqueue";

    public static void main(String[] args) {
        // 1) 启动基于注解的 Spring 容器,加载 RabbitConfiguration
        AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(RabbitConfiguration.class);

        // 2) 获取 AMQP 操作入口(通常就是 RabbitTemplate)
        AmqpTemplate template = context.getBean(AmqpTemplate.class);

        // 3) 发送消息到队列(通过默认交换机实现)
        template.convertAndSend(QUEUE, "wzk");

        // 4) 同步拉取队列一条消息;若返回 null 说明当前队列为空/未投递成功/被其他消费者取走
        Object msg = template.receiveAndConvert(QUEUE);
        System.out.println(msg);

        // 5) 关闭容器,释放连接资源
        context.close();
    }
}

错误速查

症状根因定位修复
编译通过但运行期各种类型不匹配/Bean 找不到ConnectionFactory导错包(非 org.springframework.amqp.rabbit.connection.ConnectionFactory)IDE 跳转到 ConnectionFactory 定义,确认包路径统一替换为 Spring Rabbit 的 ConnectionFactory;清理无关依赖冲突
ACCESS_REFUSED - Login was refused用户名/密码错误,或账号无权限RabbitMQ 管理台 Users/Permissions;应用日志修正账号密码;为 vhost 配置读写权限
NOT_FOUND - no virtual host ’/’ 或无权限vhost 不存在或权限未授权管理台 Vhosts/Permissions;确认 setVirtualHost创建 vhost;授权用户到该 vhost;应用侧改为正确 vhost
java.net.ConnectException: Connection refusedBroker 未启动/端口不通/地址错telnet/nc 到 host:port;容器网络/防火墙启动 RabbitMQ;开放端口;修正 host/port
启动后队列没创建未注册 RabbitAdmin/AmqpAdmin Bean,或容器未加载配置类Spring 容器 Bean 列表;启动日志是否初始化 RabbitAdmin保证 RabbitAdmin/AmqpAdmin 为 Bean 且使用同一 ConnectionFactory;确保配置类被加载
404 NOT_FOUND - no queue ‘myqueue’队列未声明/声明到不同 vhost;或名称不一致管理台 Queues;核对 vhost 与队列名统一队列名;确保队列在同一 vhost 自动创建或手动创建
发送成功但 receiveAndConvert 打印 null队列为空、消息被其他消费者取走、路由键不匹配、发送到错误 vhost管理台队列 Ready/Unacked;查看是否有消费者;核对 routingKey先停掉其他消费者;确保使用默认交换机直投时 routingKey=队列名;核对 vhost
MessageConversionException / 反序列化失败发送与接收的 MessageConverter 不一致(String/JSON 混用)查看 RabbitTemplate 的 messageConverter;检查消息 content-type统一转换器;String 就用默认;JSON 就全链路 Jackson2JsonMessageConverter 并统一对象类型
Broker 重启后队列/绑定丢失队列/交换器未持久化(durable=false)或自动删除管理台查看队列 Durable 标记;声明代码生产建议 durable=true;明确 autoDelete/exclusive 的语义并按需设置
声称”receiveAndConvert 是同步阻塞”,但实际很快返回receiveAndConvert 默认是非阻塞拉取,空即返回 null观察调用耗时与返回值若要阻塞/异步消费,改用 @RabbitListener;或用轮询+超时策略实现等待语义(不建议高并发下使用同步拉取)