TL;DR
- 场景: RabbitMQ fanout 广播发布订阅,Java 生产者/消费者完整跑通,并解释推/拉消费差异
- 结论: 默认推模式(basicConsume)更适合常规实时消费;拉模式(basicGet)用于条件/批量/限速场景
- 产出: 可直接复用的 EmitLog/ReceiveLogs 代码 + rabbitmqctl 绑定验证方法 + 常见故障速查
消息推拉模式详解
推模式(Push)实现方式
1. 继承 DefaultConsumer 基类
这是 RabbitMQ Java 客户端提供的标准实现方式,需要重写 handleDelivery 方法来处理接收到的消息。
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) {
String message = new String(body, "UTF-8");
// 处理消息逻辑
}
};
channel.basicConsume(queueName, true, consumer);
2. 使用 Spring AMQP 的 SimpleMessageListenerContainer
这是 Spring 框架提供的更高级的封装,支持自动声明队列、交换机和绑定,支持消息转换、错误处理等高级功能。
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueNames("queueName");
container.setMessageListener(messageListenerAdapter);
return container;
}
推模式的适用场景
推模式是最常用的消费模式,特别适合:
- 实时性要求高的场景
- 消费者处理能力稳定的情况
- 需要持续处理消息的场景
拉模式(Pull)的必要性
在以下场景中,推模式可能不适用,需要采用拉模式:
- 条件性消费: 消费者只能在特定条件满足时才能处理消息
- 批量处理需求: 需要一次性拉取多条消息进行批量处理
GetResponse response = channel.basicGet(queueName, false);
if (response != null) {
// 处理消息
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
}
- 资源受限场景: 消费者处理能力有限,需要控制消息拉取速率
- 特殊业务需求: 需要精确控制消息拉取时机
推拉模式的选择建议
- 默认选择推模式: 实现简单、性能较好、适合大多数常规场景
- 考虑拉模式的情况: 需要精确控制消息消费时机、处理批量消息、消费者资源受限、实现特殊业务逻辑
- 混合模式: 大部分时间使用推模式,特定条件下切换到拉模式
EmitLog 代码
package icu.wzk.demo;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = (args.length < 1)
? "info: Hello World!"
: String.join(" ", args);
channel.basicPublish(
EXCHANGE_NAME,
"",
null,
message.getBytes(StandardCharsets.UTF_8)
);
System.out.println(" [x] Sent '" + message + "'");
}
}
}
ReceiveLogs 代码
package icu.wzk.demo;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
boolean autoAck = true;
channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});
}
}
启动测试
查看绑定关系:
rabbitmqctl list_bindings
格式化输出:
rabbitmqctl list_bindings --formatter pretty_table
错误速查
| 症状 | 根因定位 | 修复 |
|---|---|---|
| 消费者一直收不到消息 | exchange 名称不一致 | 对比生产者/消费者 EXCHANGE_NAME;rabbitmqctl list_exchanges |
| 发送时报 PRECONDITION_FAILED | exchange 已存在但类型不同 | 保持 exchangeDeclare 参数一致;删除旧 exchange 重建 |
| 启动消费者后能收,重启 RabbitMQ 后全没了 | 使用临时队列且非持久化 | 生产环境改为声明具名持久化队列 |
| 生产者显示 Sent,但队列无消息 | 没有任何队列绑定到 fanout exchange | 先启动消费者完成 queueBind |
| 消费端偶发丢消息 | autoAck=true,回调未处理完进程崩溃 | 改为手动 ack:成功 basicAck,失败 basicNack/requeue |
| 消费变慢后内存/堆积明显 | 推模式未限流,prefetch 未设置 | 设置 basicQos(prefetch),控制并发与消费速率 |
| basicGet 经常返回 null | 队列为空或被其他消费者抢占 | 确保队列独占或设计调度策略 |