TL;DR

  • 场景: 按日志级别/业务类型分流消息,避免 fanout 广播造成无效消费
  • 结论: direct 只做”精确匹配”,路由取决于 routingKey==bindingKey;一个队列可绑定多个 key
  • 产出: direct_logs 示例(Producer/Consumer)、多绑定分流模型、常见踩坑速查卡

RabbitMQ 路由模式

使用 direct 类型的 Exchange 实现消息选择性消费的具体步骤如下:

1. Exchange 声明与配置

  • 声明一个 direct 类型的 Exchange
  • 该 Exchange 会严格根据 RoutingKey 进行消息路由
  • 只有 RoutingKey 完全匹配的队列才能收到消息

2. 生产者发送消息

  • 生产者发送 N 条消息时
  • 每条消息需要指定不同的 RoutingKey
  • 例如:
    • “error.log” 用于错误日志
    • “info.log” 用于信息日志
    • “warning.log” 用于警告日志

3. 消费者绑定配置

  • 每个消费者需要:
    • 声明自己的队列
    • 将队列与 Exchange 绑定
    • 指定具体的 RoutingKey
  • 示例绑定:
    • 队列A绑定 RoutingKey=“error.log”(只接收错误日志)
    • 队列B绑定 RoutingKey=“*.log”(接收所有日志)

4. 实际应用场景

  • 日志系统实现
    • 错误日志消费者:绑定”error.log”,只接收错误消息写入文件
    • 控制台消费者:绑定”#.log”,接收所有级别日志打印到控制台
  • 订单系统
    • 支付队列:绑定”order.payment”
    • 发货队列:绑定”order.shipment”
    • 每个队列只处理特定业务的消息

5. 与 fanout 模式的区别

  • fanout: 广播模式,无视 RoutingKey
  • direct: 精确匹配 RoutingKey 的路由模式
  • 选择依据
    • 需要广播时用 fanout
    • 需要选择性消费时用 direct

6. 高级用法

  • 多绑定: 一个队列可以绑定多个 RoutingKey
  • 组合使用: 可以同时使用多个 Exchange 实现复杂路由
  • 优先级: 可以为不同 RoutingKey 的消息设置优先级

这种模式特别适合需要根据消息类型进行差异化处理的场景,能够有效实现消息的分类处理和精准投递。


绑定队列

上个模式中,交换器的使用方式:

channel.queueBind(queueName, EXCHANGE_NAME, "");

绑定语句中还有第三个参数:routingKey

channel.queueBind(queueName, EXCHANGE_NAME, "black");

bindingKey的作用与具体使用的交换器类型有关,对于 fanout 类型的交换器,此参数设置无效,系统直接忽略。


direct交换器

分布式系统中有很多应用,这些应用需要运维平台的监控,其中一个重要的信息就是服务器的日志记录。 我们需要将不同日志级别的日志记录交给不同的应用处理,这种情况下我们可以使用 direct交换器。

如果要对不同的消息做不同的处理,此时不能使用fanout类型的交换器,因为它只会盲目的广播消息。我们需要使用direct类型的交换器,direct交换器的路由算法很简单,只要消息的 RoutingKey 和 队列的 BindingKey 对应,消息就可以推送给该队列。

Direct Exchange 工作原理

上图中的交换器X是direct类型的交换器,绑定的两个队列中,一个队列的 bindingKey是orange,另一个队列的bindingKey是black和green。

  • routingKey 是 orange 的消息发送给队列Q1
  • routingKey是black和green的消息发送给Q2队列
  • 其他消息丢弃

多重绑定

也可以多重绑定:两个队列同时绑定相同的 bindingKey,此时行为与 fanout 类似,也是广播。

在案例中,我们将日志级别作为:routingKey。


EmitLogsDirect (生产者代码)

package icu.wzk.demo;
/**
 * EmitLogsDirect:发布端(Producer)
 *
 * 目标:
 * - 声明 direct 类型交换机 direct_logs
 * - 使用 routingKey=severity(info/warn/error)发布日志
 *
 * direct 语义:
 * - routingKey 会参与路由匹配
 * - 队列需要用 bindingKey 绑定到 exchange
 * - 消息的 routingKey 与 bindingKey 完全匹配时,该队列才会收到
 *
 * 示例效果:
 * - routingKey="error" 的消息,只会进入绑定了 bindingKey="error" 的队列
 * - 一个队列也可以同时绑定多个 key(比如绑定 warn 和 error)
 */
public class EmitLogsDirect {

