TL;DR
- Scenario: Implement a minimal runnable message model with queue buffering and async decoupling using Java concurrency
- Conclusion: BlockingQueue is enough to explain the core interactions of an MQ, but it lacks persistence, ACK, retry, clustering, and observability
- Output: A runnable Producer/Consumer Demo + enterprise gap list
Version Matrix
| Item | Status | Description |
|---|---|---|
| JDK Version | Unverified | BlockingQueue/ArrayBlockingQueue behavior is expected to be consistent across JDK 8/11/17; not tested on each version |
| Queue Implementation | Verified | Demo uses new ArrayBlockingQueue<>(20), a bounded queue with capacity 20 |
Message Middleware - Basic Concepts
Message-Oriented Middleware (MOM) is a core component for implementing asynchronous communication in distributed system architecture.
Typical Features:
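- Async Communication Mechanism: Uses a “fire-and-forget” pattern, where the producer sends a message and continues without waiting for the consumer to process it
- Message Persistence: Ensures messages are not lost by writing them to disk or replicating them across nodes
- Protocol Support: Often supports standard protocols such as AMQP, MQTT, and STOMP (some brokers, for example Kafka, use their own wire protocol)
- Message Routing: Provides flexible routing strategies, for example by topic or by routing key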
Mainstream Implementation Comparison:
| System | Throughput | Latency | Persistence |
|---|---|---|---|
| RabbitMQ | Medium | Low | Supported |
| Kafka | High | Medium | Strong support |
| RocketMQ | High | Low | Supported |
Custom Message Middleware
Producer
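import java.util.UUID;
import java.util.concurrent.BlockingQueue;

public class WzkProducer implements Runnable {
    private final BlockingQueue<Good> blockingQueue;

    // WzkMain passes the shared queue in through this constructor.
    public WzkProducer(BlockingQueue<Good> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(2000); // produce one item every 2 seconds
                // Check-then-add is only safe with a single producer; with several
                // producers add() can still throw IllegalStateException on a full queue.
                if (blockingQueue.remainingCapacity() > 0) {
                    Good good = Good.builder()
                            .id(UUID.randomUUID().toString())
                            .type("food")
                            .build();
                    blockingQueue.add(good);
                }
            }
        } catch (InterruptedException e) {
            // sleep() was interrupted: restore the flag and let the thread exit
            Thread.currentThread().interrupt();
        }
    }
}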
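The producer calls `Good.builder()`, but the `Good` class itself is not shown here. The builder call suggests something like Lombok's `@Builder`; that is an assumption. As a minimal sketch without Lombok, a hand-written equivalent could look like the following. The field names `id` and `type` are inferred from the builder calls, and the getters and `toString()` format are hypothetical.

```java
// Hypothetical stand-in for the Good class used by the demo.
// Only the fields id and type are inferred from the producer code;
// everything else here is an assumption made for illustration.
final class Good {
    private final String id;
    private final String type;

    private Good(String id, String type) {
        this.id = id;
        this.type = type;
    }

    // Entry point matching the Good.builder() call in the producer.
    static Builder builder() {
        return new Builder();
    }

    String getId() {
        return id;
    }

    String getType() {
        return type;
    }

    @Override
    public String toString() {
        return "Good(id=" + id + ", type=" + type + ")";
    }

    // Hand-written fluent builder, roughly what a builder generator would produce.
    static final class Builder {
        private String id;
        private String type;

        Builder id(String id) {
            this.id = id;
            return this;
        }

        Builder type(String type) {
            this.type = type;
            return this;
        }

        Good build() {
            return new Good(id, type);
        }
    }
}
```

The class is package-private, so it needs to live in the same package as the producer, consumer, and main classes.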
Consumer
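import java.util.concurrent.BlockingQueue;

public class WzkConsumer implements Runnable {
    private final BlockingQueue<Good> blockingQueue;

    // WzkMain passes the shared queue in through this constructor.
    public WzkConsumer(BlockingQueue<Good> blockingQueue) {
        this.blockingQueue = blockingQueue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                Thread.sleep(1000); // simulate 1 second of processing per message
                Good good = blockingQueue.take(); // blocks while the queue is empty
                System.out.println("Consumed food: " + good);
            }
        } catch (InterruptedException e) {
            // interrupted while sleeping or waiting in take(): restore the flag and exit
            Thread.currentThread().interrupt();
        }
    }
}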
Main
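import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class WzkMain {
    public static void main(String[] args) throws Exception {
        // Bounded in-memory queue: holds at most 20 messages
        BlockingQueue<Good> blockingQueue = new ArrayBlockingQueue<>(20);
        WzkProducer wzkProducer = new WzkProducer(blockingQueue);
        WzkConsumer wzkConsumer = new WzkConsumer(blockingQueue);
        new Thread(wzkProducer).start();
        // Start the consumer 10 seconds late so messages accumulate in the queue first
        Thread.sleep(10_000);
        new Thread(wzkConsumer).start();
    }
}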
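The demo above never stops: both threads loop until the JVM is killed. As a rough sketch of how a stoppable variant might look, the example below uses `put()` for backpressure, `poll(timeout, unit)` so the consumer rechecks its loop condition, and thread interruption as the stop signal. The class name `GracefulDemo`, the `String` payloads, and the shortened delays are assumptions made to keep the example self-contained.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant of the demo: String payloads instead of Good,
// short delays, and interrupt-driven shutdown.
final class GracefulDemo {

    // Runs one producer and one consumer for roughly runMillis, then
    // interrupts both and waits for them. Returns {produced, consumed}.
    static int[] run(long runMillis) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(20);
        AtomicInteger produced = new AtomicInteger();
        AtomicInteger consumed = new AtomicInteger();

        Thread producer = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // put() blocks while the queue is full instead of throwing like add()
                    queue.put("msg-" + produced.get());
                    produced.incrementAndGet();
                    Thread.sleep(10);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and exit
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    // poll() returns null after the timeout, so the loop condition is rechecked
                    String msg = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        consumed.incrementAndGet();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the flag and exit
            }
        });

        producer.start();
        consumer.start();
        Thread.sleep(runMillis);
        producer.interrupt();
        consumer.interrupt();
        producer.join();
        consumer.join();
        return new int[] {produced.get(), consumed.get()};
    }

    public static void main(String[] args) throws InterruptedException {
        int[] counts = run(500);
        System.out.println("produced=" + counts[0] + ", consumed=" + counts[1]);
    }
}
```

Any messages still in the queue when the consumer is interrupted are simply dropped, which is the same persistence gap a real broker has to close.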
Production Environment Gaps
Key Issues to Consider:
- Message Persistence: Messages must be stored durably so they are not lost when the system crashes
- Message Reliability Guarantee:
  - Send acknowledgment: The broker confirms receipt to the producer through an ACK/NACK mechanism
  - Consume acknowledgment: The consumer explicitly acknowledges a message after processing it
- High Concurrency Processing: Needs a horizontally scalable cluster architecture
- System Reliability:
  - Failover: Primary-secondary switchover and automatic recovery
  - Multi-datacenter deployment: Multi-active disaster recovery
  - Monitoring and alerting: Complete health checks and a monitoring system
- Flow Control:
  - Rate limiting: Token bucket or leaky bucket algorithm
  - Circuit breaking and degradation: Prevent system overload
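To make the rate-limiting item more concrete, here is a minimal single-JVM token bucket sketch. It is not taken from any particular broker; the class name `TokenBucket`, its constructor parameters, and the choice to pass the clock value in explicitly (so the refill logic is easy to test) are all assumptions.

```java
// Hypothetical single-JVM token bucket. The current time is passed in
// explicitly so that refill behavior can be exercised deterministically.
final class TokenBucket {
    private final long capacity;
    private final double tokensPerSecond;
    private double tokens;
    private long lastRefillNanos;

    TokenBucket(long capacity, double tokensPerSecond, long nowNanos) {
        this.capacity = capacity;
        this.tokensPerSecond = tokensPerSecond;
        this.tokens = capacity;          // start with a full bucket
        this.lastRefillNanos = nowNanos;
    }

    // Consumes one token and returns true if one is available at nowNanos;
    // returns false (request should be rejected or delayed) otherwise.
    synchronized boolean tryAcquire(long nowNanos) {
        long elapsed = nowNanos - lastRefillNanos;
        if (elapsed > 0) {
            // Refill in proportion to elapsed time, never above capacity
            double refill = (elapsed / 1_000_000_000.0) * tokensPerSecond;
            tokens = Math.min(capacity, tokens + refill);
            lastRefillNanos = nowNanos;
        }
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // Burst of 2, then 1 request per second
        TokenBucket bucket = new TokenBucket(2, 1.0, System.nanoTime());
        for (int i = 1; i <= 3; i++) {
            System.out.println("request " + i + " allowed: " + bucket.tryAcquire(System.nanoTime()));
        }
    }
}
```

A producer could call `tryAcquire(System.nanoTime())` before enqueueing and drop or delay the message when it returns false. The capacity sets the allowed burst size and `tokensPerSecond` sets the sustained rate.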
Error Quick Reference
| Symptom | Root Cause | Fix |
|---|---|---|
| Producer throws exception or thread exits when queue is full | blockingQueue.add() throws IllegalStateException when the queue is full | Change to put() (blocks until space is free) or offer(e, timeout, unit) |
| Consumer take() blocks appearing “stuck” | take() blocks when queue empty | For graceful shutdown: use poll(timeout) |
| Cannot shut down gracefully | while(true) loop has no exit condition | Use while(!Thread.currentThread().isInterrupted()) and interrupt the threads to stop them |
| No data durability: a crash loses every queued message | In-memory queue with no persistence, no replicas, no replay | Add a write-ahead log (WAL) or another durable store |
| Duplicate consumption and message loss cannot be controlled | No ACK, no retry, no idempotency, no dead-letter queue | Introduce an ACK + retry + DLQ design |
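The ACK + retry + DLQ fix in the last row can be sketched in memory as follows. This is a simplified single-threaded illustration, not how any specific broker implements it. The names `RetryingDispatcher` and `Envelope`, and the convention that the handler returns true for ACK and false for NACK, are assumptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Predicate;

// Hypothetical in-memory dispatcher: the handler returns true to ACK a
// message and false to NACK it. NACKed messages are retried up to
// maxAttempts times, then moved to a dead-letter list.
final class RetryingDispatcher {

    // A message plus the number of delivery attempts made so far.
    static final class Envelope {
        final String payload;
        int attempts;

        Envelope(String payload) {
            this.payload = payload;
        }
    }

    private final Queue<Envelope> mainQueue = new ArrayDeque<>();
    private final List<String> deadLetters = new ArrayList<>();
    private final int maxAttempts;

    RetryingDispatcher(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    void publish(String payload) {
        mainQueue.add(new Envelope(payload));
    }

    // Delivers messages until the main queue is empty.
    void drain(Predicate<String> handler) {
        Envelope env;
        while ((env = mainQueue.poll()) != null) {
            env.attempts++;
            boolean acked;
            try {
                acked = handler.test(env.payload);
            } catch (RuntimeException e) {
                acked = false; // treat a handler crash as a NACK
            }
            if (acked) {
                continue; // ACK: the message is done
            }
            if (env.attempts < maxAttempts) {
                mainQueue.add(env); // NACK: requeue for another attempt
            } else {
                deadLetters.add(env.payload); // attempts exhausted: park in the DLQ
            }
        }
    }

    List<String> deadLetters() {
        return deadLetters;
    }

    public static void main(String[] args) {
        RetryingDispatcher dispatcher = new RetryingDispatcher(3);
        dispatcher.publish("ok");
        dispatcher.publish("poison");
        // Hypothetical handler: rejects any payload named "poison"
        dispatcher.drain(payload -> !payload.equals("poison"));
        System.out.println("dead letters: " + dispatcher.deadLetters());
    }
}
```

Because a NACKed message is delivered again, the handler may see the same payload more than once, so it needs to be idempotent. That is why idempotency appears alongside ACK and retry in the root-cause column.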