TL;DR
- Scenario: Quickly integrate RabbitMQ into a Spring Boot application, with HTTP-triggered delivery and listener-based consumption
- Conclusion: Declaring the topology (Queue/DirectExchange/Binding) with Spring AMQP, sending with AmqpTemplate, and consuming with @RabbitListener completes the loop
- Output: A reusable, minimal runnable demo (POM + yml + config class + Controller + Consumer)
Spring Boot Integration with RabbitMQ
Basic Introduction
Spring AMQP is the Spring framework's abstraction over AMQP (Advanced Message Queuing Protocol) and provides a unified programming model for AMQP-based messaging. Spring Rabbit (the spring-rabbit module) is its concrete implementation, which wraps and extends the client for the RabbitMQ message broker.
POM
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>icu.wzk</groupId>
    <artifactId>rabbitmq-springboot</artifactId>
    <version>1.0-SNAPSHOT</version>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath/>
    </parent>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>
</project>
application.yml
spring:
  application:
    name: rabbit-mq
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: secret
    virtual-host: /
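To make it clearer how the five `spring.rabbitmq` settings describe a single connection, here is a minimal JDK-only sketch that assembles them into an AMQP URI. The class and method names (`AmqpUriSketch`, `encode`, `toAmqpUri`) are hypothetical, and the sketch assumes the conventional `amqp://user:password@host:port/vhost` layout, in which the default virtual host `/` is percent-encoded as `%2F`.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for illustration; not part of Spring AMQP
final class AmqpUriSketch {

    // Percent-encodes one URI component. URLEncoder targets form encoding,
    // so its "+" (the encoding of a space) is rewritten to "%20".
    static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8").replace("+", "%20");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always available", e);
        }
    }

    // Assembles the connection settings into amqp://user:password@host:port/vhost
    static String toAmqpUri(String host, int port, String user, String password, String vhost) {
        return "amqp://" + encode(user) + ":" + encode(password)
                + "@" + host + ":" + port + "/" + encode(vhost);
    }

    public static void main(String[] args) {
        // Values taken from the application.yml above
        System.out.println(toAmqpUri("localhost", 5672, "admin", "secret", "/"));
    }
}
```

With the values above this prints `amqp://admin:secret@localhost:5672/%2F`. Spring Boot's `spring.rabbitmq.addresses` property is documented to accept URIs of this shape; check the reference documentation for your version before relying on it.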
StartApp
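package icu.wzk;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

// Entry point; component scanning starts from this package (icu.wzk)
@SpringBootApplication
public class StartApp {
    public static void main(String[] args) {
        SpringApplication.run(StartApp.class, args);
    }
}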
RabbitConfig
package icu.wzk.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ topology declaration:
 * - Queue: myqueue
 * - Exchange: myex (direct)
 * - Binding: myqueue <- myex, routingKey = direct.biz.ex
 *
 * Notes:
 * 1) durable=true: the declaration survives a broker restart (recommended for production)
 * 2) autoDelete=false: not deleted automatically, even when no consumers remain
 * 3) A DirectExchange is used here, so routing is an exact match on the routing key
 */
@Configuration
public class RabbitConfig {

    public static final String QUEUE = "myqueue";
    public static final String EXCHANGE = "myex";
    public static final String ROUTING_KEY = "direct.biz.ex";

    /**
     * Queue: durable queue
     */
    @Bean
    public Queue myQueue() {
        // Arguments: name, durable, exclusive, autoDelete
        return new Queue(QUEUE, true, false, false);
    }

    /**
     * Exchange: DirectExchange (exact-match routing).
     * The return type is DirectExchange so that myBinding can inject it by that type.
     */
    @Bean
    public DirectExchange myExchange() {
        // Arguments: name, durable, autoDelete
        return new DirectExchange(EXCHANGE, true, false);
    }

    /**
     * Binding: bind the queue to the exchange with the given routing key.
     *
     * Recommended approach: use BindingBuilder rather than hand-writing
     * the Binding constructor arguments.
     */
    @Bean
    public Binding myBinding(Queue myQueue, DirectExchange myExchange) {
        return BindingBuilder
                .bind(myQueue)
                .to(myExchange)
                .with(ROUTING_KEY);
    }

    /*
     * Equivalent declarations for other exchange types (replace myExchange() as needed,
     * and adjust the return type and the myBinding parameter to match):
     *
     * TopicExchange:
     *   return new TopicExchange("topic.biz.ex", true, false);
     *
     * FanoutExchange:
     *   return new FanoutExchange("fanout.biz.ex", true, false);
     *
     * HeadersExchange:
     *   return new HeadersExchange("header.biz.ex", true, false);
     *
     * CustomExchange:
     *   return new CustomExchange("custom.biz.ex", ExchangeTypes.DIRECT, true, false, null);
     */
}
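The exact-match rule of a direct exchange can be illustrated without a broker. The following is a minimal in-memory sketch, assuming a hypothetical `DirectRoutingSketch` class that stands in for the exchange `myex`; it models only the routing decision, not RabbitMQ itself.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory model of direct-exchange routing; illustrative only, no broker involved
final class DirectRoutingSketch {

    // routing key -> names of the queues bound with exactly that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    void bind(String queue, String routingKey) {
        bindings.computeIfAbsent(routingKey, key -> new ArrayList<>()).add(queue);
    }

    // Returns the queues that would receive a message published with this routing key
    List<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.<String>emptyList());
    }

    public static void main(String[] args) {
        DirectRoutingSketch myex = new DirectRoutingSketch();
        myex.bind("myqueue", "direct.biz.ex");

        System.out.println(myex.route("direct.biz.ex")); // [myqueue]
        System.out.println(myex.route("direct.biz"));    // [] (no exact match)
    }
}
```

A near-miss key such as `direct.biz` matches no binding, so the message reaches no queue. On a real broker such an unroutable message is dropped by default, which is why a mismatch between the routing key used for sending and the one used for binding tends to fail silently.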
HelloController
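package icu.wzk.controller;

import icu.wzk.config.RabbitConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class HelloController {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    // Publishes the path variable using the exchange and routing key declared in RabbitConfig
    @RequestMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) {
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE, RabbitConfig.ROUTING_KEY, message);
        return "ok";
    }
}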
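Because the message travels as a path variable, a caller has to percent-encode it before requesting `/send/{message}`. Here is a small JDK-only sketch of building such a request URL. The class name `SendUrlSketch` is hypothetical, and port 8080 is an assumption (Spring Boot's default when `server.port` is not set, as in this demo).

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical client-side helper for illustration
final class SendUrlSketch {

    // Builds the URL for /send/{message}; the multi-argument URI constructor
    // percent-encodes characters that are illegal in a path (for example spaces),
    // and toASCIIString() additionally encodes non-ASCII characters as UTF-8.
    static String sendUrl(String host, int port, String message) {
        try {
            return new URI("http", null, host, port, "/send/" + message, null, null)
                    .toASCIIString();
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("Cannot build URL for message: " + message, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sendUrl("localhost", 8080, "hello world"));
    }
}
```

For the message `hello world` this prints `http://localhost:8080/send/hello%20world`. One caveat: a `/` inside the message is not encoded by this constructor and would be read as a path separator, so messages containing slashes are better sent in a request body or query parameter.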
HelloConsumer
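The consumer registers a listener, so the broker pushes messages to it instead of the application polling the queue.
package icu.wzk.consumer;

import icu.wzk.config.RabbitConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class HelloConsumer {

    // Invoked once for each message delivered from the queue declared in RabbitConfig
    @RabbitListener(queues = RabbitConfig.QUEUE)
    public void service(String message) {
        System.out.println("Message from queue: " + message);
    }
}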
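The push model can be pictured with plain JDK concurrency. The sketch below is only an in-memory analogy, not how Spring AMQP is implemented: a hypothetical `startContainer` method plays the role of the listener container, taking messages from a `BlockingQueue` and handing each one to a callback, much as the container invokes the `@RabbitListener` method.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// In-memory analogy of push-style consumption; no broker involved
final class PushListenerSketch {

    // Starts a daemon thread that takes messages off the queue and pushes each to the listener
    static Thread startContainer(BlockingQueue<String> queue, Consumer<String> listener) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    listener.accept(queue.take()); // blocks until a message arrives
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // interrupted: stop consuming
            }
        });
        worker.setDaemon(true);
        worker.start();
        return worker;
    }

    // Publishes the messages, waits up to one second for delivery, returns what the listener saw
    static List<String> publishAndCollect(List<String> messages) {
        BlockingQueue<String> myqueue = new LinkedBlockingQueue<>();
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch delivered = new CountDownLatch(messages.size());
        Thread worker = startContainer(myqueue, message -> {
            seen.add(message);
            delivered.countDown();
        });
        myqueue.addAll(messages);
        try {
            delivered.await(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            worker.interrupt(); // shut the worker down
        }
        return seen;
    }

    public static void main(String[] args) {
        for (String message : publishAndCollect(Arrays.asList("hello", "world"))) {
            System.out.println("Message from queue: " + message);
        }
    }
}
```

The listener code never asks for a message; it only reacts when one is handed over. That is the property that `@RabbitListener` gives `HelloConsumer.service`.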
Error Quick Reference
| Symptom | Root Cause | Diagnosis | Fix |
|---|---|---|---|
| Startup error: Parameter 1 of method myBinding required a bean of type 'DirectExchange' that could not be found | myExchange() declared with return type Exchange while myBinding requires a DirectExchange parameter | Bean injection exception in the startup logs | Change the return type of myExchange() to DirectExchange |
| Send succeeds but the queue receives no message / the consumer prints nothing | Exchange, routing key, queue, or binding names do not match | RabbitMQ Management UI (check bindings and queue message counts) | Reference shared constants: the Controller uses RabbitConfig.EXCHANGE / RabbitConfig.ROUTING_KEY |
| Request to /send/{message} returns 404 | Controller not picked up by component scanning, e.g. a misspelled package that falls outside the StartApp package icu.wzk | Check the startup logs for whether the mapping is registered | Place the class under icu.wzk, e.g. icu.wzk.controller |
| ACCESS_REFUSED - Login was refused on startup | Incorrect username, password, or virtual-host | RabbitMQ logs + application startup logs | Verify the spring.rabbitmq settings in application.yml |
| Connection refused / timeout on startup | RabbitMQ not running, or host/port unreachable | telnet host 5672 or nc -vz host 5672 | Start RabbitMQ; correct the host and port |
| Consumer does not consume and no error appears | @RabbitListener not in effect | Check the startup logs for listener container information | Ensure HelloConsumer is a @Component inside the scan path |
| Chinese text in the message is garbled | Producer and consumer disagree on charset or message converter | Check whether the message body is handled as byte[] or String on each side | Send and receive String consistently, with the same charset (e.g. UTF-8) on both sides |
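The garbled-text row can be reproduced without a broker. As a minimal sketch, assume a hypothetical `roundTrip` helper that encodes a String to bytes as a producer would and decodes those bytes as a consumer would; the text survives only when both sides use the same charset.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Illustrative only: models the byte[] message body between producer and consumer
final class CharsetRoundTripSketch {

    // Encodes the text with the producer's charset, then decodes it with the consumer's
    static String roundTrip(String text, Charset producerCharset, Charset consumerCharset) {
        byte[] body = text.getBytes(producerCharset);
        return new String(body, consumerCharset);
    }

    public static void main(String[] args) {
        String text = "\u4f60\u597d"; // the two Chinese characters for "hello"

        // Same charset on both sides: the text is preserved
        System.out.println(text.equals(
                roundTrip(text, StandardCharsets.UTF_8, StandardCharsets.UTF_8))); // true

        // Mismatched charsets: the consumer sees garbled text
        System.out.println(text.equals(
                roundTrip(text, StandardCharsets.UTF_8, StandardCharsets.ISO_8859_1))); // false
    }
}
```

In this demo both sides exchange a String through Spring's message converter, so the problem usually appears only when another producer or consumer handles the raw bytes with a different charset.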