TL;DR

  • Scenario: Payment/recharge flows that need eventual consistency and use RabbitMQ for asynchronous decoupling, yet must stay auditable and never lose an order.
  • Conclusion: Publisher confirms tell you “the Broker received it”, mandatory + Return makes routing failures visible, and persistence + idempotency are the safety net for crashes, retries, and duplicates.

Message Reliability Overview

When you pay or transfer with Alipay or WeChat, you scan a QR code and get the result almost immediately. Behind that, the payment platform must guarantee data correctness, safety under concurrent access, and eventual consistency.

Distributed Lock

  • Pros: Can guarantee strong data consistency.
  • Cons: Lock contention becomes a throughput bottleneck under high concurrency.

Message Queue

  • Pros: Asynchronous processing that absorbs high concurrency.
  • Cons: Adds latency and only provides eventual (weak) consistency, so the downstream business operation must be guaranteed to succeed eventually; it cannot simply fail.

How to Guarantee Message Reliability

  1. Exception handling in client code, including producer and consumer
  2. AMQP/RabbitMQ transaction mechanism
  3. Producer confirmation mechanism
  4. Message persistence mechanism
  5. Broker-side high availability cluster
  6. Consumer acknowledgment mechanism
  7. Consumer rate limiting
  8. Message idempotency

Send Confirmation Mechanism

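RabbitMQ provides a lightweight alternative to transactions called publisher confirms. Once the producer puts a channel into confirm mode (channel.confirmSelect()), every message published on that channel is assigned a unique, increasing sequence number starting from 1. After the message has been delivered to all matching queues (for persistent messages routed to durable queues, after it has been written to disk), RabbitMQ sends Basic.Ack to the producer carrying that sequence number; if an internal error prevents the broker from handling the message, it sends Basic.Nack instead. Note that an unroutable message is also acknowledged, as soon as the exchange finds no matching queue, which is why mandatory is needed to detect routing failures.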
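The numbering rule above can be modeled without a broker. The sketch below is a hypothetical in-memory ledger (ConfirmLedger is not part of the RabbitMQ client): it hands out sequence numbers from 1 the way a confirm-mode channel does, and applies Basic.Ack/Basic.Nack, including the multiple flag with which the broker may confirm every tag up to and including a given one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, single-threaded model of per-channel confirm bookkeeping (no broker involved).
public class ConfirmLedger {
    private long nextSeqNo = 1;                                     // sequence numbers start at 1 per channel
    private final NavigableMap<Long, String> unconfirmed = new TreeMap<>();

    // Every publish takes the next sequence number and is remembered until confirmed.
    public long publish(String payload) {
        long seqNo = nextSeqNo++;
        unconfirmed.put(seqNo, payload);
        return seqNo;
    }

    // Basic.Ack: multiple=true confirms every tag up to and including seqNo.
    public void ack(long seqNo, boolean multiple) {
        if (multiple) {
            unconfirmed.headMap(seqNo, true).clear();
        } else {
            unconfirmed.remove(seqNo);
        }
    }

    // Basic.Nack: same range rules; returns the payloads that need retry or compensation.
    public List<String> nack(long seqNo, boolean multiple) {
        List<String> failed = new ArrayList<>();
        if (multiple) {
            NavigableMap<Long, String> head = unconfirmed.headMap(seqNo, true);
            failed.addAll(head.values());
            head.clear();
        } else {
            String payload = unconfirmed.remove(seqNo);
            if (payload != null) {
                failed.add(payload);
            }
        }
        return failed;
    }

    public int pending() {
        return unconfirmed.size();
    }

    public static void main(String[] args) {
        ConfirmLedger ledger = new ConfirmLedger();
        ledger.publish("a");                        // seqNo 1
        ledger.publish("b");                        // seqNo 2
        ledger.publish("c");                        // seqNo 3
        ledger.ack(2, true);                        // confirms 1 and 2 at once
        System.out.println(ledger.pending());       // 1
        System.out.println(ledger.nack(3, false));  // [c]
    }
}
```

The async example later in this section applies the same idea with a ConcurrentSkipListMap, because there the callbacks run on the client's I/O thread.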

Synchronous Publisher Confirms Example

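import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class PublisherConfirmsDemo {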

    private static final String EXCHANGE = "ex.publisher.confirms";
    private static final String QUEUE = "q.publisher.confirms";
    private static final String ROUTING_KEY = QUEUE;

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("node1");
        factory.setVirtualHost("/");
        factory.setUsername("root");
        factory.setPassword("123456");
        factory.setPort(5672);

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 1) Enable publisher confirms (must call before publish)
            channel.confirmSelect();

            // 2) Declare topology
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, false, false, null);
            channel.queueDeclare(QUEUE, false, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

            // 3) Register a ReturnListener: with mandatory=true, unroutable messages come back here
            channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
                String returned = new String(body, StandardCharsets.UTF_8);
                System.err.printf("Return: code=%d text=%s ex=%s rk=%s body=%s%n",
                        replyCode, replyText, exchange, routingKey, returned);
            });

            String message = "hello";

            // 4) Publish message (mandatory=true: a routing failure triggers the ReturnListener)
            channel.basicPublish(
                    EXCHANGE,
                    ROUTING_KEY,
                    true,   // mandatory
                    null,   // props: can set deliveryMode(persistent), headers, etc.
                    message.getBytes(StandardCharsets.UTF_8)
            );

            // 5) Synchronously wait for the confirm (an unroutable mandatory message is Returned first, then still ACKed)
            try {
                channel.waitForConfirmsOrDie(5_000);
                System.out.println("Confirm ACK: message = " + message);
            } catch (TimeoutException e) {
                System.out.println("Confirm timeout: message = " + message);
                throw e;
            } catch (IOException e) {
                System.out.println("Confirm NACK/channel exception: message = " + message);
                throw e;
            }
        }
    }
}
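A synchronous confirm that times out or is Nacked usually leads to a retry. A minimal sketch, assuming a hypothetical ConfirmRetry helper that wraps any “publish + waitForConfirmsOrDie” step in a bounded retry with linear backoff. Since waitForConfirmsOrDie closes the channel when it fails, a real step would have to publish on a fresh channel for each attempt. A timeout does not prove the Broker dropped the message, so retries can create duplicates that the consumer must absorb through idempotency.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: retries a "publish + wait for confirm" step a bounded number of times.
public class ConfirmRetry {

    // Returns the attempt number that succeeded; rethrows the last failure if every attempt fails.
    public static int publishWithRetry(Callable<Void> publishAndConfirm,
                                       int maxAttempts,
                                       long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                // In real code: open a channel, confirmSelect(), basicPublish(...), waitForConfirmsOrDie(5_000)
                publishAndConfirm.call();
                return attempt;
            } catch (Exception e) {                      // timeout, Nack, or closed channel
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt); // linear backoff between attempts
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fake step: fails twice with a simulated timeout, then succeeds.
        int attempt = publishWithRetry(() -> {
            if (++calls[0] < 3) {
                throw new TimeoutException("simulated confirm timeout");
            }
            return null;
        }, 5, 10);
        System.out.println("confirmed on attempt " + attempt); // confirmed on attempt 3
    }
}
```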

Batch Publisher Confirms

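import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class BatchPublisherConfirmsDemo {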

    private static final String EXCHANGE = "ex.publisher.confirms";
    private static final String QUEUE = "q.publisher.confirms";
    private static final String ROUTING_KEY = "q.publisher.confirms";

    public static void publishBatch(ConnectionFactory factory) throws Exception {
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.confirmSelect();

            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, false, false, null);
            channel.queueDeclare(QUEUE, false, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

            final int batchSize = 10;           // How many to confirm per batch
            final int totalMessages = 102;        // Total messages

            int outstanding = 0;                // Current batch sent but unconfirmed count

            for (int i = 0; i < totalMessages; i++) {
                String body = "hello-" + i;

                channel.basicPublish(
                        EXCHANGE,
                        ROUTING_KEY,
                        null,
                        body.getBytes(StandardCharsets.UTF_8)
                );

                outstanding++;

                // When batch is full, block and wait for broker confirmation
                if (outstanding == batchSize) {
                    channel.waitForConfirmsOrDie(5_000);
                    System.out.println("Batch confirm: size=" + batchSize);
                    outstanding = 0;
                }
            }

            // Handle remaining tail less than batch
            if (outstanding > 0) {
                channel.waitForConfirmsOrDie(5_000);
                System.out.println("Batch confirm: tailSize=" + outstanding);
            }
        }
    }
}
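Because a failed batch wait does not say which message was lost, the usual remedy is to republish the whole batch. A sketch of that bookkeeping, assuming a hypothetical BatchBuffer class that the publishing loop would feed: it remembers the messages of the current batch and hands all of them back when waitForConfirmsOrDie throws. Messages the Broker did receive get published twice, which again relies on consumer idempotency.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical buffer for batch confirms: a failed batch wait gives no per-message result,
// so every message of the current batch is treated as unconfirmed.
public class BatchBuffer {
    private final int batchSize;
    private final List<String> current = new ArrayList<>();

    public BatchBuffer(int batchSize) {
        this.batchSize = batchSize;
    }

    // Record a published message; returns true when the batch is full and must be confirmed.
    public boolean add(String payload) {
        current.add(payload);
        return current.size() == batchSize;
    }

    // waitForConfirmsOrDie returned normally: the whole batch is safe.
    public void onBatchConfirmed() {
        current.clear();
    }

    // waitForConfirmsOrDie threw: hand back every message of the batch for republishing.
    public List<String> onBatchFailed() {
        List<String> toRepublish = new ArrayList<>(current);
        current.clear();
        return toRepublish;
    }

    // Size of the unconfirmed tail (the last, partially filled batch).
    public int size() {
        return current.size();
    }

    public static void main(String[] args) {
        BatchBuffer buffer = new BatchBuffer(3);
        for (String m : new String[]{"m1", "m2", "m3"}) {
            if (buffer.add(m)) {
                // Simulate waitForConfirmsOrDie throwing for this batch
                System.out.println("republish: " + buffer.onBatchFailed()); // republish: [m1, m2, m3]
            }
        }
    }
}
```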

Async Publisher Confirms

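import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class AsyncPublisherConfirmsDemo {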

    private static final String EXCHANGE = "ex.publisher.confirms";
    private static final String QUEUE = "q.publisher.confirms";
    private static final String ROUTING_KEY = "q.publisher.confirms";

    public static void publishAsync(ConnectionFactory factory) throws Exception {
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.confirmSelect();

            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, false, false, null);
            channel.queueDeclare(QUEUE, false, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

            // Save "unconfirmed messages": seqNo -> payload
            ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

            // ACK callback: clear confirmed messages
            ConfirmCallback ackCallback = (seqNo, multiple) -> {
                if (multiple) {
                    ConcurrentNavigableMap<Long, String> head = outstanding.headMap(seqNo, true);
                    head.clear();
                } else {
                    outstanding.remove(seqNo);
                }
            };

            // NACK callback: clear and record failed messages
            ConfirmCallback nackCallback = (seqNo, multiple) -> {
                if (multiple) {
                    ConcurrentNavigableMap<Long, String> head = outstanding.headMap(seqNo, true);
                    head.forEach((k, v) -> System.err.println("NACK: seqNo=" + k + " msg=" + v));
                    head.clear();
                } else {
                    String msg = outstanding.remove(seqNo);
                    System.err.println("NACK: seqNo=" + seqNo + " msg=" + msg);
                }
            };

            // Register confirm listener (async)
            channel.addConfirmListener(ackCallback, nackCallback);

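            // Fast sending: record seqNo -> payload BEFORE publishing, so an ACK arriving
            // early on the client's I/O thread can never race ahead of the put
            String prefix = "hello-";
            int total = 1000;

            for (int i = 0; i < total; i++) {
                String payload = prefix + i;

                outstanding.put(channel.getNextPublishSeqNo(), payload);

                channel.basicPublish(
                        EXCHANGE,
                        ROUTING_KEY,
                        null,
                        payload.getBytes(StandardCharsets.UTF_8)
                );
            }

            // Wait (bounded) for the remaining confirms before try-with-resources closes the channel
            long deadline = System.currentTimeMillis() + 60_000;
            while (!outstanding.isEmpty() && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            if (!outstanding.isEmpty()) {
                System.err.println("Still unconfirmed after timeout: " + outstanding.size());
            }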
        }
    }
}

Persistence Mechanism

Persistence is the foundation of RabbitMQ reliability. Durability has to be configured at three levels, and all three are required: a durable queue does not keep transient messages, and a persistent message in a non-durable queue is lost together with the queue.

  1. Exchange persistence: declare the exchange with durable = true
  2. Queue persistence: declare the queue with durable = true
  3. Message persistence: set deliveryMode = 2 in BasicProperties
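In the Java client these three settings correspond to exchangeDeclare(name, type, true), queueDeclare(name, true, false, false, null) and MessageProperties.PERSISTENT_TEXT_PLAIN. The hypothetical RestartModel below only encodes the resulting rules for classic queues, as a quick way to check a topology on paper: a queued message survives a restart only if its queue is durable and it was published with deliveryMode 2, and a binding survives only if both the exchange and the queue are durable.

```java
// Hypothetical model of what a broker keeps across a restart (classic queues assumed).
public class RestartModel {

    // A message already in a queue survives only if the queue is durable
    // AND the message was published with deliveryMode = 2 (persistent).
    public static boolean messageSurvives(boolean queueDurable, int deliveryMode) {
        return queueDurable && deliveryMode == 2;
    }

    // A binding survives only if both ends are durable; otherwise routing is gone after restart
    // and publishes to a missing exchange fail until it is redeclared.
    public static boolean bindingSurvives(boolean exchangeDurable, boolean queueDurable) {
        return exchangeDurable && queueDurable;
    }

    public static void main(String[] args) {
        System.out.println(messageSurvives(true, 1));      // false: durable queue, transient message
        System.out.println(messageSurvives(true, 2));      // true
        System.out.println(bindingSurvives(false, true));  // false: non-durable exchange loses the binding
    }
}
```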

Error Quick Reference

| Symptom | Root cause | Fix |
| --- | --- | --- |
| “Sent successfully”, but the queue has no message and there is no error | mandatory not enabled: a message that cannot be routed to any queue is silently discarded | basicPublish with mandatory=true plus a ReturnListener |
| waitForConfirmsOrDie throws TimeoutException | Broker under load, network jitter, or slow disk I/O delays the confirm | Increase the timeout or switch to async confirms; a timeout does not prove the message was lost, so a retry may create a duplicate |
| waitForConfirmsOrDie throws IOException | Broker sent a Nack, or the channel was closed | Republish on a new channel, or compensate the Nacked messages |
| Batch confirm fails and the failed message cannot be identified | A batch wait only reports the result of the whole batch | Republish the whole batch, or switch to async confirms |
| Messages lost after a Broker restart/crash | Exchange/queue not durable, or message not persistent | Exchange durable=true, queue durable=true, message deliveryMode=2 |
| Duplicate consumption / balance credited twice | Redelivery under at-least-once semantics without idempotency | Idempotency on a business unique key (DB unique constraint, dedup table, or state machine) |
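The idempotency fix in the last row can be sketched in memory. IdempotentRechargeConsumer is hypothetical: a concurrent set stands in for a dedup table with a UNIQUE constraint on the order ID, and a map stands in for account balances. In a real system the dedup insert and the balance update should commit in one database transaction; in this sketch a crash between the two steps would mark the order as processed without crediting it.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for "dedup table + account balance".
public class IdempotentRechargeConsumer {
    private final Set<String> processedOrderIds = ConcurrentHashMap.newKeySet();
    private final Map<String, Long> balances = new ConcurrentHashMap<>();

    // Returns true if the recharge was applied, false if it was a duplicate delivery.
    public boolean handle(String orderId, String account, long amountCents) {
        // add() is atomic: only the first delivery of an orderId wins, like a UNIQUE-constraint insert
        if (!processedOrderIds.add(orderId)) {
            return false; // already processed: the consumer just acks the redelivered message
        }
        balances.merge(account, amountCents, Long::sum);
        return true;
    }

    public long balance(String account) {
        return balances.getOrDefault(account, 0L);
    }

    public static void main(String[] args) {
        IdempotentRechargeConsumer consumer = new IdempotentRechargeConsumer();
        consumer.handle("order-1", "alice", 100);
        consumer.handle("order-1", "alice", 100);       // redelivery: ignored
        System.out.println(consumer.balance("alice"));  // 100
    }
}
```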

Summary

  • Confirm Mechanism: Solves “Broker received message” problem
  • mandatory + Return: Solves “routing failure visibility” problem
  • Persistence: Solves “crash message loss” problem
  • Idempotency: Solves “re-delivery/duplicate consumption” problem