TL;DR
- Scenario: quickly integrate Spring (JavaConfig/annotations) with RabbitMQ and build a minimal runnable example that sends a message and pulls it back synchronously.
- Conclusion: CachingConnectionFactory + RabbitAdmin + RabbitTemplate + a Queue bean cover the basic chain; publishing to the default exchange with the queue name as the routing key is the most direct way to reach a queue.
- Output: a reusable configuration skeleton plus a quick reference for diagnosing and fixing common errors.
Spring Integration with RabbitMQ
Basic Introduction
Spring AMQP is the Spring framework's abstraction over AMQP (Advanced Message Queuing Protocol); it provides a unified programming model for working with AMQP brokers. Spring Rabbit (the spring-rabbit module) is the concrete implementation of Spring AMQP for RabbitMQ: it wraps and extends the RabbitMQ client.
Based on Annotations
Writing Code
RabbitConfiguration. Note: be careful not to import ConnectionFactory from the wrong package!
package icu.wzk.config;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfiguration {

    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String VHOST = "/";
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "secret";

    /**
     * Spring AMQP's connection factory (used by both producer and consumer)
     * - Internally maintains a connection and a Channel cache
     * - Configured directly with host/port/vhost/user/pass
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory ccf = new CachingConnectionFactory(HOST, PORT);
        ccf.setVirtualHost(VHOST);
        ccf.setUsername(USERNAME);
        ccf.setPassword(PASSWORD);
        // Optional: number of idle channels kept in the cache
        // (a cache size, not a hard limit on open channels)
        ccf.setChannelCacheSize(25);
        return ccf;
    }

    /** Auto-declares Queue/Exchange/Binding (as long as they are declared as beans) */
    @Bean
    public AmqpAdmin amqpAdmin(ConnectionFactory factory) {
        return new RabbitAdmin(factory);
    }

    /** Entry point for sending and receiving */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
        return new RabbitTemplate(factory);
    }

    /** Queue: durable=true is recommended so the queue survives a broker restart */
    @Bean
    public Queue queue() {
        // Arguments: name, durable, exclusive, autoDelete
        return new Queue("myqueue", true, false, false);
    }
}
Note: the ConnectionFactory import must be org.springframework.amqp.rabbit.connection.ConnectionFactory, not com.rabbitmq.client.ConnectionFactory from the RabbitMQ Java client.
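The amqpAdmin bean in the configuration above declares exchanges and bindings in the same way as queues, provided they are registered as beans. A minimal sketch of such a configuration fragment follows. The class name `ExchangeBindingConfiguration`, the exchange name `myexchange`, and the routing key `mykey` are assumptions made for illustration and are not part of the original setup; the fragment also assumes it is loaded together with RabbitConfiguration so that the `Queue` bean can be injected.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ExchangeBindingConfiguration {

    /** Durable, non-auto-delete direct exchange; "myexchange" is a hypothetical name. */
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange("myexchange", true, false);
    }

    /** Binds the existing queue bean to the exchange under the hypothetical key "mykey". */
    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("mykey");
    }
}
```

With this in place, a message would be published through the named exchange with `convertAndSend("myexchange", "mykey", payload)` rather than through the default exchange.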
Demo Program Explanation
A complete example of sending and synchronously receiving RabbitMQ messages with the Spring annotation (JavaConfig) approach:
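// RabbitConfiguration.java
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfiguration {

    // 1. Declare the connection factory
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        return factory;
    }

    // 2. Declare the RabbitTemplate
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // Optional: JSON message converter (requires Jackson on the classpath)
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    // 3. Declare the queue (this constructor creates a durable queue)
    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }

    // 4. Declare RabbitAdmin (auto-creates the queue on the broker)
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
}

// MessageService.java
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MessageService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    // Send a message
    public void sendMessage(String message) {
        // Publishes to the default exchange, whose name is the empty string "".
        // routingKey="myqueue" routes the message directly to the queue with the same name.
        amqpTemplate.convertAndSend("myqueue", message);
        System.out.println("Sent message: " + message);
    }

    // Synchronously receive a message
    public String receiveMessage() {
        // Synchronous pull; returns null if the queue is empty
        String message = (String) amqpTemplate.receiveAndConvert("myqueue");
        System.out.println("Received message: " + message);
        return message;
    }
}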
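The default-exchange behavior that `sendMessage` relies on can be illustrated with a toy in-memory model. This is a sketch for intuition only, not RabbitMQ or Spring AMQP code: `ToyDefaultExchange` and its methods are hypothetical names, and the model ignores persistence, acknowledgements, and networking.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model of default-exchange routing (illustration only). */
final class ToyDefaultExchange {
    // Every declared queue is implicitly bound under its own name.
    private final Map<String, Deque<String>> queues = new HashMap<>();

    /** Declaring a queue creates the implicit binding: routing key == queue name. */
    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    /** Returns true if the message reached a queue; an unknown key matches nothing. */
    boolean publish(String routingKey, String body) {
        Deque<String> target = queues.get(routingKey);
        if (target == null) {
            // A real broker silently drops such a message unless the
            // publisher sets the mandatory flag.
            return false;
        }
        target.addLast(body);
        return true;
    }

    /** Non-blocking pull: the oldest message, or null if the queue is empty. */
    String receive(String queueName) {
        Deque<String> source = queues.get(queueName);
        if (source == null) {
            // Mirrors the broker's error for a queue that was never declared.
            throw new IllegalStateException("NOT_FOUND - no queue '" + queueName + "'");
        }
        return source.pollFirst();
    }

    public static void main(String[] args) {
        ToyDefaultExchange broker = new ToyDefaultExchange();
        broker.declareQueue("myqueue");
        System.out.println(broker.publish("myqueue", "wzk")); // key equals queue name
        System.out.println(broker.publish("myqueu", "lost")); // typo in the key
        System.out.println(broker.receive("myqueue"));        // the routed message
        System.out.println(broker.receive("myqueue"));        // queue is now empty
    }
}
```

The model shows why a routing key that differs from the queue name by even one character leads to a "sent successfully but nothing received" situation.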
Startup Program
package icu.wzk;

import icu.wzk.config.RabbitConfiguration;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class StartApp2 {

    private static final String QUEUE = "myqueue";

    public static void main(String[] args) {
        // 1) Start the annotation-based Spring container and load RabbitConfiguration
        AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(RabbitConfiguration.class);

        // 2) Get the AMQP operations entry point (the RabbitTemplate bean, which implements AmqpTemplate)
        AmqpTemplate template = context.getBean(AmqpTemplate.class);

        // 3) Send a message to the queue (via the default exchange)
        template.convertAndSend(QUEUE, "wzk");

        // 4) Synchronously pull one message from the queue; returns null if the queue is empty,
        //    the message has not been delivered yet, or another consumer took it
        Object msg = template.receiveAndConvert(QUEUE);
        System.out.println(msg);

        // 5) Close the container and release connection resources
        context.close();
    }
}
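Step 4 returns immediately because the pull does not wait by default. The difference between an immediate pull and a pull that waits up to a timeout can be sketched with the JDK's `BlockingQueue` as an analogy. This is not Spring AMQP code, and `PullDemo`, `pullNow`, and `pullWithin` are hypothetical names. In Spring AMQP the corresponding calls are `receiveAndConvert(queueName)` and the overload `receiveAndConvert(queueName, timeoutMillis)`.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** JDK-only analogy for non-blocking versus timed pulls (illustration only). */
final class PullDemo {

    /** Non-blocking pull: returns null at once when nothing is queued. */
    static String pullNow(BlockingQueue<String> queue) {
        return queue.poll();
    }

    /** Timed pull: waits up to timeoutMillis for a message, then gives up with null. */
    static String pullWithin(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        System.out.println(pullNow(queue)); // nothing queued yet

        // A producer that delivers slightly later, like a message still in flight.
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);
                queue.put("wzk");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        System.out.println(pullWithin(queue, 2000)); // waits for the late message
        producer.join();
    }
}
```

A timed pull trades latency for a better chance of seeing a message that is still in flight; for continuous consumption a listener-based approach is usually the better fit.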
Error Quick Reference
| Symptom | Root Cause | How to Diagnose | Fix |
|---|---|---|---|
| Code compiles but fails at runtime with a type mismatch or "bean not found" for ConnectionFactory | ConnectionFactory imported from the wrong package (not org.springframework.amqp.rabbit.connection.ConnectionFactory) | In the IDE, navigate to the ConnectionFactory definition and confirm the package path | Switch to Spring Rabbit's ConnectionFactory; clean up dependency conflicts |
| ACCESS_REFUSED - Login was refused | Wrong username/password, or the account lacks permissions | RabbitMQ management console (Users/Permissions); application logs | Correct the username/password; grant the user read/write permissions on the vhost |
| NOT_FOUND - no virtual host '/' or no permission | The vhost does not exist or the user is not authorized for it | Management console (Vhosts/Permissions); confirm the value passed to setVirtualHost | Create the vhost; grant the user access to it; point the application at the correct vhost |
| java.net.ConnectException: Connection refused | Broker not started, port unreachable, or wrong address | telnet/nc to host:port; check container networking and firewall | Start RabbitMQ; open the port; correct host/port |
| Queue not created after startup | RabbitAdmin/AmqpAdmin bean not registered, or the container did not load the configuration class | Inspect the Spring container's bean list; check startup logs for RabbitAdmin initialization (declaration happens when the first connection is opened) | Ensure RabbitAdmin/AmqpAdmin is a bean and uses the same ConnectionFactory; ensure the configuration class is loaded |
| 404 NOT_FOUND - no queue 'myqueue' | Queue not declared, declared in a different vhost, or the name does not match | Management console (Queues); verify that the vhost and queue name match | Use one queue name everywhere; ensure the queue is auto-declared or manually created in the same vhost |
| Send succeeds but receiveAndConvert prints null | Queue empty, message taken by another consumer, routing key mismatch, or message sent to the wrong vhost | Management console: queue Ready/Unacked counts; check for consumers; verify the routing key | Stop other consumers first; when using the default exchange, make sure routingKey equals the queue name; verify the vhost |
| MessageConversionException / deserialization failed | Sender and receiver use different MessageConverters (String and JSON mixed) | Check the RabbitTemplate's messageConverter; check the message content-type | Use one converter on both sides: the default for Strings, or Jackson2JsonMessageConverter end to end for JSON with a consistent object type |
| Queue/binding lost after broker restart | Queue/exchange not durable (durable=false) or declared auto-delete | Management console: check the queue's Durable flag; review the declaration code | Use durable=true in production; understand the autoDelete/exclusive semantics and set them as needed |
| receiveAndConvert is described as "synchronous blocking" but returns immediately | By default receiveAndConvert is a non-blocking pull and returns null right away when no message is available | Observe the call duration and return value | For blocking or asynchronous consumption, switch to @RabbitListener; or poll with a timeout to get wait semantics (not recommended under high concurrency) |
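For the "Connection refused" row, the telnet/nc check can also be done from Java before any Spring code runs. The following is a minimal sketch using only the JDK; `PortProbe` and `canConnect` are hypothetical names, and `localhost:5672` simply mirrors the host and port used in the configuration above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Checks whether a TCP port accepts connections, similar to a quick telnet/nc test. */
final class PortProbe {

    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts, and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Host and port match the values used in RabbitConfiguration.
        System.out.println(canConnect("localhost", 5672, 2000));
    }
}
```

A successful probe only shows that something is listening on the port; credentials, vhost, and permissions still need to be checked separately.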