TL;DR

  • Scenario: Integrate RabbitMQ into a Spring Boot application, publishing a message from an HTTP endpoint and consuming it with a listener
  • Conclusion: Declaring the topology (Queue/DirectExchange/Binding) with Spring AMQP, publishing through AmqpTemplate, and consuming with @RabbitListener closes the produce-consume loop
  • Output: A reusable, minimal runnable demo (POM + application.yml + config class + Controller + Consumer)

Spring Boot Integration with RabbitMQ

Basic Introduction

Spring AMQP is the Spring project's abstraction over AMQP (Advanced Message Queuing Protocol); it provides a unified programming model for sending and receiving AMQP messages. Spring Rabbit (the spring-rabbit module) is its concrete implementation for RabbitMQ, wrapping and extending the RabbitMQ client.


POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>icu.wzk</groupId>
    <artifactId>rabbitmq-springboot</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath/>
    </parent>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
</project>

application.yml

spring:
  application:
    name: rabbit-mq
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: secret
    virtual-host: /

StartApp

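package icu.wzk;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
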
@SpringBootApplication
public class StartApp {
    public static void main(String[] args) {
        SpringApplication.run(StartApp.class, args);
    }
}

RabbitConfig

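package icu.wzk.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
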
/**
 * RabbitMQ topology declaration:
 * - Queue: myqueue
 * - Exchange: myex (direct)
 * - Binding: myqueue <- myex, routingKey = direct.biz.ex
 *
 * Description:
 * 1) durable=true: the queue and exchange survive a broker restart (should be true in production)
 * 2) autoDelete=false: not deleted automatically even when there are no consumers
 * 3) A DirectExchange is used here; it routes on an exact routingKey match
 */
@Configuration
public class RabbitConfig {

    public static final String QUEUE = "myqueue";
    public static final String EXCHANGE = "myex";
    public static final String ROUTING_KEY = "direct.biz.ex";

    /**
     * Queue: persistent queue
     */
    @Bean
    public Queue myQueue() {
        return new Queue(QUEUE, true, false, false);
    }

    /**
     * Exchange: DirectExchange (exact routing)
     */
    @Bean
    public DirectExchange myExchange() {
        return new DirectExchange(EXCHANGE, true, false);
    }

    /**
     * Binding: bind queue to exchange, specify routingKey
     *
     * Recommended style: use BindingBuilder rather than hand-writing the Binding constructor arguments.
     */
    @Bean
    public Binding myBinding(Queue myQueue, DirectExchange myExchange) {
        return org.springframework.amqp.core.BindingBuilder
                .bind(myQueue)
                .to(myExchange)
                .with(ROUTING_KEY);
    }

    /*
     * Equivalent declarations for other exchange types (replace myExchange() as needed):
     *
     * TopicExchange:
     *   return new TopicExchange("topic.biz.ex", true, false);
     *
     * FanoutExchange:
     *   return new FanoutExchange("fanout.biz.ex", true, false);
     *
     * HeadersExchange:
     *   return new HeadersExchange("header.biz.ex", true, false);
     *
     * CustomExchange:
     *   return new CustomExchange("custom.biz.ex", ExchangeTypes.DIRECT, true, false, null);
     */
}
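
The exact-match rule of a direct exchange can be sketched with a small in-memory model. This is only an illustration under simplifying assumptions: the class DirectRoutingSketch and its methods are hypothetical and are not part of Spring AMQP or the RabbitMQ client. It reuses the names myqueue and direct.biz.ex from RabbitConfig to show that a routing key that differs in any way matches no queue.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative in-memory model only; this is NOT the RabbitMQ or Spring AMQP API.
class DirectRoutingSketch {

    // binding key -> names of the queues bound with exactly that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Plays the role of BindingBuilder.bind(queue).to(exchange).with(routingKey)
    void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, k -> new ArrayList<>()).add(queue);
    }

    // A direct exchange delivers only to queues whose binding key equals the routing key
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, new ArrayList<>());
    }

    public static void main(String[] args) {
        DirectRoutingSketch myex = new DirectRoutingSketch();
        myex.bind("myqueue", "direct.biz.ex");
        System.out.println(myex.route("direct.biz.ex")); // prints [myqueue]
        System.out.println(myex.route("direct.biz"));    // prints []
    }
}
```

The empty result in the second call corresponds to a message that is published without error but never reaches the queue, which is why keeping the routing key in one shared constant is worthwhile.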

HelloController

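package icu.wzk.controller;

import icu.wzk.config.RabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    // Publish the path variable as the message body, reusing the constants from RabbitConfig
    @RequestMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, RabbitConfig.ROUTING_KEY, message);
        return "ok";
    }
}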
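
Because the message travels as a path segment of /send/{message}, text containing spaces or non-ASCII characters has to be percent-encoded by the caller. The following is a minimal sketch of building such a URL with the JDK only. The class SendUrlSketch is hypothetical, and the base URL assumes the application runs on localhost with Spring Boot's default port 8080; adjust it to your setup.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

class SendUrlSketch {

    // Assumption: the demo application listens on localhost:8080
    static final String BASE_URL = "http://localhost:8080/send/";

    static String sendUrl(String message) {
        try {
            // URLEncoder implements form encoding and emits '+' for spaces;
            // inside a path segment a space must be written as %20 instead
            String encoded = URLEncoder.encode(message, "UTF-8").replace("+", "%20");
            return BASE_URL + encoded;
        } catch (UnsupportedEncodingException e) {
            // UTF-8 is guaranteed to be available on every JVM
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendUrl("hello world"));
        System.out.println(sendUrl("\u4f60\u597d")); // the two-character Chinese greeting "ni hao"
    }
}
```

The resulting string can be pasted into a browser or passed to any HTTP client to trigger a send.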

HelloConsumer

Consume messages in push mode with a listener

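package icu.wzk.consumer;

import icu.wzk.config.RabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class HelloConsumer {

    // Invoked by the listener container for each message delivered from the queue
    @RabbitListener(queues = RabbitConfig.QUEUE)
    public void service(String message) {
        System.out.println("Message from queue: " + message);
    }
}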
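
On the wire a message body is just bytes, so the String that the listener receives depends on both sides agreeing on a charset. The sketch below uses only the JDK to show what happens when they disagree. The class CharsetRoundTripSketch is hypothetical, and the choice of UTF-8 on the producer side and ISO-8859-1 as the mismatching charset are assumptions for illustration, not a description of what any particular converter does.

```java
import java.nio.charset.StandardCharsets;

class CharsetRoundTripSketch {

    // Producer side: turn the text into the bytes that form the message body
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // Consumer side using the same charset as the producer
    static String decodeUtf8(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    // Consumer side using a different charset (assumed mismatch)
    static String decodeLatin1(byte[] body) {
        return new String(body, StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) {
        String original = "\u4f60\u597d"; // two Chinese characters
        byte[] body = encode(original);
        System.out.println(body.length);                         // prints 6
        System.out.println(decodeUtf8(body).equals(original));   // prints true
        System.out.println(decodeLatin1(body).equals(original)); // prints false
    }
}
```

Pure ASCII text survives either decoding, which is why this kind of mismatch often goes unnoticed until the first non-ASCII message arrives.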

Error Quick Reference

| Symptom | Root Cause | Diagnosis | Fix |
| --- | --- | --- | --- |
| Startup error: Parameter 1 of method myBinding required a bean of type 'DirectExchange' that could not be found | myExchange() is declared with return type Exchange, but myBinding requires a DirectExchange parameter | Bean injection exception in the startup logs | Change the return type of myExchange() to DirectExchange |
| Send succeeds but the queue has no message / the consumer prints nothing | Exchange, routing key, queue, or binding names are inconsistent | Inspect the exchange, bindings, and queue in the RabbitMQ Management UI | Use shared constants: the Controller references RabbitConfig.EXCHANGE / ROUTING_KEY |
| Request to /send/{message} returns 404 | Controller package name misspelled | Check the startup logs for the mapping | Fix the package name to icu.wzk.controller |
| ACCESS_REFUSED - Login was refused on startup | Incorrect username, password, or virtual-host | RabbitMQ logs and application startup logs | Verify the spring.rabbitmq settings in application.yml |
| Connection refused / timeout on startup | RabbitMQ not started or port unreachable | telnet host 5672 or nc | Start RabbitMQ; correct the host and port |
| Consumer does not consume and no error is logged | @RabbitListener not taking effect | Check whether the startup logs show listener container information | Ensure HelloConsumer is within the component-scan path |
| Garbled or abnormal display of Chinese message text | Producer/consumer encoding or message converter mismatch | Check whether the message body is byte[] or String | Send and receive the payload explicitly as String |
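
When telnet or nc is not available, the port check for the "Connection refused / timeout" case can be approximated from Java. This is a minimal sketch using only the JDK; the class PortCheckSketch is hypothetical, and the host, port, and 2-second timeout in main are assumptions that mirror the application.yml above. A successful TCP connect only shows that something is listening on the port, not that the credentials or virtual host are correct.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheckSketch {

    // Returns true if a TCP connection to host:port can be opened within the timeout
    static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, unknown host, and connect timeout
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed values taken from application.yml: localhost and the default AMQP port 5672
        System.out.println("localhost:5672 reachable: " + isReachable("localhost", 5672, 2000));
    }
}
```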