TL;DR

  • Scenario: Multiple consumers share tasks from same queue + one message broadcast to multiple subscribers
  • Conclusion: Work Queue relies on manual ack + basicQos to control distribution; fanout relies on exchange binding and temporary queues to achieve “one-to-many”
  • Output: Java producer/consumer code skeleton, unnamed exchange usage, temporary queue and binding verification path

RabbitMQ Work Modes

Work Queue

Producer sends messages, starts multiple consumer instances to consume messages, each consumer only consumes part of the information, achieving load balancing effect.

NewTask

package icu.wzk.demo;
public class TestTask {

    private static final String HOST = "localhost";
    private static final String VIRTUAL_HOST = "/";
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "secret";
    private static final int PORT = 5672;

    private static final String QUEUE_NAME = "wzk-icu";

    private static final String[] WORKS = {
            "hello.",
            "hello..",
            "hello...",
            "hello....",
            "hello.....",
            "hello......",
            "hello........",
            "hello.........",
            "hello..........",
            "hello..........."
    };

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setVirtualHost(VIRTUAL_HOST);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);
        factory.setPort(PORT);

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String exchange = "";
            for (String work : WORKS) {
                channel.basicPublish(
                        exchange,
                        QUEUE_NAME,
                        null,
                        work.getBytes(StandardCharsets.UTF_8)
                );
                System.out.println(" [x] Sent '" + work + "'");
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

TestTask2

package icu.wzk.demo;
public class TestTask2 {

    private static final String TASK_QUEUE_NAME = "wzk-icu";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("secret");

        // false = manual ack (recommended with basicQos)
        boolean autoAck = false;

        try (Connection conn = factory.newConnection(); Channel channel = conn.createChannel()) {
            // Only pull 1 message at a time, avoid flooding consumer
            channel.basicQos(1);
            channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);

            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String task = new String(delivery.getBody(), StandardCharsets.UTF_8);
                long tag = delivery.getEnvelope().getDeliveryTag();
                System.out.println(" [x] Received '" + task + "'");
                try {
                    doWork(task);
                    System.out.println(" [x] Done");
                    // Manual acknowledge
                    channel.basicAck(tag, false);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    // Requeue on interrupt
                    channel.basicNack(tag, false, true);
                } catch (Exception e) {
                    // Requeue on failure (change to false to discard if needed)
                    channel.basicNack(tag, false, true);
                }
            };

            channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
            // Block main thread, keep process alive
            synchronized (TestTask2.class) {
                try {
                    TestTask2.class.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    private static void doWork(String task) throws InterruptedException {
        System.out.println("task = " + task);
        for (char ch : task.toCharArray()) {
            if (ch == '.') {
                Thread.sleep(1000);
            }
        }
    }

}

Publish/Subscribe

In RabbitMQ message queue system, fanout type exchange is an important message distribution mechanism. This exchange works like broadcast mode with the following characteristics:

  1. Message Distribution Mechanism:

    • When producer sends message to Exchange, it completely ignores RoutingKey setting
    • Exchange copies and pushes every received message to all queues bound to that Exchange
    • Each consumer queue receives complete message copy
  2. Queue Binding Process:

    • When each consumer subscribes, RabbitMQ automatically creates an exclusive queue for it
    • This newly created queue establishes binding relationship with specified Exchange
    • Binding process doesn’t involve any routing rules or filter conditions
  3. Typical Application Scenarios:

    • Real-time notification systems (e.g., stock quote push)
    • Log collection systems (multiple log processing services simultaneously receive logs)
    • Event broadcast systems (order creation event notifies multiple microservices)

Publish/subscribe uses fanout, creating exchange called logs:

channel.exchangeDeclare("logs", "fanout");

Fanout exchange is simple, from the name you can see it’s like blowing with a fan, sending received messages to all queues it knows.

Unnamed Exchange

In previous examples, although we didn’t specify an exchange, we could still send messages to queue because we used the default exchange.

channel.basicPublish("", "hello", null, message.getBytes());

First parameter is exchange name, empty string directly uses RoutingKey to send message to queue.

If we want to send message to specified exchange:

channel.basicPublish("logs", "", null, message.getBytes());

Temporary Queues

Both producers and consumers send and receive messages from queues by queue name. When using RabbitMQ, the following points need attention when creating queues:

  1. Queue Creation Mechanism:

    • When connecting to RabbitMQ, usually need to create a new empty queue
    • Queue naming has two options:
      • Custom naming: Can specify a meaningful queue name
      • Auto-generated: Let RabbitMQ server generate random queue name
  2. Queue Lifecycle Management:

    • Temporary queue characteristic: When declaring queue, set “exclusive” parameter to true, queue becomes exclusive queue
    • Auto-delete mechanism: For exclusive queues, once consumer disconnects, RabbitMQ automatically deletes that queue
    • Persistent queue: If queue needs to be retained long-term, set “durable” parameter to true
String queueName = channel.queueDeclare().getQueue();

The above code declares a non-persistent, exclusive, auto-deleted queue, with server randomly generating the name.

Binding

After creating message queue and fanout type exchange, we need to bind them so the exchange sends messages to that queue. Fanout exchange broadcasts all received messages to all queues bound to it.

channel.queueBind(queueName, "logs", "");

Meaning of parameters in this code:

  • queueName: Queue name to bind
  • logs: Exchange name
  • "": Routing key, not used in fanout type, so empty string

Error Quick Reference

SymptomRoot CauseFix
Multiple consumers but uneven distribution, one consumer “overloaded”basicQos not set or prefetch too largeCheck consumer-side throughput and unacknowledged count (unacked) channel.basicQos(1..N); tune N after manual ack
Messages lost after consumer exceptionautoAck=true or ack too earlyCheck basicConsume autoAck parameter, ack timing in logs Set autoAck=false; only basicAck after business completes
Failed messages retry infinitely, queue “spinning”basicNack(..., requeue=true) for unrecoverable errors also requeueObserve same message appearing repeatedly without backoff Distinguish recoverable/unrecoverable: unrecoverable requeue=false or go DLX/retry queue
Queue/messages disappear after restartQueue non-durable + message non-durableCheck queueDeclare(durable=false), publish properties Production: durable queue + persistent message
Pub/sub consumer queue disappears after disconnectExpected behavior of temporary queue exclusive/auto-deleteFor persistent subscription, change to named queue + durable; keep temporary for temporary subscriptions
fanout setting routingKey has no effectfanout ignores routingKeyCheck exchange_type=fanout; if need key-based routing, change to direct/topic; fanout keep routingKey empty
After sending to exchange consumer can’t receiveNo queueBind or bound to wrong exchangerabbitmqctl list_bindings to check if binding exists Confirm queueBind(queueName,"logs","") matches exchange name
Using default exchange send fails (no route)routingKey (queue name) doesn’t exist or misspelledCompare basicPublish routingKey with actual queue name First ensure queueDeclare creates target queue
Consumer process exits causing consumption to stoptry-with-resources ends or main thread not blockedKeep process alive; or use more standard lifecycle management
Connection failed/permission errorvhost/username/password/permissions mismatchRabbitMQ logs and connection exception stack Validate vhost exists, user has configure/write/read permissions on vhost