TL;DR

  • 场景: RabbitMQ fanout 广播发布订阅,Java 生产者/消费者完整跑通,并解释推/拉消费差异
  • 结论: 默认推模式(basicConsume)更适合常规实时消费;拉模式(basicGet)用于条件/批量/限速场景
  • 产出: 可直接复用的 EmitLog/ReceiveLogs 代码 + rabbitmqctl 绑定验证方法 + 常见故障速查

消息推拉模式详解

推模式(Push)实现方式

1. 继承 DefaultConsumer 基类

这是 RabbitMQ Java 客户端提供的标准实现方式,需要重写 handleDelivery 方法来处理接收到的消息。

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body) {
        String message = new String(body, "UTF-8");
        // 处理消息逻辑
    }
};
channel.basicConsume(queueName, true, consumer);

2. 使用 Spring AMQP 的 SimpleMessageListenerContainer

这是 Spring 框架提供的更高级的封装,支持自动声明队列、交换机和绑定,支持消息转换、错误处理等高级功能。

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueNames("queueName");
    container.setMessageListener(messageListenerAdapter);
    return container;
}

推模式的适用场景

推模式是最常用的消费模式,特别适合:

  • 实时性要求高的场景
  • 消费者处理能力稳定的情况
  • 需要持续处理消息的场景

拉模式(Pull)的必要性

在以下场景中,推模式可能不适用,需要采用拉模式:

  1. 条件性消费: 消费者只能在特定条件满足时才能处理消息
  2. 批量处理需求: 需要一次性拉取多条消息进行批量处理
GetResponse response = channel.basicGet(queueName, false);
if (response != null) {
    // 处理消息
    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
}
  1. 资源受限场景: 消费者处理能力有限,需要控制消息拉取速率
  2. 特殊业务需求: 需要精确控制消息拉取时机

推拉模式的选择建议

  1. 默认选择推模式: 实现简单、性能较好、适合大多数常规场景
  2. 考虑拉模式的情况: 需要精确控制消息消费时机、处理批量消息、消费者资源受限、实现特殊业务逻辑
  3. 混合模式: 大部分时间使用推模式,特定条件下切换到拉模式

EmitLog 代码

package icu.wzk.demo;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = (args.length < 1)
                    ? "info: Hello World!"
                    : String.join(" ", args);

            channel.basicPublish(
                    EXCHANGE_NAME,
                    "",
                    null,
                    message.getBytes(StandardCharsets.UTF_8)
            );

            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

ReceiveLogs 代码

package icu.wzk.demo;

public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };

        boolean autoAck = true;
        channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});
    }
}

启动测试

查看绑定关系:

rabbitmqctl list_bindings

格式化输出:

rabbitmqctl list_bindings --formatter pretty_table

错误速查

症状根因定位修复
消费者一直收不到消息exchange 名称不一致对比生产者/消费者 EXCHANGE_NAME;rabbitmqctl list_exchanges
发送时报 PRECONDITION_FAILEDexchange 已存在但类型不同保持 exchangeDeclare 参数一致;删除旧 exchange 重建
启动消费者后能收,重启 RabbitMQ 后全没了使用临时队列且非持久化生产环境改为声明具名持久化队列
生产者显示 Sent,但队列无消息没有任何队列绑定到 fanout exchange先启动消费者完成 queueBind
消费端偶发丢消息autoAck=true,回调未处理完进程崩溃改为手动 ack:成功 basicAck,失败 basicNack/requeue
消费变慢后内存/堆积明显推模式未限流,prefetch 未设置设置 basicQos(prefetch),控制并发与消费速率
basicGet 经常返回 null队列为空或被其他消费者抢占确保队列独占或设计调度策略