TL;DR

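  • Scenario: payment, top-up, and similar flows that need eventual consistency use RabbitMQ for asynchronous decoupling, but every order must stay traceable and none may be lost.
  • Conclusion: Confirm covers "the broker received it", mandatory + Return makes routing failures visible, and persistence plus idempotency is the safety net for crashes, redelivery, and duplicates.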

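Message Reliability Overview

When we transfer money with Alipay or WeChat Pay, we usually scan a code, pay, and see the result right away. The payment platform has to guarantee data correctness, safety under concurrent access, and eventual consistency.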

Distributed locks

  • Pros: can guarantee strong data consistency.
  • Cons: may become a performance bottleneck under high concurrency.

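Message queues

  • Pros: asynchronous, handles high concurrency.
  • Cons: adds some latency and gives only weak consistency; the business operation must also be guaranteed to succeed eventually, because it cannot be allowed to fail.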

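How to ensure message reliability

  1. Exception handling in client code, on both the producer and the consumer
  2. The AMQP / RabbitMQ transaction mechanism
  3. Publisher confirms
  4. Message persistence
  5. A highly available broker cluster
  6. Consumer acknowledgements
  7. Consumer-side rate limiting
  8. Message idempotency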

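Publisher Confirms

RabbitMQ provides a lightweight mechanism called publisher confirms. The producer puts the channel into confirm mode; from then on, every message published on that channel is assigned a unique ID (a sequence number starting from 1). Once the message has been delivered to all matching queues (for persistent messages on durable queues, once it has been written to disk), RabbitMQ sends a Basic.Ack to the producer that carries the message's unique ID.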

Synchronous Publisher Confirms example

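import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class PublisherConfirmsDemo {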

    private static final String EXCHANGE = "ex.publisher.confirms";
    private static final String QUEUE = "q.publisher.confirms";
    private static final String ROUTING_KEY = QUEUE;

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("node1");
        factory.setVirtualHost("/");
        factory.setUsername("root");
        factory.setPassword("123456");
        factory.setPort(5672);

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 1) Enable publisher confirms (must be called before publishing)
            channel.confirmSelect();

            // 2) Declare the topology
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, false, false, null);
            channel.queueDeclare(QUEUE, false, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

            // 3) Register a return listener: with mandatory=true, the broker returns
            //    any message it cannot route to a queue
            channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
                String returned = new String(body, StandardCharsets.UTF_8);
                System.err.printf("Return: code=%d text=%s ex=%s rk=%s body=%s%n",
                        replyCode, replyText, exchange, routingKey, returned);
            });

            String message = "hello";

            // 4) Publish (mandatory=true so that a routing failure is reported back via Return)
            channel.basicPublish(
                    EXCHANGE,
                    ROUTING_KEY,
                    true,   // mandatory
                    null,   // props: deliveryMode (persistence), headers, etc. can be set here
                    message.getBytes(StandardCharsets.UTF_8)
            );

            // 5) Block until the broker confirms
            try {
                channel.waitForConfirmsOrDie(5_000);
                System.out.println("Confirm ACK: message = " + message);
            } catch (TimeoutException e) {
                System.out.println("Confirm timed out: message = " + message);
                throw e;
            } catch (IOException e) {
                System.out.println("Confirm NACK or channel error: message = " + message);
                throw e;
            }
        }
    }
}

Batch Publisher Confirms

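import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class BatchPublisherConfirmsDemo {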

    private static final String EXCHANGE = "ex.publisher.confirms";
    private static final String QUEUE = "q.publisher.confirms";
    private static final String ROUTING_KEY = "q.publisher.confirms";

    public static void publishBatch(ConnectionFactory factory) throws Exception {
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.confirmSelect();

            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, false, false, null);
            channel.queueDeclare(QUEUE, false, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

            final int batchSize = 10;           // messages per confirm batch
            final int totalMessages = 102;      // total number of messages

            int outstanding = 0;                // published but not yet confirmed in the current batch

            for (int i = 0; i < totalMessages; i++) {
                String body = "hello-" + i;

                channel.basicPublish(
                        EXCHANGE,
                        ROUTING_KEY,
                        null,
                        body.getBytes(StandardCharsets.UTF_8)
                );

                outstanding++;

                // Once a full batch has been sent, block until the broker confirms it
                if (outstanding == batchSize) {
                    channel.waitForConfirmsOrDie(5_000);
                    System.out.println("Batch confirmed: size=" + batchSize);
                    outstanding = 0;
                }
            }

            // Confirm the final partial batch, if any
            if (outstanding > 0) {
                channel.waitForConfirmsOrDie(5_000);
                System.out.println("Batch confirmed: tailSize=" + outstanding);
            }
        }
    }
}

Asynchronous Publisher Confirms

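import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class AsyncPublisherConfirmsDemo {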

    private static final String EXCHANGE = "ex.publisher.confirms";
    private static final String QUEUE = "q.publisher.confirms";
    private static final String ROUTING_KEY = "q.publisher.confirms";

    public static void publishAsync(ConnectionFactory factory) throws Exception {
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.confirmSelect();

            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, false, false, null);
            channel.queueDeclare(QUEUE, false, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

            // Unconfirmed messages: seqNo -> payload
            ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

            // ACK callback: drop the confirmed messages
            ConfirmCallback ackCallback = (seqNo, multiple) -> {
                if (multiple) {
                    ConcurrentNavigableMap<Long, String> head = outstanding.headMap(seqNo, true);
                    head.clear();
                } else {
                    outstanding.remove(seqNo);
                }
            };

            // NACK callback: log the failed messages and drop them
            ConfirmCallback nackCallback = (seqNo, multiple) -> {
                if (multiple) {
                    ConcurrentNavigableMap<Long, String> head = outstanding.headMap(seqNo, true);
                    head.forEach((k, v) -> System.err.println("NACK: seqNo=" + k + " msg=" + v));
                    head.clear();
                } else {
                    String msg = outstanding.remove(seqNo);
                    System.err.println("NACK: seqNo=" + seqNo + " msg=" + msg);
                }
            };

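            // Register the confirm listener (asynchronous)
            channel.addConfirmListener(ackCallback, nackCallback);

            // Fast publish loop: read seqNo, record it in outstanding, then publish.
            // Recording first means a confirm that arrives immediately still finds its entry.
            String prefix = "hello-";
            int total = 1000;

            for (int i = 0; i < total; i++) {
                String payload = prefix + i;

                long seqNo = channel.getNextPublishSeqNo();
                outstanding.put(seqNo, payload);

                channel.basicPublish(
                        EXCHANGE,
                        ROUTING_KEY,
                        null,
                        payload.getBytes(StandardCharsets.UTF_8)
                );
            }

            // Wait up to 10 s for the remaining confirms before the channel closes
            long deadline = System.currentTimeMillis() + 10_000;
            while (!outstanding.isEmpty() && System.currentTimeMillis() < deadline) {
                Thread.sleep(50);
            }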
        }
    }
}
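
The `multiple` flag in the callbacks above is easy to misread, so here is a minimal sketch that isolates the bookkeeping without a broker. `OutstandingConfirms` and its methods are hypothetical names introduced for illustration; only the `ConcurrentSkipListMap` usage mirrors the demo. With `multiple=true`, one ack or nack settles every sequence number up to and including the given one.

```java
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/** Hypothetical helper: tracks unconfirmed messages by publish sequence number. */
final class OutstandingConfirms {

    private final ConcurrentNavigableMap<Long, String> pending = new ConcurrentSkipListMap<>();

    /** Record a message under its sequence number before it is published. */
    void track(long seqNo, String payload) {
        pending.put(seqNo, payload);
    }

    /**
     * Apply one ack or nack from the broker.
     * multiple=false settles exactly seqNo; multiple=true settles every entry <= seqNo.
     * Returns how many tracked entries were settled.
     */
    int settle(long seqNo, boolean multiple) {
        if (!multiple) {
            return pending.remove(seqNo) != null ? 1 : 0;
        }
        ConcurrentNavigableMap<Long, String> head = pending.headMap(seqNo, true);
        int settled = head.size();
        head.clear(); // clearing the view removes the entries from the backing map
        return settled;
    }

    boolean isPending(long seqNo) {
        return pending.containsKey(seqNo);
    }

    int size() {
        return pending.size();
    }
}
```

For example, after tracking sequence numbers 1 to 5, `settle(3, true)` removes 1, 2, and 3 in one step and leaves 4 and 5 pending.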

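Persistence

Persistence is the foundation of RabbitMQ reliability. Message durability depends on three settings:

  1. Exchange durability: set the durable parameter to true when declaring the exchange
  2. Queue durability: set the durable parameter to true when declaring the queue
  3. Message persistence: set the delivery mode (the deliveryMode property in BasicProperties) to 2

Note that the demos above pass durable=false to exchangeDeclare and queueDeclare and publish with null properties, so their messages would not survive a broker restart.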

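Troubleshooting quick reference

| Symptom | Root cause | Fix |
|---|---|---|
| "Send succeeded", but the queue is empty and no error was reported | mandatory not enabled, so unroutable messages are silently dropped | basicPublish with mandatory=true + ReturnListener |
| waitForConfirmsOrDie throws TimeoutException | Confirms arrive late because of broker load, network jitter, or slow disk I/O | Increase the timeout; switch to asynchronous confirms |
| waitForConfirmsOrDie throws IOException | The broker returned a Nack or the channel was closed | Retry on Nack, or persist the message for later compensation |
| After a batch confirm fails, you cannot tell which message failed | Batch confirms report only a per-batch result | Switch to asynchronous confirms |
| Messages lost after a broker restart or crash | Exchange/queue not durable, or message not persistent | Exchange durable=true, queue durable=true, message deliveryMode=2 |
| Consumer processes a message twice / balance credited twice | No idempotency under redelivery and at-least-once delivery semantics | Enforce idempotency on a unique business key (DB unique constraint, dedup table, or state machine) |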
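
The "retry on Nack" fix can be sketched as a small backoff loop. This is an illustration under assumptions, not part of the RabbitMQ client: `ConfirmRetry` and `publishWithRetry` are hypothetical names, and the `BooleanSupplier` is assumed to wrap one publish plus a confirm wait, returning true only when the broker acked.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: retries a publish-and-confirm step with exponential backoff. */
final class ConfirmRetry {

    /**
     * publishAndConfirm returns true when the broker acked, false on nack or timeout.
     * Returns the attempt number that succeeded, or -1 if every attempt failed.
     */
    static int publishWithRetry(BooleanSupplier publishAndConfirm, int maxAttempts, long initialDelayMs) {
        long delayMs = initialDelayMs;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (publishAndConfirm.getAsBoolean()) {
                return attempt;
            }
            if (attempt == maxAttempts) {
                break; // no point sleeping after the last attempt
            }
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and give up
                return -1;
            }
            delayMs *= 2; // back off: 1x, 2x, 4x, ...
        }
        return -1; // caller should persist the message for later compensation
    }
}
```

A result of -1 is the point at which the message would be written to a compensation table rather than retried forever. Because a retry can deliver the same message twice, this approach relies on the consumer being idempotent.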

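Summary

  • Confirms: make sure the broker actually received the message
  • mandatory + Return: make routing failures visible
  • Persistence: prevent message loss when the broker crashes
  • Idempotency: make redelivery and duplicate consumption harmless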
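
To make the idempotency point concrete, here is a minimal in-memory sketch of a top-up consumer that applies each order at most once. `IdempotentRechargeHandler` and its fields are hypothetical; the in-memory set is only a stand-in for a database unique constraint or dedup table, which a real system would update in the same transaction as the balance.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical consumer-side handler that deduplicates on a business key (the order ID). */
final class IdempotentRechargeHandler {

    // Stand-in for a DB unique constraint / dedup table
    private final Set<String> processedOrderIds = ConcurrentHashMap.newKeySet();
    private final AtomicLong balanceCents = new AtomicLong();

    /** Returns true if the top-up was applied, false if this order was already processed. */
    boolean handle(String orderId, long amountCents) {
        // add() is atomic: only the first delivery of a given orderId gets true
        if (!processedOrderIds.add(orderId)) {
            return false; // duplicate delivery: acknowledge it, but do not credit again
        }
        balanceCents.addAndGet(amountCents);
        return true;
    }

    long balance() {
        return balanceCents.get();
    }
}
```

If the broker redelivers "order-1", the second `handle` call returns false and the balance is unchanged, which is the behavior the "balance credited twice" symptom calls for.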