    // 交换机名称:生产者和消费者必须一致
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1) 配置连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        // RabbitMQ 连接参数
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("secret");
        factory.setPort(5672);

        // 2) 建立连接与 Channel
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            /**
             * 3) 声明 direct 交换机
             * - 若不存在则创建
             * - 若存在则校验类型/参数一致性
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            /**
             * 4) 循环发送 100 条日志消息,severity 在 info/warn/error 之间轮转
             * routingKey=severity,是 direct 路由的关键
             */
            for (int i = 0; i < 100; i++) {
                String severity = pickSeverity(i);

                // 消息体:业务内容
                String logStr = "这是 【" + severity + "】 的消息";

                /**
                 * 5) 发布消息
                 * basicPublish(exchange, routingKey, props, body)
                 * - exchange: direct_logs
                 * - routingKey: severity(info/warn/error)
                 * - props: null(未附加消息属性)
                 * - body: UTF-8 编码后的字节数组
                 */
                channel.basicPublish(
                        EXCHANGE_NAME,
                        severity,
                        null,
                        logStr.getBytes(StandardCharsets.UTF_8)
                );

                System.out.println(logStr);
            }
        }
    }

    /**
     * 根据 i 的值选择 severity。
     * i%3 的结果只可能是 0/1/2。
     */
    private static String pickSeverity(int i) {
        int mod = i % 3;
        if (mod == 0) return "info";
        if (mod == 1) return "warn";
        return "error";
    }
}

ReceiveErrorLogsDirect (消费者 - 只接收 error)

package icu.wzk.demo;

public class ReceiveErrorLogsDirect {
    // 与生产者保持一致的配置
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String VHOST = "/";
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "secret";

    private static final String EXCHANGE_NAME = "direct_logs";
    private static final String KEY_ERROR = "error";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setVirtualHost(VHOST);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 声明 direct 交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // 声明临时队列
        String queueName = channel.queueDeclare().getQueue();

        // 绑定队列到交换机(只接收 error)
        channel.queueBind(queueName, EXCHANGE_NAME, KEY_ERROR);

        // 消息回调
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String routingKey = delivery.getEnvelope().getRoutingKey();
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        };

        // 开始消费(autoAck=true)
        boolean autoAck = true;
        channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});
    }
}

ReceiveWarnInfoLogsDirect (消费者 - 接收 warn + info)

/**
 * ReceiveWarnInfoLogsDirect:消费端(Consumer)
 *
 * 目标:
 * - 订阅 direct 类型交换机 direct_logs
 * - 只接收 routingKey = "warn" 或 "info" 的消息
 *
 * direct 语义(关键):
 * - Producer 发布消息时带 routingKey(severity:info/warn/error)
 * - Consumer 通过 queueBind(...) 指定 bindingKey
 * - routingKey 与 bindingKey 完全匹配时,该消息才会路由到该队列
 *
 * 本类绑定了两个 key:
 * - queueBind(..., "warn")
 * - queueBind(..., "info")
 * 因此同一个队列会收到两类 routingKey 的消息。
 */

public class ReceiveWarnInfoLogsDirect {

    private static final String HOST = "node1";
    private static final int PORT = 5672;
    private static final String VHOST = "/";
    private static final String USERNAME = "root";
    private static final String PASSWORD = "123456";

    private static final String EXCHANGE_NAME = "direct_logs";

    // 本消费者关心的 severities
    private static final String KEY_WARN = "warn";
    private static final String KEY_INFO = "info";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1) 配置连接参数
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setVirtualHost(VHOST);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        // 2) 建立连接与 Channel(消费者通常常驻,不主动 close)
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        /**
         * 3) 声明 direct 交换机
         * 与生产者一致:direct_logs + DIRECT
         */
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        /**
         * 4) 声明临时队列(匿名队列)
         * - 由 broker 分配随机队列名
         * - exclusive + autoDelete,适合 demo/临时订阅
         */
        String queueName = channel.queueDeclare().getQueue();

        /**
         * 5) 绑定队列到交换机(绑定多个 bindingKey)
         * 一个队列可以绑定同一 exchange 多次,每次一个 key:
         * - 收 warn
         * - 收 info
         *
         * 结果:该队列会接收两类 routingKey 的消息。
         */
        channel.queueBind(queueName, EXCHANGE_NAME, KEY_WARN);
        channel.queueBind(queueName, EXCHANGE_NAME, KEY_INFO);

        /**
         * 6) 消息回调
         * - routingKey:打印出这条消息属于哪一类(warn/info)
         * - body:UTF-8 解码
         */
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String routingKey = delivery.getEnvelope().getRoutingKey();
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        };

        /**
         * 7) 开始消费
         * autoAck=true:消息投递即确认,回调异常/进程崩溃会导致消息丢失风险。
         */
        boolean autoAck = true;
        channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});

        // 进程保持运行以持续消费
    }
}

核心要点总结

特性说明
路由方式Direct Exchange - 精确匹配
匹配规则routingKey == bindingKey(完全相等)
多绑定一个队列可绑定多个 bindingKey
广播行为多个队列绑定相同 key 时,类似 fanout 广播
适用场景日志分级、订单分流、消息分类处理