TL;DR
- Scenario: Payment and recharge flows that need eventual consistency. RabbitMQ decouples the steps asynchronously, but every order must stay traceable and none may be lost.
- Conclusion: Publisher confirms answer “did the broker receive the message?”, mandatory plus a return listener makes routing failures visible, and persistence plus idempotency are the backstop for crashes, retries, and duplicates.
Message Reliability Overview
When you pay with Alipay or WeChat Pay, you scan a code and see the result almost immediately. Behind that, the payment platform must guarantee data correctness, safety under concurrent access, and eventual consistency. Two common ways to get there are a distributed lock and a message queue.
Distributed Lock
- Pros: Can guarantee strong data consistency.
- Cons: May have performance issues under high concurrency.
Message Queue
- Pros: Asynchronous processing; handles high concurrency.
- Cons: Adds latency and gives only weak (eventual) consistency. The downstream business operation must eventually succeed, so a failure has to be retried or compensated rather than dropped.
How to Guarantee Message Reliability
- Exception handling in client code, including producer and consumer
- AMQP transaction mechanism (supported by RabbitMQ)
- Producer confirmation mechanism
- Message persistence mechanism
- Broker-side high availability cluster
- Consumer acknowledgment mechanism
- Consumer rate limiting
- Message idempotency
Send Confirmation Mechanism
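RabbitMQ provides a lightweight mechanism called publisher confirms (sender confirmation). Once the producer puts a channel into confirm mode, every message published on that channel is assigned a sequence number that is unique within the channel and starts from 1. After the message has been routed to all matching queues (and, for persistent messages in durable queues, written to disk), RabbitMQ sends the producer a Basic.Ack carrying that sequence number. If the broker cannot take responsibility for the message, it sends a Basic.Nack instead. Note that a message that cannot be routed to any queue is still acked, which is why mandatory and a return listener are needed to detect routing failures.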
Synchronous Publisher Confirms Example
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class PublisherConfirmsDemo {
    private static final String EXCHANGE = "ex.publisher.confirms";
    private static final String QUEUE = "q.publisher.confirms";
    private static final String ROUTING_KEY = QUEUE;

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("node1");
        factory.setVirtualHost("/");
        factory.setUsername("root");
        factory.setPassword("123456");
        factory.setPort(5672);

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 1) Enable publisher confirms (must be called before publishing)
            channel.confirmSelect();

            // 2) Declare topology (non-durable in this demo; see Persistence Mechanism)
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, false, false, null);
            channel.queueDeclare(QUEUE, false, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

            // 3) Register a return listener: the broker returns mandatory messages it cannot route
            channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
                String returned = new String(body, StandardCharsets.UTF_8);
                System.err.printf("Return: code=%d text=%s ex=%s rk=%s body=%s%n",
                        replyCode, replyText, exchange, routingKey, returned);
            });

            String message = "hello";

            // 4) Publish with mandatory=true so that a routing failure triggers a Return
            channel.basicPublish(
                    EXCHANGE,
                    ROUTING_KEY,
                    true,  // mandatory
                    null,  // props: can set deliveryMode (persistent), headers, etc.
                    message.getBytes(StandardCharsets.UTF_8)
            );

            // 5) Block until the broker confirms, or fail after 5 seconds
            try {
                channel.waitForConfirmsOrDie(5_000);
                System.out.println("Confirm ACK: message = " + message);
            } catch (TimeoutException e) {
                System.out.println("Confirm timeout: message = " + message);
                throw e;
            } catch (IOException e) {
                System.out.println("Confirm NACK/channel exception: message = " + message);
                throw e;
            }
        }
    }
}
Batch Publisher Confirms
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class BatchPublisherConfirmsDemo {
    private static final String EXCHANGE = "ex.publisher.confirms";
    private static final String QUEUE = "q.publisher.confirms";
    private static final String ROUTING_KEY = "q.publisher.confirms";

    public static void publishBatch(ConnectionFactory factory) throws Exception {
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.confirmSelect();
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, false, false, null);
            channel.queueDeclare(QUEUE, false, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

            final int batchSize = 10;      // messages per confirmation round
            final int totalMessages = 102; // total messages to send
            int outstanding = 0;           // published in the current batch, not yet confirmed

            for (int i = 0; i < totalMessages; i++) {
                String body = "hello-" + i;
                channel.basicPublish(
                        EXCHANGE,
                        ROUTING_KEY,
                        null,
                        body.getBytes(StandardCharsets.UTF_8)
                );
                outstanding++;

                // Batch is full: block until the broker has confirmed every message in it.
                // A failure here only says that something in the batch failed, not which message.
                if (outstanding == batchSize) {
                    channel.waitForConfirmsOrDie(5_000);
                    System.out.println("Batch confirm: size=" + batchSize);
                    outstanding = 0;
                }
            }

            // Confirm the final partial batch, if any
            if (outstanding > 0) {
                channel.waitForConfirmsOrDie(5_000);
                System.out.println("Batch confirm: tailSize=" + outstanding);
            }
        }
    }
}
Async Publisher Confirms
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class AsyncPublisherConfirmsDemo {
    private static final String EXCHANGE = "ex.publisher.confirms";
    private static final String QUEUE = "q.publisher.confirms";
    private static final String ROUTING_KEY = "q.publisher.confirms";

    public static void publishAsync(ConnectionFactory factory) throws Exception {
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.confirmSelect();
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, false, false, null);
            channel.queueDeclare(QUEUE, false, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

            // Unconfirmed messages: seqNo -> payload
            ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

            // ACK callback: drop confirmed messages (multiple=true covers everything <= seqNo)
            ConfirmCallback ackCallback = (seqNo, multiple) -> {
                if (multiple) {
                    outstanding.headMap(seqNo, true).clear();
                } else {
                    outstanding.remove(seqNo);
                }
            };

            // NACK callback: log the failed messages, then drop them
            ConfirmCallback nackCallback = (seqNo, multiple) -> {
                if (multiple) {
                    ConcurrentNavigableMap<Long, String> head = outstanding.headMap(seqNo, true);
                    head.forEach((k, v) -> System.err.println("NACK: seqNo=" + k + " msg=" + v));
                    head.clear();
                } else {
                    String msg = outstanding.remove(seqNo);
                    System.err.println("NACK: seqNo=" + seqNo + " msg=" + msg);
                }
            };

            // Register the confirm listener (callbacks run asynchronously)
            channel.addConfirmListener(ackCallback, nackCallback);

            // High-speed sending: get seqNo, record the payload, then publish.
            // Recording before publishing ensures a fast ACK cannot arrive before the entry exists.
            String prefix = "hello-";
            int total = 1000;
            for (int i = 0; i < total; i++) {
                String payload = prefix + i;
                long seqNo = channel.getNextPublishSeqNo();
                outstanding.put(seqNo, payload);
                channel.basicPublish(
                        EXCHANGE,
                        ROUTING_KEY,
                        null,
                        payload.getBytes(StandardCharsets.UTF_8)
                );
            }

            // Wait up to 60 seconds for all confirms; closing the channel earlier would drop them
            long deadline = System.currentTimeMillis() + 60_000;
            while (!outstanding.isEmpty() && System.currentTimeMillis() < deadline) {
                Thread.sleep(100);
            }
            if (!outstanding.isEmpty()) {
                System.err.println("Still unconfirmed after timeout: " + outstanding.size());
            }
        }
    }
}
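The bookkeeping inside the ACK and NACK callbacks can be tried out without a broker. The following is a minimal sketch, assuming a hypothetical `ConfirmTracker` class (not part of the RabbitMQ client) that reproduces only the map handling: `multiple=true` settles every sequence number up to and including `seqNo`, and nacked payloads are handed back so the caller can republish or compensate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: tracks published-but-unconfirmed payloads by sequence number. */
final class ConfirmTracker {
    private final ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

    /** Record a payload before it is published. */
    void track(long seqNo, String payload) {
        outstanding.put(seqNo, payload);
    }

    /** Broker acked: drop the confirmed entries. */
    void onAck(long seqNo, boolean multiple) {
        if (multiple) {
            outstanding.headMap(seqNo, true).clear(); // every entry with key <= seqNo
        } else {
            outstanding.remove(seqNo);
        }
    }

    /** Broker nacked: drop the entries and return their payloads for republishing. */
    List<String> onNack(long seqNo, boolean multiple) {
        List<String> failed = new ArrayList<>();
        if (multiple) {
            ConcurrentNavigableMap<Long, String> head = outstanding.headMap(seqNo, true);
            failed.addAll(head.values()); // ascending seqNo order
            head.clear();
        } else {
            String payload = outstanding.remove(seqNo);
            if (payload != null) {
                failed.add(payload);
            }
        }
        return failed;
    }

    /** Number of messages still waiting for a confirm. */
    int pending() {
        return outstanding.size();
    }

    public static void main(String[] args) {
        ConfirmTracker tracker = new ConfirmTracker();
        for (long seq = 1; seq <= 5; seq++) {
            tracker.track(seq, "hello-" + seq);
        }
        tracker.onAck(3, true);                         // confirms 1, 2 and 3 at once
        List<String> failed = tracker.onNack(4, false); // only 4 was rejected
        System.out.println("failed=" + failed + " pending=" + tracker.pending());
        // prints: failed=[hello-4] pending=1
    }
}
```

In a real producer the two methods would be called from the `ConfirmCallback` lambdas, and the returned list would feed whatever retry or compensation path the business requires.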
Persistence Mechanism
Persistence is the foundation of RabbitMQ reliability. Durability has to be configured at three levels, and all three are needed for a message to survive a broker restart:
- Exchange persistence: set the durable parameter to true when declaring the exchange
- Queue persistence: set the durable parameter to true when declaring the queue
- Message persistence: set the deliveryMode property in BasicProperties to 2
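The three settings above can be sketched with the RabbitMQ Java client as follows. The class name and the exchange and queue names are hypothetical and chosen only for this sketch; new names are used deliberately, because redeclaring an existing non-durable exchange or queue with durable=true is rejected by the broker. The method expects an already open `Channel`.

```java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

final class DurablePublishSketch {
    // Hypothetical names, used only in this sketch
    private static final String EXCHANGE = "ex.durable.demo";
    private static final String QUEUE = "q.durable.demo";

    static void declareAndPublish(Channel channel, String body) throws IOException {
        // Exchange persistence: durable=true
        channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, true);
        // Queue persistence: durable=true, exclusive=false, autoDelete=false
        channel.queueDeclare(QUEUE, true, false, false, null);
        channel.queueBind(QUEUE, EXCHANGE, QUEUE);
        // Message persistence: PERSISTENT_TEXT_PLAIN carries deliveryMode=2
        channel.basicPublish(
                EXCHANGE,
                QUEUE,
                true, // mandatory, so routing failures are returned
                MessageProperties.PERSISTENT_TEXT_PLAIN,
                body.getBytes(StandardCharsets.UTF_8)
        );
    }
}
```

Combining this with publisher confirms matters: a persistent message is only known to be safe once the broker has confirmed it.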
Error Quick Reference
| Symptom | Root Cause | Fix |
|---|---|---|
| “Sent successfully”, but the queue has no message and no error is reported | mandatory not enabled, so the message is silently discarded when it cannot be routed to a queue | basicPublish with mandatory=true plus a ReturnListener |
| waitForConfirmsOrDie throws TimeoutException | Broker under load, network jitter, or slow disk I/O delays the confirmation | Increase the timeout; switch to async confirms |
| waitForConfirmsOrDie throws IOException | Broker returned a Nack, or the channel was closed | Retry or compensate the affected messages |
| Batch confirm fails and the failing message cannot be identified | Batch confirm only reports a result for the whole batch | Republish the whole batch, or switch to async confirms |
| Messages lost after a broker restart or crash | Exchange or queue not durable, or message not persistent | Exchange durable=true, queue durable=true, message deliveryMode=2 |
| Consumer processes a message twice, e.g. balance credited twice | Redelivery under at-least-once semantics without idempotency | Idempotency on a business unique key (DB unique constraint, deduplication table, or state machine) |
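The idempotency fix in the last row can be illustrated with a minimal in-memory sketch. Everything here is an assumption made for illustration: `IdempotentRechargeHandler`, the `orderId` key, and the in-memory set, which stands in for a deduplication table with a unique constraint.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical recharge handler that applies each order at most once. */
final class IdempotentRechargeHandler {
    // Stand-in for a deduplication table with a unique constraint on orderId
    private final Set<String> processedOrders = ConcurrentHashMap.newKeySet();
    private final Map<String, Long> balances = new ConcurrentHashMap<>();

    /** Returns true if the recharge was applied, false if the order was a duplicate. */
    boolean handle(String orderId, String account, long amountCents) {
        // add() is atomic: only the first delivery of a given orderId gets true
        if (!processedOrders.add(orderId)) {
            return false; // duplicate delivery: balance is left untouched
        }
        balances.merge(account, amountCents, Long::sum);
        return true;
    }

    long balanceOf(String account) {
        return balances.getOrDefault(account, 0L);
    }

    public static void main(String[] args) {
        IdempotentRechargeHandler handler = new IdempotentRechargeHandler();
        handler.handle("order-1001", "alice", 5_000);
        handler.handle("order-1001", "alice", 5_000); // redelivery of the same message
        System.out.println(handler.balanceOf("alice")); // prints 5000
    }
}
```

With a real database, the deduplication insert and the balance update would need to run in the same transaction, so that a crash between the two steps cannot leave an order marked as processed but not credited.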
Summary
- Confirm mechanism: answers “did the broker receive the message?”
- mandatory + Return: makes routing failures visible
- Persistence: prevents message loss when the broker crashes or restarts
- Idempotency: makes redelivery and duplicate consumption harmless