TL;DR

  • Scenario: Quickly integrate Spring (JavaConfig/annotations) with RabbitMQ and build a minimal runnable example that sends a message and pulls it back synchronously.
  • Conclusion: CachingConnectionFactory + RabbitAdmin + RabbitTemplate + a Queue Bean cover the basic chain; publishing to the default exchange with the queue name as the routing key is the most straightforward route.
  • Output: A reusable configuration skeleton plus a quick reference for diagnosing and fixing common errors.

Spring Integration with RabbitMQ

Basic Introduction

Spring AMQP is the Spring framework’s abstraction over AMQP (Advanced Message Queuing Protocol) and provides a unified programming model for AMQP messaging. Spring Rabbit is the concrete implementation of Spring AMQP for RabbitMQ: it wraps and extends the RabbitMQ client.


Based on Annotations

Writing Code

RabbitConfiguration. Note: make sure ConnectionFactory is imported from the correct package!

package icu.wzk.config;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RabbitConfiguration {

    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String VHOST = "/";
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "secret";

    /**
     * Spring AMQP's connection factory (used by both producer and consumer)
     * - Internally maintains connection and Channel cache
     * - Directly configure host/port/vhost/user/pass
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory ccf = new CachingConnectionFactory(HOST, PORT);
        ccf.setVirtualHost(VHOST);
        ccf.setUsername(USERNAME);
        ccf.setPassword(PASSWORD);

        // Optional: number of channels kept in the cache (by default not a hard limit on open channels)
        ccf.setChannelCacheSize(25);

        return ccf;
    }

    /** Auto-declare Queue/Exchange/Binding (as long as you declare them as Beans) */
    @Bean
    public AmqpAdmin amqpAdmin(ConnectionFactory factory) {
        return new RabbitAdmin(factory);
    }

    /** Entry point for sending and receiving messages */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory factory) {
        return new RabbitTemplate(factory);
    }

    /** Queue: durable=true is recommended so the queue survives a broker restart */
    @Bean
    public Queue queue() {
        return new Queue("myqueue", true, false, false);
    }
}

Note: ConnectionFactory must be imported as org.springframework.amqp.rabbit.connection.ConnectionFactory, not as the RabbitMQ Java client’s com.rabbitmq.client.ConnectionFactory.
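
The configuration above only declares a Queue, although RabbitAdmin can also declare exchanges and bindings that are registered as Beans. The following is a minimal sketch of that idea, assuming it is loaded into the same container as RabbitConfiguration. The class name RabbitTopologyConfiguration, the exchange name "myexchange", and the routing key "my.routing.key" are hypothetical and not part of the original example.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTopologyConfiguration {

    // Hypothetical names, chosen only for illustration
    private static final String EXCHANGE = "myexchange";
    private static final String ROUTING_KEY = "my.routing.key";

    /** Durable, non-auto-delete direct exchange */
    @Bean
    public DirectExchange exchange() {
        return new DirectExchange(EXCHANGE, true, false);
    }

    /** Binds the Queue Bean ("myqueue") to the exchange under ROUTING_KEY */
    @Bean
    public Binding binding(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}
```

With this in place, a message would be published with the three-argument form convertAndSend("myexchange", "my.routing.key", payload) instead of relying on the default exchange. The declarations are sent to the broker when a connection is opened, so nothing shows up in the management console until the application first talks to RabbitMQ.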


Demo Program Explanation

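A complete example of the Spring annotation (JavaConfig) approach to sending RabbitMQ messages and receiving them synchronously. The configuration class is a condensed variant of the one above, with an optional JSON message converter: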

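import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration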
public class RabbitConfiguration {

    // 1. Declare connection factory
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory factory = new CachingConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setUsername("guest");
        factory.setPassword("guest");
        return factory;
    }

    // 2. Declare RabbitTemplate
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory);
        // Optional: JSON message converter (requires Jackson on the classpath)
        template.setMessageConverter(new Jackson2JsonMessageConverter());
        return template;
    }

    // 3. Declare queue
    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }

    // 4. Declare RabbitAdmin (auto-create queue)
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        return new RabbitAdmin(connectionFactory);
    }
}

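import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

// Must be registered with the container (component scanning or explicit registration) for @Autowired to work
@Service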
public class MessageService {

    @Autowired
    private AmqpTemplate amqpTemplate;

    // Send message
    public void sendMessage(String message) {
        // Send via the default exchange (its name is the empty string)
        // routingKey="myqueue" routes the message directly to the queue with the same name
        amqpTemplate.convertAndSend("myqueue", message);
        System.out.println("Sent message: " + message);
    }

    // Synchronously receive message
    public String receiveMessage() {
        // Synchronously pull message, returns null if queue is empty
        String message = (String) amqpTemplate.receiveAndConvert("myqueue");
        System.out.println("Received message: " + message);
        return message;
    }
}
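
The comments in sendMessage rely on one rule: the default exchange delivers a message to the queue whose name equals the routing key. The sketch below is a toy in-memory model of that rule, written with plain JDK collections. It is not the RabbitMQ or Spring AMQP API, and the class and method names are hypothetical. It mainly shows why a misspelled routing key leads to a null result on the receiving side.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy model of default-exchange routing; not the RabbitMQ client API. */
public class DefaultExchangeModel {

    private final Map<String, Deque<String>> queues = new HashMap<>();

    public void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    /** Delivers to the queue whose name equals routingKey; returns false if no such queue exists. */
    public boolean publishToDefaultExchange(String routingKey, String body) {
        Deque<String> target = queues.get(routingKey);
        if (target == null) {
            return false; // unroutable: the message never reaches any queue
        }
        target.addLast(body);
        return true;
    }

    /** Non-blocking pull: returns null when the queue is empty. */
    public String pull(String queueName) {
        Deque<String> queue = queues.get(queueName);
        if (queue == null) {
            throw new IllegalArgumentException("no queue '" + queueName + "'");
        }
        return queue.pollFirst();
    }

    public static void main(String[] args) {
        DefaultExchangeModel broker = new DefaultExchangeModel();
        broker.declareQueue("myqueue");
        broker.publishToDefaultExchange("myqueue", "hello");
        broker.publishToDefaultExchange("myqueu", "lost"); // typo in the routing key
        System.out.println(broker.pull("myqueue")); // hello
        System.out.println(broker.pull("myqueue")); // null
    }
}
```

The second publish goes nowhere because no queue is named "myqueu", so the second pull finds the queue empty.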

Startup Program

package icu.wzk;

import icu.wzk.config.RabbitConfiguration;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class StartApp2 {

    private static final String QUEUE = "myqueue";

    public static void main(String[] args) {
        // 1) Start annotation-based Spring container, load RabbitConfiguration
        AnnotationConfigApplicationContext context =
                new AnnotationConfigApplicationContext(RabbitConfiguration.class);

        // 2) Get AMQP operation entry (usually RabbitTemplate)
        AmqpTemplate template = context.getBean(AmqpTemplate.class);

        // 3) Send message to queue (via default exchange)
        template.convertAndSend(QUEUE, "wzk");

        // 4) Synchronously pull one message from the queue; returns null if the queue is empty, the message was not delivered, or another consumer took it
        Object msg = template.receiveAndConvert(QUEUE);
        System.out.println(msg);

        // 5) Close container, release connection resources
        context.close();
    }
}
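
Step 4 can print null if the pull happens before the message is available, because the pull does not wait. One way to add wait semantics is a bounded polling loop. The sketch below uses only the JDK and a Supplier as a stand-in for the pull call. The class name PollingReceive and the helper pollUntil are hypothetical and not part of Spring AMQP.

```java
import java.time.Duration;
import java.util.Optional;
import java.util.function.Supplier;

public class PollingReceive {

    /**
     * Calls a non-blocking pull repeatedly until it yields a non-null value
     * or the timeout elapses. Returns Optional.empty() on timeout or interrupt.
     */
    public static <T> Optional<T> pollUntil(Supplier<T> pull, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            T value = pull.get();
            if (value != null) {
                return Optional.of(value);
            }
            if (System.nanoTime() - deadline >= 0) {
                return Optional.empty();
            }
            try {
                Thread.sleep(interval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return Optional.empty();
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for a pull such as template.receiveAndConvert(QUEUE):
        // yields null twice, then a message.
        int[] calls = {0};
        Supplier<Object> fakePull = () -> ++calls[0] < 3 ? null : "wzk";

        Optional<Object> msg = pollUntil(fakePull, Duration.ofSeconds(1), Duration.ofMillis(10));
        System.out.println(msg.orElse(null)); // wzk
    }
}
```

In the startup program the supplier would be () -> template.receiveAndConvert(QUEUE). AmqpTemplate also has a receiveAndConvert overload that takes a timeout in milliseconds, which may be simpler when the Spring AMQP version in use provides it.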

Error Quick Reference

Symptom | Root Cause | Diagnosis | Fix
--- | --- | --- | ---
Compiles, but type mismatch or Bean not found at runtime | ConnectionFactory imported from the wrong package (not org.springframework.amqp.rabbit.connection.ConnectionFactory) | In the IDE, navigate to the ConnectionFactory definition and confirm the package path | Replace it with Spring Rabbit’s ConnectionFactory; clean up dependency conflicts
ACCESS_REFUSED - Login was refused | Wrong username/password, or the account has no permission | RabbitMQ management console Users/Permissions; application logs | Fix the username/password; grant read/write permissions on the vhost
NOT_FOUND - no virtual host '/' or no permission | The vhost does not exist or the user is not authorized for it | Management console Vhosts/Permissions; confirm the value passed to setVirtualHost | Create the vhost; authorize the user for the vhost; point the application at the correct vhost
java.net.ConnectException: Connection refused | Broker not started, port unreachable, or wrong address | telnet/nc to host:port; check container network/firewall | Start RabbitMQ; open the port; fix host/port
Queue not created after startup | RabbitAdmin/AmqpAdmin Bean not registered, or the container did not load the config class | Spring container Bean list; check the startup logs for RabbitAdmin initialization | Ensure RabbitAdmin/AmqpAdmin is a Bean and uses the same ConnectionFactory; ensure the config class is loaded
404 NOT_FOUND - no queue 'myqueue' | Queue not declared, declared in a different vhost, or named inconsistently | Management console Queues; verify that the vhost and queue name match | Use one queue name everywhere; ensure the queue is auto-created or manually created in the same vhost
Send succeeds but receiveAndConvert prints null | Queue empty, message taken by another consumer, routing key mismatch, or sent to the wrong vhost | Management console queue Ready/Unacked counts; check for consumers; verify the routingKey | Stop other consumers first; when using the default exchange, make sure routingKey equals the queue name; verify the vhost
MessageConversionException / deserialization failed | Sender and receiver use inconsistent MessageConverters (String and JSON mixed) | Check the RabbitTemplate’s messageConverter; check the message content-type | Use one converter on both sides: the default for String, or Jackson2JsonMessageConverter end to end for JSON with a consistent object type
Queue/binding lost after Broker restart | Queue/exchange not durable (durable=false) or declared auto-delete | Management console queue Durable flag; declaration code | durable=true is recommended in production; understand the autoDelete/exclusive semantics and set them as needed
receiveAndConvert is expected to block but returns immediately | receiveAndConvert is a non-blocking pull by default and returns null immediately when no message is available | Observe the call duration and the return value | For blocking or asynchronous consumption, switch to @RabbitListener; or poll with a timeout to get wait semantics (not recommended under high concurrency)
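
For the "Connection refused" row, the telnet/nc check can also be done from Java, which is handy when those tools are missing on the machine running the application. This is a minimal sketch using only the JDK. The class name BrokerPortCheck and the 2-second timeout are assumptions. It only tests TCP reachability and says nothing about credentials or vhosts.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isReachable(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers connection refused, timeouts, and unresolved hosts
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults match the HOST/PORT values used in RabbitConfiguration
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 5672;
        System.out.println(host + ":" + port + " reachable = " + isReachable(host, port, 2000));
    }
}
```

If this reports false, the problem is on the network or broker side (broker not started, port closed, wrong host) rather than in the Spring configuration.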