TL;DR

  • 场景: 需要按”多维标签”路由消息(如来源+级别/区域+类型)
  • 结论: Topic 通过 routingKey(点分词)与 bindingKey(*/#)模式匹配实现精细分发,不匹配会直接丢弃

RabbitMQ 主题模式

使用 topic 类型的交换器时,队列通过 bindingKey 绑定到交换器,其中 bindingKey 可以包含通配符进行灵活匹配。交换器在路由消息时,会将消息的 routingKey 与队列的 bindingKey 进行模式匹配,从而实现更细粒度的消息分发。

通配符规则

  1. *(星号):匹配一个单词
  2. #(井号):匹配零个或多个单词

应用场景示例

假设有如下需求:

  1. 需要接收来自 cron 服务的所有 error 级别日志
  2. 需要接收来自 kern 服务的所有级别日志
  3. 需要接收所有服务的 critical 级别日志

对应的绑定关系:

  • 队列1绑定键:cron.error
  • 队列2绑定键:kern.*
  • 队列3绑定键:*.critical

消息 routingKey 匹配结果:

  • cron.error → 队列1
  • kern.info → 队列2
  • auth.critical → 队列3

RoutingKey 格式规范

  1. 格式要求:

    • 必须采用点分单词的形式(dotted-word notation)
    • 每个单词之间用点号(.)分隔
    • 整个字符串长度不能超过 255 字节
  2. 命名示例:

    • 金融领域:stock.usd.nyse
    • 汽车行业:nyse.vwm
    • 通用示例:quick.orange.rabbit

绑定匹配规则详解

Q1 绑定到 *.orange.*

  • 匹配:quick.orange.foxlazy.orange.elephant
  • 不匹配:orange.quick.fox(第一个单词不是通配符)、quick.orange(缺少第三个单词)

Q2 绑定到 *.*.rabbitlazy.#

  • *.*.rabbit:匹配 quick.orange.rabbitlazy.brown.rabbit
  • lazy.#:匹配 lazylazy.orangelazy.orange.male.rabbit

不匹配时的处理

如果消息的 routingKey 无法匹配任何绑定(如 quick.brown.fox),则该消息会被直接丢弃,不会进入任何队列。


Topic 交换器的行为类比

  • 与 Fanout 交换器的相似性: 如果 bindingKey 使用 #(如 #lazy.#),则交换器会广播所有消息到匹配的队列。
  • 与 Direct 交换器的相似性: 如果 bindingKey 完全不使用 *#(如 orange.rabbit),则交换器会精确匹配 routingKey。

示例场景

routingKey匹配的队列
quick.orange.rabbitQ2
lazy.orange.elephantQ1 和 Q2
orange.rabbit不匹配,丢弃
lazy.brown.rabbitQ2

Java 代码示例

EmitLogTopic(生产者)

package icu.wzk;

public class EmitLogTopic {
    private static final String EXCHANGE_NAME = "topic_logs";
    private static final String[] SPEED = {"lazy", "quick", "normal"};
    private static final String[] COLOR = {"black", "orange", "red", "yellow", "blue", "white", "pink"};
    private static final String[] SPECIES = {"dog", "rabbit", "chicken", "horse", "bear", "cat"};
    private static final Random RANDOM = new Random();

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1) 配置连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("secret");
        factory.setPort(5672);

        // 2) 建立连接与 Channel
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // 3) 声明 topic 交换机
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

            // 4) 循环发送 10 条随机消息
            for (int i = 0; i < 10; i++) {
                String speed = pick(SPEED);
                String color = pick(COLOR);
                String species = pick(SPECIES);

                String message = speed + "-" + color + "-" + species;
                String routingKey = speed + "." + color + "." + species;

                channel.basicPublish(
                        EXCHANGE_NAME,
                        routingKey,
                        null,
                        message.getBytes(StandardCharsets.UTF_8)
                );

                System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
            }
        }
    }

    private static String pick(String[] arr) {
        return arr[RANDOM.nextInt(arr.length)];
    }
}

ReceiveLogsTopic(消费者)

package icu.wzk;

public class ReceiveLogsTopic {
    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("secret");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        String queueName = channel.queueDeclare().getQueue();

        String bindingKey = "*.*.rabbit";
        channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);

        DeliverCallback callback = (consumerTag, delivery) -> {
            String body = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(bindingKey + " 匹配到的消息:" + body);
        };

        boolean autoAck = true;
        channel.basicConsume(queueName, autoAck, callback, consumerTag -> {});
    }
}

错误速查

症状根因修复
生产者发送成功,消费者收不到routingKey 与 bindingKey 无匹配统一 routingKey 命名规范
报 NOT_FOUND - no exchange交换器不存在或 vhost 不一致先 exchangeDeclare
报 PRECONDITION_FAILED同名 exchange 类型不同换新 exchange 名
消费者重启后丢消息autoAck=true,处理前崩溃设 autoAck=false,成功后 basicAck
嵌入式通配符不生效通配符必须是独立词改为点分词结构
消费端临时队列消失exclusive/auto-delete显式声明固定队列名+durable

注意事项

  • 通配符必须作为独立的单词出现,不能嵌入单词中间
  • 匹配是大小写敏感的
  • 空字符串不能作为 routingKey
  • 点号不能出现在开头或结尾