TL;DR
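- Scenario: payment and top-up flows need eventual consistency. RabbitMQ decouples them asynchronously, but every order must stay traceable and none may be lost.
- Conclusion: publisher confirms answer "did the broker receive it?", mandatory + Return makes routing failures visible, and persistence plus idempotency cover crashes, redelivery and duplicates.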
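Message reliability overview
When we pay or transfer money with Alipay or WeChat Pay, we usually scan a QR code and get the result immediately. Behind that, the payment platform must guarantee data correctness, safety under concurrent access, and eventual consistency.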
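Distributed locks
- Pros: can guarantee strong data consistency.
- Cons: may become a performance bottleneck under high concurrency.
Message queues
- Pros: asynchronous processing, handles high concurrency.
- Cons: some latency and only weak (eventual) consistency; in addition, the business operation triggered by the message must be guaranteed to succeed, it is not allowed to fail.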
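How to ensure message reliability
- Exception handling in client code, on both the producer and the consumer side
- AMQP / RabbitMQ transactions
- Publisher confirms
- Message persistence
- A highly available broker cluster
- Consumer acknowledgements
- Consumer-side flow control (prefetch limits)
- Message idempotency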
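Publisher confirms
RabbitMQ provides a lightweight mechanism called publisher confirms. The producer puts a channel into confirm mode; from then on, every message published on that channel is assigned a unique sequence number, starting at 1. Once a message has been delivered to all matching queues, RabbitMQ sends a Basic.Ack back to the producer whose delivery tag is that sequence number (for persistent messages routed to durable queues, this happens only after the message is written to disk). If the broker cannot handle a message it sends a Basic.Nack instead. An ack or nack may also carry multiple=true, meaning it covers every message up to and including that sequence number. Note that an unroutable message is acked as well, once the exchange finds no matching queue; only with mandatory=true does the broker first send a Basic.Return, which is why confirms and mandatory are used together.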
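To make the sequence-number and multiple-flag bookkeeping concrete, here is a minimal sketch that models it without a broker. ConfirmTracker is a hypothetical helper, not part of the RabbitMQ client; it only mimics what a producer has to track on a confirm-mode channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical helper (not part of the RabbitMQ client) that models the confirm protocol:
// sequence numbers start at 1 and grow per published message, and an ack/nack with
// multiple=true settles every outstanding message up to and including deliveryTag.
class ConfirmTracker {
    private long nextSeqNo = 1;
    private final NavigableMap<Long, String> outstanding = new TreeMap<>();

    /** Records a published-but-unconfirmed message and returns its sequence number. */
    synchronized long publish(String payload) {
        long seqNo = nextSeqNo++;
        outstanding.put(seqNo, payload);
        return seqNo;
    }

    /** Basic.Ack: returns the payloads that are now safely with the broker. */
    synchronized List<String> ack(long deliveryTag, boolean multiple) {
        return settle(deliveryTag, multiple);
    }

    /** Basic.Nack: returns the payloads the broker rejected, so the caller can republish them. */
    synchronized List<String> nack(long deliveryTag, boolean multiple) {
        return settle(deliveryTag, multiple);
    }

    synchronized int pending() {
        return outstanding.size();
    }

    private List<String> settle(long deliveryTag, boolean multiple) {
        List<String> settled = new ArrayList<>();
        if (multiple) {
            NavigableMap<Long, String> head = outstanding.headMap(deliveryTag, true);
            settled.addAll(head.values()); // ascending seqNo order
            head.clear();                  // clearing the view removes the entries from outstanding
        } else {
            String payload = outstanding.remove(deliveryTag);
            if (payload != null) {
                settled.add(payload);
            }
        }
        return settled;
    }
}
```

In the real Java client, the sequence number comes from channel.getNextPublishSeqNo() and the two callbacks are registered with channel.addConfirmListener(ackCallback, nackCallback), as the asynchronous example below does.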
Synchronous Publisher Confirms example
```java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class PublisherConfirmsDemo {
    private static final String EXCHANGE = "ex.publisher.confirms";
    private static final String QUEUE = "q.publisher.confirms";
    private static final String ROUTING_KEY = QUEUE;

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("node1");
        factory.setVirtualHost("/");
        factory.setUsername("root");
        factory.setPassword("123456");
        factory.setPort(5672);

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            // 1) Enable publisher confirms (must be called before publishing)
            channel.confirmSelect();

            // 2) Declare the topology (non-durable, for the demo only; see "Persistence")
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, false, false, null);
            channel.queueDeclare(QUEUE, false, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

            // 3) Listen for returns: with mandatory=true the broker sends back
            //    any message it cannot route to a queue
            channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> {
                String returned = new String(body, StandardCharsets.UTF_8);
                System.err.printf("Return: code=%d text=%s ex=%s rk=%s body=%s%n",
                        replyCode, replyText, exchange, routingKey, returned);
            });

            String message = "hello";
            // 4) Publish with mandatory=true so that routing failures are reported
            channel.basicPublish(
                    EXCHANGE,
                    ROUTING_KEY,
                    true, // mandatory
                    null, // props: set deliveryMode (persistence), headers, etc. here
                    message.getBytes(StandardCharsets.UTF_8)
            );

            // 5) Block until the broker confirms (at most 5 s).
            //    An unroutable message is first returned (step 3) and then still acked.
            try {
                channel.waitForConfirmsOrDie(5_000);
                System.out.println("Confirm ACK: message = " + message);
            } catch (TimeoutException e) {
                System.out.println("Confirm timed out: message = " + message);
                throw e;
            } catch (IOException e) {
                System.out.println("Confirm NACK / channel error: message = " + message);
                throw e;
            }
        }
    }
}
```
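waitForConfirmsOrDie closes the channel after a nack or a timeout, so a retry has to republish on a new channel. A timeout also does not prove the message was lost (the broker may already have stored it), so retrying can create duplicates. A minimal sketch of a bounded retry, assuming a hypothetical ConfirmedPublish interface whose implementation would open a channel, call confirmSelect(), basicPublish(...) and waitForConfirmsOrDie(...) as above; ConfirmedPublish and ConfirmRetry are illustrative names, not RabbitMQ APIs.

```java
import java.io.IOException;
import java.util.concurrent.TimeoutException;

// Hypothetical abstraction (not a RabbitMQ API): one "publish + waitForConfirmsOrDie"
// attempt, ideally on a fresh channel, because the failed channel has been closed.
interface ConfirmedPublish {
    void publishAndConfirm(String message) throws IOException, TimeoutException, InterruptedException;
}

class ConfirmRetry {
    /** Returns the number of attempts used; rethrows the last failure if all attempts fail. */
    static int publishWithRetry(ConfirmedPublish publisher, String message,
                                int maxAttempts, long backoffMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                publisher.publishAndConfirm(message);
                return attempt; // broker acked
            } catch (IOException | TimeoutException e) {
                // Nack, closed channel or timeout: republish. A timed-out message may
                // already be stored, so consumers must tolerate duplicates.
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(backoffMillis * attempt); // linear backoff
                }
            }
        }
        throw last; // caller should persist the message for later compensation
    }
}
```

The final rethrow is where the "store it in the database for compensation" path would start.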
Batch Publisher Confirms
```java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class BatchPublisherConfirmsDemo {
    private static final String EXCHANGE = "ex.publisher.confirms";
    private static final String QUEUE = "q.publisher.confirms";
    private static final String ROUTING_KEY = "q.publisher.confirms";

    public static void publishBatch(ConnectionFactory factory) throws Exception {
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.confirmSelect();
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, false, false, null);
            channel.queueDeclare(QUEUE, false, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

            final int batchSize = 10;      // messages per confirm batch
            final int totalMessages = 102; // total number of messages
            int outstanding = 0;           // sent but not yet confirmed in the current batch

            for (int i = 0; i < totalMessages; i++) {
                String body = "hello-" + i;
                channel.basicPublish(
                        EXCHANGE,
                        ROUTING_KEY,
                        null,
                        body.getBytes(StandardCharsets.UTF_8)
                );
                outstanding++;
                // Once a batch is full, block until the broker confirms all of it.
                // A nack or timeout throws without saying which message failed.
                if (outstanding == batchSize) {
                    channel.waitForConfirmsOrDie(5_000);
                    System.out.println("Batch confirmed: size=" + batchSize);
                    outstanding = 0;
                }
            }
            // Confirm the last, partial batch
            if (outstanding > 0) {
                channel.waitForConfirmsOrDie(5_000);
                System.out.println("Batch confirmed: tailSize=" + outstanding);
            }
        }
    }
}
```
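When a batch confirm fails, the producer only knows that some message in that batch was not confirmed, so a simple remedy is to republish the whole batch. A minimal sketch, assuming a hypothetical BatchSender interface that wraps "publish every message + one waitForConfirmsOrDie" on a fresh channel (BatchSender and BatchRepublisher are illustrative, not RabbitMQ APIs); it returns the number of messages actually sent, which makes the cost of resending visible.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

// Hypothetical abstraction (not a RabbitMQ API): publishes the whole batch on a
// confirm-mode channel and calls waitForConfirmsOrDie once for it.
interface BatchSender {
    void sendAndConfirm(List<String> batch) throws IOException, TimeoutException, InterruptedException;
}

class BatchRepublisher {
    /** Sends messages in batches; a failed batch is resent as a whole. Returns total messages sent. */
    static int publishAll(BatchSender sender, List<String> messages, int batchSize, int maxAttempts)
            throws IOException, TimeoutException, InterruptedException {
        if (batchSize < 1 || maxAttempts < 1) {
            throw new IllegalArgumentException("batchSize and maxAttempts must be >= 1");
        }
        int sent = 0;
        for (int from = 0; from < messages.size(); from += batchSize) {
            List<String> batch = new ArrayList<>(
                    messages.subList(from, Math.min(from + batchSize, messages.size())));
            for (int attempt = 1; ; attempt++) {
                sent += batch.size();
                try {
                    sender.sendAndConfirm(batch);
                    break; // whole batch acked
                } catch (IOException | TimeoutException e) {
                    // We only know "something in this batch failed", so resend all of it
                    if (attempt == maxAttempts) {
                        throw e;
                    }
                }
            }
        }
        return sent;
    }
}
```

With batchSize=10, one failed confirm resends all ten messages, including ones the broker may already hold; that duplication is the trade-off against asynchronous confirms.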
Asynchronous Publisher Confirms
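```java
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmCallback;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class AsyncPublisherConfirmsDemo {
    private static final String EXCHANGE = "ex.publisher.confirms";
    private static final String QUEUE = "q.publisher.confirms";
    private static final String ROUTING_KEY = "q.publisher.confirms";

    public static void publishAsync(ConnectionFactory factory) throws Exception {
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.confirmSelect();
            channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT, false, false, null);
            channel.queueDeclare(QUEUE, false, false, false, null);
            channel.queueBind(QUEUE, EXCHANGE, ROUTING_KEY);

            // Unconfirmed messages: seqNo -> payload
            ConcurrentNavigableMap<Long, String> outstanding = new ConcurrentSkipListMap<>();

            // ACK callback: drop confirmed messages
            ConfirmCallback ackCallback = (seqNo, multiple) -> {
                if (multiple) {
                    // multiple=true: everything up to and including seqNo is confirmed
                    outstanding.headMap(seqNo, true).clear();
                } else {
                    outstanding.remove(seqNo);
                }
            };
            // NACK callback: remove and log the failed messages (candidates for retry)
            ConfirmCallback nackCallback = (seqNo, multiple) -> {
                if (multiple) {
                    ConcurrentNavigableMap<Long, String> head = outstanding.headMap(seqNo, true);
                    head.forEach((k, v) -> System.err.println("NACK: seqNo=" + k + " msg=" + v));
                    head.clear();
                } else {
                    String msg = outstanding.remove(seqNo);
                    System.err.println("NACK: seqNo=" + seqNo + " msg=" + msg);
                }
            };
            // Register the (asynchronous) confirm listener
            channel.addConfirmListener(ackCallback, nackCallback);

            // Publish fast. Record seqNo -> payload BEFORE publishing; otherwise the ack
            // can arrive before the put and the entry would never be removed.
            String prefix = "hello-";
            int total = 1000;
            for (int i = 0; i < total; i++) {
                String payload = prefix + i;
                outstanding.put(channel.getNextPublishSeqNo(), payload);
                channel.basicPublish(
                        EXCHANGE,
                        ROUTING_KEY,
                        null,
                        payload.getBytes(StandardCharsets.UTF_8)
                );
            }

            // Wait (up to 60 s) for the remaining confirms before the channel is closed
            long deadline = System.currentTimeMillis() + 60_000;
            while (!outstanding.isEmpty() && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            if (!outstanding.isEmpty()) {
                System.err.println("Still unconfirmed after 60 s: " + outstanding.size());
            }
        }
    }
}
```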
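Persistence
Persistence is the foundation of RabbitMQ reliability. Message durability is ensured at three levels:
- Exchange durability: declare the exchange with durable=true
- Queue durability: declare the queue with durable=true
- Message durability: set the delivery mode (the deliveryMode property of BasicProperties) to 2 (persistent)
All three are needed: a non-durable queue is deleted when the broker restarts, taking even persistent messages with it, and a non-durable exchange disappears together with its bindings.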
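Troubleshooting cheat sheet
| Symptom | Root cause | Fix |
|---|---|---|
| Publishing "succeeds", but the queue has no message and no error is reported | mandatory is not set, so the unroutable message is silently dropped | basicPublish with mandatory=true + a ReturnListener |
| waitForConfirmsOrDie throws TimeoutException | The confirm arrives late because of broker load, network jitter or slow disk I/O | Increase the timeout or switch to asynchronous confirms; treat the message as unconfirmed and republish it |
| waitForConfirmsOrDie throws IOException | The broker returned a Nack, or the channel was closed | Retry on a new channel (the method closes the channel after a nack or timeout), or store the message in the database for later compensation |
| A batch confirm fails and you cannot tell which message failed | Batch confirms only report one result for the whole batch | Switch to asynchronous confirms, or republish the whole batch |
| Messages are lost after a broker restart or crash | The exchange/queue is not durable, or the message is not persistent | Exchange durable=true, queue durable=true, message deliveryMode=2 |
| Consumers process a message twice / a balance is credited twice | No idempotency under redelivery (at-least-once delivery semantics) | Make processing idempotent on a unique business key (DB unique constraint / dedup table / state machine) |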
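The idempotency fix comes down to "record the business key first, apply the side effect only if the key is new". A minimal sketch, assuming a hypothetical top-up handler keyed on an order ID: the in-memory set stands in for a dedup table with a UNIQUE constraint and the map stands in for account balances; both names are illustrative.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical handler: the set plays the role of a dedup table with a UNIQUE
// constraint on orderId. In a real system the dedup insert and the balance
// update should commit in the same database transaction.
class IdempotentTopUpHandler {
    private final Set<String> processedOrderIds = ConcurrentHashMap.newKeySet();
    private final Map<String, Long> balances = new ConcurrentHashMap<>();

    /** Returns true if the credit was applied, false if this orderId was already processed. */
    boolean handle(String orderId, String userId, long amountCents) {
        // add() returns false when the key exists, like a unique-constraint violation
        if (!processedOrderIds.add(orderId)) {
            return false; // duplicate delivery: ack the message but change nothing
        }
        balances.merge(userId, amountCents, Long::sum);
        return true;
    }

    long balance(String userId) {
        return balances.getOrDefault(userId, 0L);
    }
}
```

A redelivered message with the same order ID is then acknowledged without crediting the balance a second time.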
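Summary
- Publisher confirms: answer "did the broker receive the message?"
- mandatory + Return: make routing failures visible
- Persistence: prevent message loss when the broker goes down
- Idempotency: make redelivery and duplicate consumption harmless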