TL;DR
- 场景: 用 Java 并发实现一个最小可运行的「队列缓冲 + 异步解耦」消息模型
- 结论: BlockingQueue 可以解释 MQ 的核心交互,但缺少持久化、ACK、重试、集群、可观测性
- 产出: 一个可运行的 Producer/Consumer Demo + 企业级差距列表
版本矩阵
| Project | Status | Description |
|---|---|---|
| JDK Version | Unverified | BlockingQueue/ArrayBlockingQueue logic consistent across JDK 8/11/17 |
| Queue Implementation | Verified | ArrayBlockingQueue<>(20) clearly defined |
消息中间件 - 基本概念
消息中间件(Message-Oriented Middleware,简称MOM),是分布式系统架构中实现异步通信的核心组件。
典型特征:
- 异步通信机制: 采用”发送后不管”(fire-and-forget)模式
- 消息持久化: 通过磁盘存储或复制机制确保消息不丢失
- 协议支持: 通常支持AMQP、MQTT、STOMP等标准协议
- 消息路由: 提供灵活的路由策略
主流实现对比:
| System | Throughput | Latency | Persistence |
|---|---|---|---|
| RabbitMQ | Medium | Low | Supported |
| Kafka | High | Medium | Strong support |
| RocketMQ | High | Low | Supported |
自定义消息中间件
Producer
public class WzkProducer implements Runnable {
private final BlockingQueue<Good> blockingQueue;
@Override
public void run() {
try {
while (true) {
Thread.sleep(2000);
if (blockingQueue.remainingCapacity() > 0) {
Good good = Good.builder()
.id(UUID.randomUUID().toString())
.type("吃的")
.build();
blockingQueue.add(good);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Consumer
public class WzkConsumer implements Runnable {
private final BlockingQueue<Good> blockingQueue;
@Override
public void run() {
try {
while (true) {
Thread.sleep(1000);
Good good = blockingQueue.take();
System.out.println("吃了食物: " + good);
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
Main
public class WzkMain {
public static void main(String[] args) throws Exception {
BlockingQueue<Good> blockingQueue = new ArrayBlockingQueue<>(20);
WzkProducer wzkProducer = new WzkProducer(blockingQueue);
WzkConsumer wzkConsumer = new WzkConsumer(blockingQueue);
new Thread(wzkProducer).start();
Thread.sleep(10_000);
new Thread(wzkConsumer).start();
}
}
生产环境差距
需要重点考虑的问题:
-
消息持久化: 需要支持消息持久化存储,防止系统崩溃时消息丢失
-
消息可靠性保证:
- 发送确认机制:实现ACK/NACK机制
- 消费确认机制:消费者处理完成后需显式确认
-
高并发处理: 需要支持水平扩展的集群架构
-
系统可靠性:
- 故障转移机制:主从切换、自动恢复
- 多机房部署:异地多活容灾方案
- 监控告警:完善的健康检查和监控体系
-
流量控制:
- 限流机制:令牌桶或漏桶算法
- 熔断降级:防止系统过载
错误速查
| 症状 | 根因 | 修复 |
|---|---|---|
| 生产者在队列满时抛异常或线程退出 | 使用 blockingQueue.add() 队列满会抛异常 | 改为 put() 或 offer(timeout) |
消费者 take() 阻塞看似”卡死” | 队列为空时 take() 必然阻塞 | 如需可停机:用 poll(timeout) |
| 无法优雅停机 | while(true) 无限循环 | 用 while(!Thread.currentThread().isInterrupted()) |
| 数据可靠性为0 | 内存队列无持久化、无副本、无重放 | 需要 WAL 机制 |
| 重复消费/消息丢失不可控 | 无ACK、无重试、无幂等、无死信 | 引入 ACK + 重试 + DLQ 设计 |