TL;DR
- Scenario: Multiple consumers share tasks from same queue + one message broadcast to multiple subscribers
- Conclusion: Work Queue relies on manual ack + basicQos to control distribution; fanout relies on exchange binding and temporary queues to achieve “one-to-many”
- Output: Java producer/consumer code skeleton, unnamed exchange usage, temporary queue and binding verification path
RabbitMQ Work Modes
Work Queue
Producer sends messages, starts multiple consumer instances to consume messages, each consumer only consumes part of the information, achieving load balancing effect.
NewTask
package icu.wzk.demo;
public class TestTask {
private static final String HOST = "localhost";
private static final String VIRTUAL_HOST = "/";
private static final String USERNAME = "admin";
private static final String PASSWORD = "secret";
private static final int PORT = 5672;
private static final String QUEUE_NAME = "wzk-icu";
private static final String[] WORKS = {
"hello.",
"hello..",
"hello...",
"hello....",
"hello.....",
"hello......",
"hello........",
"hello.........",
"hello..........",
"hello..........."
};
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setVirtualHost(VIRTUAL_HOST);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setPort(PORT);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String exchange = "";
for (String work : WORKS) {
channel.basicPublish(
exchange,
QUEUE_NAME,
null,
work.getBytes(StandardCharsets.UTF_8)
);
System.out.println(" [x] Sent '" + work + "'");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
TestTask2
package icu.wzk.demo;
public class TestTask2 {
private static final String TASK_QUEUE_NAME = "wzk-icu";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("secret");
// false = manual ack (recommended with basicQos)
boolean autoAck = false;
try (Connection conn = factory.newConnection(); Channel channel = conn.createChannel()) {
// Only pull 1 message at a time, avoid flooding consumer
channel.basicQos(1);
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String task = new String(delivery.getBody(), StandardCharsets.UTF_8);
long tag = delivery.getEnvelope().getDeliveryTag();
System.out.println(" [x] Received '" + task + "'");
try {
doWork(task);
System.out.println(" [x] Done");
// Manual acknowledge
channel.basicAck(tag, false);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// Requeue on interrupt
channel.basicNack(tag, false, true);
} catch (Exception e) {
// Requeue on failure (change to false to discard if needed)
channel.basicNack(tag, false, true);
}
};
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
// Block main thread, keep process alive
synchronized (TestTask2.class) {
try {
TestTask2.class.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
private static void doWork(String task) throws InterruptedException {
System.out.println("task = " + task);
for (char ch : task.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
}
}
Publish/Subscribe
In RabbitMQ message queue system, fanout type exchange is an important message distribution mechanism. This exchange works like broadcast mode with the following characteristics:
-
Message Distribution Mechanism:
- When producer sends message to Exchange, it completely ignores RoutingKey setting
- Exchange copies and pushes every received message to all queues bound to that Exchange
- Each consumer queue receives complete message copy
-
Queue Binding Process:
- When each consumer subscribes, RabbitMQ automatically creates an exclusive queue for it
- This newly created queue establishes binding relationship with specified Exchange
- Binding process doesn’t involve any routing rules or filter conditions
-
Typical Application Scenarios:
- Real-time notification systems (e.g., stock quote push)
- Log collection systems (multiple log processing services simultaneously receive logs)
- Event broadcast systems (order creation event notifies multiple microservices)
Publish/subscribe uses fanout, creating exchange called logs:
channel.exchangeDeclare("logs", "fanout");
Fanout exchange is simple, from the name you can see it’s like blowing with a fan, sending received messages to all queues it knows.
Unnamed Exchange
In previous examples, although we didn’t specify an exchange, we could still send messages to queue because we used the default exchange.
channel.basicPublish("", "hello", null, message.getBytes());
First parameter is exchange name, empty string directly uses RoutingKey to send message to queue.
If we want to send message to specified exchange:
channel.basicPublish("logs", "", null, message.getBytes());
Temporary Queues
Both producers and consumers send and receive messages from queues by queue name. When using RabbitMQ, the following points need attention when creating queues:
-
Queue Creation Mechanism:
- When connecting to RabbitMQ, usually need to create a new empty queue
- Queue naming has two options:
- Custom naming: Can specify a meaningful queue name
- Auto-generated: Let RabbitMQ server generate random queue name
-
Queue Lifecycle Management:
- Temporary queue characteristic: When declaring queue, set “exclusive” parameter to true, queue becomes exclusive queue
- Auto-delete mechanism: For exclusive queues, once consumer disconnects, RabbitMQ automatically deletes that queue
- Persistent queue: If queue needs to be retained long-term, set “durable” parameter to true
String queueName = channel.queueDeclare().getQueue();
The above code declares a non-persistent, exclusive, auto-deleted queue, with server randomly generating the name.
Binding
After creating message queue and fanout type exchange, we need to bind them so the exchange sends messages to that queue. Fanout exchange broadcasts all received messages to all queues bound to it.
channel.queueBind(queueName, "logs", "");
Meaning of parameters in this code:
queueName: Queue name to bindlogs: Exchange name"": Routing key, not used in fanout type, so empty string
Error Quick Reference
| Symptom | Root Cause | Fix |
|---|---|---|
| Multiple consumers but uneven distribution, one consumer “overloaded” | basicQos not set or prefetch too large | Check consumer-side throughput and unacknowledged count (unacked) channel.basicQos(1..N); tune N after manual ack |
| Messages lost after consumer exception | autoAck=true or ack too early | Check basicConsume autoAck parameter, ack timing in logs Set autoAck=false; only basicAck after business completes |
| Failed messages retry infinitely, queue “spinning” | basicNack(..., requeue=true) for unrecoverable errors also requeue | Observe same message appearing repeatedly without backoff Distinguish recoverable/unrecoverable: unrecoverable requeue=false or go DLX/retry queue |
| Queue/messages disappear after restart | Queue non-durable + message non-durable | Check queueDeclare(durable=false), publish properties Production: durable queue + persistent message |
| Pub/sub consumer queue disappears after disconnect | Expected behavior of temporary queue exclusive/auto-delete | For persistent subscription, change to named queue + durable; keep temporary for temporary subscriptions |
| fanout setting routingKey has no effect | fanout ignores routingKey | Check exchange_type=fanout; if need key-based routing, change to direct/topic; fanout keep routingKey empty |
| After sending to exchange consumer can’t receive | No queueBind or bound to wrong exchange | rabbitmqctl list_bindings to check if binding exists Confirm queueBind(queueName,"logs","") matches exchange name |
| Using default exchange send fails (no route) | routingKey (queue name) doesn’t exist or misspelled | Compare basicPublish routingKey with actual queue name First ensure queueDeclare creates target queue |
| Consumer process exits causing consumption to stop | try-with-resources ends or main thread not blocked | Keep process alive; or use more standard lifecycle management |
| Connection failed/permission error | vhost/username/password/permissions mismatch | RabbitMQ logs and connection exception stack Validate vhost exists, user has configure/write/read permissions on vhost |