TL;DR
- 场景:同一队列多消费者分摊任务 + 一条消息广播给多个订阅方
- 结论:Work Queue 依赖 manual ack + basicQos 控制分发;fanout 依赖交换器绑定与临时队列实现”一对多”
- 产出:给出 Java 生产/消费代码骨架、未命名交换器用法、临时队列与 binding 的验证路径
RabbitMQ 工作模式
Work Queue
生产者发送消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可以达到负载均衡的效果。
NewTask
package icu.wzk.demo;
public class TestTask {
private static final String HOST = "localhost";
private static final String VIRTUAL_HOST = "/";
private static final String USERNAME = "admin";
private static final String PASSWORD = "secret";
private static final int PORT = 5672;
private static final String QUEUE_NAME = "wzk-icu";
private static final String[] WORKS = {
"hello.",
"hello..",
"hello...",
"hello....",
"hello.....",
"hello......",
"hello........",
"hello.........",
"hello..........",
"hello..........."
};
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST);
factory.setVirtualHost(VIRTUAL_HOST);
factory.setUsername(USERNAME);
factory.setPassword(PASSWORD);
factory.setPort(PORT);
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String exchange = "";
for (String work : WORKS) {
channel.basicPublish(
exchange,
QUEUE_NAME,
null,
work.getBytes(StandardCharsets.UTF_8)
);
System.out.println(" [x] Sent '" + work + "'");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
TestTask2
package icu.wzk.demo;
public class TestTask2 {
private static final String TASK_QUEUE_NAME = "wzk-icu";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("secret");
// false = 手动 ack(推荐配合 basicQos)
boolean autoAck = false;
try (Connection conn = factory.newConnection(); Channel channel = conn.createChannel()) {
// 每次只拉取 1 条,避免一次性堆给消费者
channel.basicQos(1);
channel.queueDeclare(TASK_QUEUE_NAME, false, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String task = new String(delivery.getBody(), StandardCharsets.UTF_8);
long tag = delivery.getEnvelope().getDeliveryTag();
System.out.println(" [x] Received '" + task + "'");
try {
doWork(task);
System.out.println(" [x] Done");
// 手动确认
channel.basicAck(tag, false);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 中断时重回队列
channel.basicNack(tag, false, true);
} catch (Exception e) {
// 失败时重回队列(按需改为 false 丢弃)
channel.basicNack(tag, false, true);
}
};
channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {});
// 阻塞主线程,保持进程存活
synchronized (TestTask2.class) {
try {
TestTask2.class.wait();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
private static void doWork(String task) throws InterruptedException {
System.out.println("task = " + task);
for (char ch : task.toCharArray()) {
if (ch == '.') {
Thread.sleep(1000);
}
}
}
}
发布订阅
在RabbitMQ消息队列系统中,fanout类型交换器是一种重要的消息分发机制。这种交换器的工作方式类似于广播模式,具有以下特点:
-
消息分发机制:
- 生产者将消息发送到Exchange时,会完全忽略RoutingKey(路由键)的设置
- Exchange会将收到的每一条消息复制并推送到所有绑定到该Exchange的队列
- 每个消费者队列都会收到完整的消息副本
-
队列绑定过程:
- 每个消费者在订阅时,RabbitMQ会自动为其创建一个专属队列
- 这个新创建的队列会与指定的Exchange建立绑定关系
- 绑定过程不涉及任何路由规则或过滤条件
-
典型应用场景:
- 实时通知系统(如股票行情推送)
- 日志收集系统(多个日志处理服务同时接收日志)
- 事件广播系统(订单创建事件通知多个微服务)
发布订阅使用 fanout,创建交换器叫做 logs:
channel.exchangeDeclare("logs", "fanout");
fanout 交换器很简单,从名字可以看到叫用风扇吹出去,将收到的消息发送给它知道的所有队列。
未命名交换器
在前面我们虽然没有指定交换器,但是依然可以向队列发送消息,这是因为我们使用了默认的交换器。
channel.basicPublish("", "hello", null, message.getBytes());
第一个参数就是交换器的名称,为空字符串,直接使用 RoutingKey 向队列发送消息。
如果我们要向指定的交换发布器发送消息:
channel.basicPublish("logs", "", null, message.getBytes());
临时队列
生产者和消费者都是通过队列名称来发送和接收该队列中的消息。在使用RabbitMQ时,创建队列的过程需要注意以下几点:
-
队列创建机制:
- 当连接到RabbitMQ时,通常需要创建一个新的空队列
- 队列命名方式有两种选择:
- 自定义命名:可以指定一个有意义的队列名称
- 自动生成:可以让RabbitMQ服务器生成随机队列名
-
队列生命周期管理:
- 临时队列特性:当声明队列时将”exclusive”参数设为true,该队列就变成独占队列
- 自动删除机制:对于独占队列,一旦消费者断开连接,RabbitMQ会自动删除该队列
- 持久化队列:如果需要长期保留队列,可以设置”durable”参数为true
String queueName = channel.queueDeclare().getQueue();
上述代码我们声明了一个非持久化的、排他的、自动删除的队列,并且名字都是服务器随机生成的。
进行绑定
在创建了消息队列和fanout类型的交换器之后,我们需要将两者进行绑定,让交换器将消息发送给该队列。fanout交换器会将收到的所有消息广播到所有与之绑定的队列中。
channel.queueBind(queueName, "logs", "");
这段代码中各个参数的含义是:
queueName:要绑定的队列名称logs:交换器的名称"":路由键(routing key),在fanout类型中不需要使用,因此为空字符串
错误速查
| 症状 | 根因定位 | 修复 |
|---|---|---|
| 多消费者但分发不均、某个消费者”吃撑” | 未设置 basicQos 或 prefetch 过大 | 看消费者侧吞吐与未确认数(unacked)是否偏高 channel.basicQos(1..N);manual ack 后再调优 N |
| 消费者异常后消息丢失 | autoAck=true 或提前 ack | 查 basicConsume 的 autoAck 参数、日志中 ack 时机 设 autoAck=false;仅在业务完成后 basicAck |
| 失败消息无限重试、队列”打转” | basicNack(..., requeue=true) 对不可恢复错误也重回队列 | 观察同一消息反复出现,且无退避 区分可恢复/不可恢复:不可恢复 requeue=false 或走 DLX/重试队列 |
| 重启后队列/消息消失 | 队列非持久化 + 消息非持久化 | 看 queueDeclare(durable=false)、发布属性是否为 persistent 生产:durable queue + persistent message |
| 发布订阅消费者断开后队列没了 | 使用临时队列 exclusive/auto-delete 的预期行为 | 需要持久订阅就改为命名队列 + durable;临时订阅保持现状 |
| fanout 下设置 routingKey 但无效果 | fanout 忽略 routingKey | 检查 exchange_type=fanout 需要按 key 路由就改 direct/topic;fanout 保持 routingKey 为空即可 |
| 发送到 exchange 后消费者收不到 | 未 queueBind 或 bind 到错误 exchange | rabbitmqctl list_bindings 看是否存在绑定 确认 queueBind(queueName,"logs","") 与 exchange 名一致 |
| 使用默认交换器发送失败(无路由) | routingKey(队列名)不存在或拼错 | 生产者 basicPublish 的 routingKey 与实际队列名比对 先确保 queueDeclare 创建目标队列 |
| 消费端进程退出导致不再消费 | try-with-resources 结束或主线程未阻塞 | 保持进程存活;或用更标准的生命周期管理 |
| 连接失败/权限错误 | vhost/用户名密码/权限不匹配 | RabbitMQ 日志与连接异常栈 校验 vhost 存在、用户对 vhost 的 configure/write/read 权限 |