TL;DR
- 场景: 需要按”多维标签”路由消息(如来源+级别/区域+类型)
- 结论: Topic 通过 routingKey(点分词)与 bindingKey(*/#)模式匹配实现精细分发,不匹配会直接丢弃
RabbitMQ 主题模式
使用 topic 类型的交换器时,队列通过 bindingKey 绑定到交换器,其中 bindingKey 可以包含通配符进行灵活匹配。交换器在路由消息时,会将消息的 routingKey 与队列的 bindingKey 进行模式匹配,从而实现更细粒度的消息分发。
通配符规则
*(星号):匹配一个单词#(井号):匹配零个或多个单词
应用场景示例
假设有如下需求:
- 需要接收来自 cron 服务的所有 error 级别日志
- 需要接收来自 kern 服务的所有级别日志
- 需要接收所有服务的 critical 级别日志
对应的绑定关系:
- 队列1绑定键:
cron.error - 队列2绑定键:
kern.* - 队列3绑定键:
*.critical
消息 routingKey 匹配结果:
cron.error→ 队列1kern.info→ 队列2auth.critical→ 队列3
RoutingKey 格式规范
-
格式要求:
- 必须采用点分单词的形式(dotted-word notation)
- 每个单词之间用点号(.)分隔
- 整个字符串长度不能超过 255 字节
-
命名示例:
- 金融领域:
stock.usd.nyse - 汽车行业:
nyse.vwm - 通用示例:
quick.orange.rabbit
- 金融领域:
绑定匹配规则详解
Q1 绑定到 *.orange.*
- 匹配:
quick.orange.fox、lazy.orange.elephant - 不匹配:
orange.quick.fox(第一个单词不是通配符)、quick.orange(缺少第三个单词)
Q2 绑定到 *.*.rabbit 和 lazy.#
*.*.rabbit:匹配quick.orange.rabbit、lazy.brown.rabbitlazy.#:匹配lazy、lazy.orange、lazy.orange.male.rabbit
不匹配时的处理
如果消息的 routingKey 无法匹配任何绑定(如 quick.brown.fox),则该消息会被直接丢弃,不会进入任何队列。
Topic 交换器的行为类比
- 与 Fanout 交换器的相似性: 如果 bindingKey 使用
#(如#或lazy.#),则交换器会广播所有消息到匹配的队列。 - 与 Direct 交换器的相似性: 如果 bindingKey 完全不使用
*或#(如orange.rabbit),则交换器会精确匹配 routingKey。
示例场景
| routingKey | 匹配的队列 |
|---|---|
quick.orange.rabbit | Q2 |
lazy.orange.elephant | Q1 和 Q2 |
orange.rabbit | 不匹配,丢弃 |
lazy.brown.rabbit | Q2 |
Java 代码示例
EmitLogTopic(生产者)
package icu.wzk;
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
private static final String[] SPEED = {"lazy", "quick", "normal"};
private static final String[] COLOR = {"black", "orange", "red", "yellow", "blue", "white", "pink"};
private static final String[] SPECIES = {"dog", "rabbit", "chicken", "horse", "bear", "cat"};
private static final Random RANDOM = new Random();
public static void main(String[] args) throws IOException, TimeoutException {
// 1) 配置连接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("secret");
factory.setPort(5672);
// 2) 建立连接与 Channel
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 3) 声明 topic 交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 4) 循环发送 10 条随机消息
for (int i = 0; i < 10; i++) {
String speed = pick(SPEED);
String color = pick(COLOR);
String species = pick(SPECIES);
String message = speed + "-" + color + "-" + species;
String routingKey = speed + "." + color + "." + species;
channel.basicPublish(
EXCHANGE_NAME,
routingKey,
null,
message.getBytes(StandardCharsets.UTF_8)
);
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
}
}
}
private static String pick(String[] arr) {
return arr[RANDOM.nextInt(arr.length)];
}
}
ReceiveLogsTopic(消费者)
package icu.wzk;
public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("secret");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
String bindingKey = "*.*.rabbit";
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
DeliverCallback callback = (consumerTag, delivery) -> {
String body = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(bindingKey + " 匹配到的消息:" + body);
};
boolean autoAck = true;
channel.basicConsume(queueName, autoAck, callback, consumerTag -> {});
}
}
错误速查
| 症状 | 根因 | 修复 |
|---|---|---|
| 生产者发送成功,消费者收不到 | routingKey 与 bindingKey 无匹配 | 统一 routingKey 命名规范 |
| 报 NOT_FOUND - no exchange | 交换器不存在或 vhost 不一致 | 先 exchangeDeclare |
| 报 PRECONDITION_FAILED | 同名 exchange 类型不同 | 换新 exchange 名 |
| 消费者重启后丢消息 | autoAck=true,处理前崩溃 | 设 autoAck=false,成功后 basicAck |
| 嵌入式通配符不生效 | 通配符必须是独立词 | 改为点分词结构 |
| 消费端临时队列消失 | exclusive/auto-delete | 显式声明固定队列名+durable |
注意事项
- 通配符必须作为独立的单词出现,不能嵌入单词中间
- 匹配是大小写敏感的
- 空字符串不能作为 routingKey
- 点号不能出现在开头或结尾