TL;DR

  • 场景: 用 Java 并发实现一个最小可运行的「队列缓冲 + 异步解耦」消息模型
  • 结论: BlockingQueue 可以解释 MQ 的核心交互,但缺少持久化、ACK、重试、集群、可观测性
  • 产出: 一个可运行的 Producer/Consumer Demo + 企业级差距列表

版本矩阵

ProjectStatusDescription
JDK VersionUnverifiedBlockingQueue/ArrayBlockingQueue logic consistent across JDK 8/11/17
Queue ImplementationVerifiedArrayBlockingQueue<>(20) clearly defined

消息中间件 - 基本概念

消息中间件(Message-Oriented Middleware,简称MOM),是分布式系统架构中实现异步通信的核心组件。

典型特征:

  1. 异步通信机制: 采用”发送后不管”(fire-and-forget)模式
  2. 消息持久化: 通过磁盘存储或复制机制确保消息不丢失
  3. 协议支持: 通常支持AMQP、MQTT、STOMP等标准协议
  4. 消息路由: 提供灵活的路由策略

主流实现对比:

SystemThroughputLatencyPersistence
RabbitMQMediumLowSupported
KafkaHighMediumStrong support
RocketMQHighLowSupported

自定义消息中间件

Producer

public class WzkProducer implements Runnable {
    private final BlockingQueue<Good> blockingQueue;

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(2000);
                if (blockingQueue.remainingCapacity() > 0) {
                    Good good = Good.builder()
                            .id(UUID.randomUUID().toString())
                            .type("吃的")
                            .build();
                    blockingQueue.add(good);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Consumer

public class WzkConsumer implements Runnable {
    private final BlockingQueue<Good> blockingQueue;

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(1000);
                Good good = blockingQueue.take();
                System.out.println("吃了食物: " + good);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Main

public class WzkMain {
    public static void main(String[] args) throws Exception {
        BlockingQueue<Good> blockingQueue = new ArrayBlockingQueue<>(20);
        WzkProducer wzkProducer = new WzkProducer(blockingQueue);
        WzkConsumer wzkConsumer = new WzkConsumer(blockingQueue);
        new Thread(wzkProducer).start();
        Thread.sleep(10_000);
        new Thread(wzkConsumer).start();
    }
}

生产环境差距

需要重点考虑的问题:

  1. 消息持久化: 需要支持消息持久化存储,防止系统崩溃时消息丢失

  2. 消息可靠性保证:

    • 发送确认机制:实现ACK/NACK机制
    • 消费确认机制:消费者处理完成后需显式确认
  3. 高并发处理: 需要支持水平扩展的集群架构

  4. 系统可靠性:

    • 故障转移机制:主从切换、自动恢复
    • 多机房部署:异地多活容灾方案
    • 监控告警:完善的健康检查和监控体系
  5. 流量控制:

    • 限流机制:令牌桶或漏桶算法
    • 熔断降级:防止系统过载

错误速查

症状根因修复
生产者在队列满时抛异常或线程退出使用 blockingQueue.add() 队列满会抛异常改为 put()offer(timeout)
消费者 take() 阻塞看似”卡死”队列为空时 take() 必然阻塞如需可停机:用 poll(timeout)
无法优雅停机while(true) 无限循环while(!Thread.currentThread().isInterrupted())
数据可靠性为0内存队列无持久化、无副本、无重放需要 WAL 机制
重复消费/消息丢失不可控无ACK、无重试、无幂等、无死信引入 ACK + 重试 + DLQ 设计