TL;DR
- Scenario: RabbitMQ fanout broadcast publish-subscribe, complete Java producer/consumer run-through, explaining push/pull consumption differences
- Conclusion: Default push mode (basicConsume) is better for conventional real-time consumption; pull mode (basicGet) for conditional/batch/rate-limiting scenarios
- Output: Directly reusable EmitLog/ReceiveLogs code + rabbitmqctl binding verification method + common fault quick reference
Message Push/Pull Mode Details
Push Mode (Push) Implementation
1. Extend the DefaultConsumer Base Class
This is the standard approach offered by the RabbitMQ Java client: override the handleDelivery method to process each received message.
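Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                               AMQP.BasicProperties properties, byte[] body) {
        // StandardCharsets.UTF_8 avoids the checked exception thrown by new String(body, "UTF-8")
        String message = new String(body, StandardCharsets.UTF_8);
        // Message processing logic
    }
};
// autoAck = true: the broker considers the message acknowledged as soon as it is delivered
channel.basicConsume(queueName, true, consumer);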
2. Using Spring AMQP’s SimpleMessageListenerContainer
This is a higher-level abstraction provided by Spring AMQP. It supports automatic declaration of queues, exchanges, and bindings, as well as message conversion, error handling, and other advanced features.
// connectionFactory() and messageListenerAdapter are assumed to be defined elsewhere in this configuration class
@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueNames("queueName");
    container.setMessageListener(messageListenerAdapter);
    return container;
}
Push Mode Applicable Scenarios
Push mode is the most commonly used consumption mode, especially suitable for:
- Scenarios with high real-time requirements
- Stable consumer processing capacity
- Scenarios requiring continuous message processing
Pull Mode (Pull) Necessity
Push mode may not be suitable in the following scenarios, where pull mode is needed instead:
- Conditional Consumption: The consumer can only process messages when specific conditions are met
- Batch Processing Requirements: Multiple messages need to be pulled and processed together (basicGet returns one message per call, so a batch is collected in a loop)
- Resource Constrained Scenarios: Consumer processing capacity is limited, so the message pull rate must be controlled
- Special Business Requirements: Precise control over message pull timing is needed
A single pull with manual acknowledgement looks like this:
// autoAck = false: the message stays unacknowledged until basicAck is called
GetResponse response = channel.basicGet(queueName, false);
if (response != null) { // null means the queue is currently empty
    // Process message
    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
}
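Because basicGet returns at most one message per call (or null when the queue is empty), a batch has to be assembled in a loop. The following is a minimal sketch of that loop rather than a definitive implementation. PullSource and BatchPuller are hypothetical names introduced here so the logic can run without a broker; in a real consumer the source would wrap channel.basicGet(queueName, false).

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical stand-in for one pull; mirrors basicGet returning null on an empty queue. */
interface PullSource<T> {
    T pull() throws IOException;
}

/** Hypothetical helper: collects up to maxBatch messages, stopping early if the queue runs dry. */
class BatchPuller {
    static <T> List<T> pullBatch(PullSource<T> source, int maxBatch) throws IOException {
        List<T> batch = new ArrayList<>();
        while (batch.size() < maxBatch) {
            T next = source.pull();
            if (next == null) {
                break; // queue is empty for now; return what has been collected
            }
            batch.add(next);
        }
        return batch;
    }

    public static void main(String[] args) throws IOException {
        // In-memory queue used only for demonstration; poll() returns null when empty.
        Deque<String> queue = new ArrayDeque<>(List.of("m1", "m2", "m3"));
        System.out.println(pullBatch(queue::poll, 2)); // [m1, m2]
        System.out.println(pullBatch(queue::poll, 2)); // [m3]
    }
}
```

With the RabbitMQ client, the call would look like BatchPuller.pullBatch(() -> channel.basicGet(queueName, false), 50). Once the batch has been processed, acknowledging the last delivery tag with channel.basicAck(lastTag, true) confirms the whole batch in one call, since multiple=true covers every unacknowledged delivery up to that tag on the channel.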
Push/Pull Mode Selection Suggestions
- Default to Push Mode: Simpler to implement and generally more efficient, because the broker delivers messages without a separate request per message; suitable for most conventional scenarios
- Consider Pull Mode When: Consumption timing must be controlled precisely, messages are processed in batches, consumer resources are constrained, or special business logic is involved
- Hybrid Mode: Use push mode most of the time and switch to pull mode under specific conditions
EmitLog Code
package icu.wzk.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class EmitLog {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // try-with-resources closes the channel and connection after publishing
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String message = (args.length < 1)
                    ? "info: Hello World!"
                    : String.join(" ", args);
            // Empty routing key: a fanout exchange ignores it
            channel.basicPublish(
                    EXCHANGE_NAME,
                    "",
                    null,
                    message.getBytes(StandardCharsets.UTF_8)
            );
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}
ReceiveLogs Code
package icu.wzk.demo;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.nio.charset.StandardCharsets;

public class ReceiveLogs {
    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        // Connection and channel stay open so the consumer keeps receiving
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
        // Server-named, temporary queue for this consumer only
        String queueName = channel.queueDeclare().getQueue();
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };
        boolean autoAck = true;
        channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});
    }
}
Startup Test
Start ReceiveLogs first so that its queue is bound to the exchange, then run EmitLog. To view the binding relationships:
rabbitmqctl list_bindings
Formatted output:
rabbitmqctl list_bindings --formatter pretty_table
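What to expect from the two programs can be illustrated with a toy in-memory model of a fanout exchange. This is only a sketch of the routing behaviour under default settings, not the RabbitMQ client API: FanoutModel and its methods are hypothetical names made up for this illustration.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of a fanout exchange; hypothetical, not the RabbitMQ client API. */
class FanoutModel {
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    /** Models queueBind; no routing key parameter because fanout ignores it. */
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    /** Models basicPublish: copies the message to every bound queue and returns the copy count. */
    int publish(String message) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(message);
        }
        return boundQueues.size();
    }

    /** Models a single pull: returns null when the queue is empty or was never bound. */
    String poll(String queueName) {
        Queue<String> queue = boundQueues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        FanoutModel logs = new FanoutModel();
        System.out.println(logs.publish("early"));              // 0: nothing bound, message is dropped
        logs.bind("consumer-1");
        logs.bind("consumer-2");
        System.out.println(logs.publish("info: Hello World!")); // 2: one copy per bound queue
        System.out.println(logs.poll("consumer-1"));            // info: Hello World!
        System.out.println(logs.poll("consumer-2"));            // info: Hello World!
        System.out.println(logs.poll("consumer-1"));            // null: "early" was never queued
    }
}
```

The model mirrors two things visible in the test run: every running ReceiveLogs instance gets its own copy of each message, and anything EmitLog publishes before a queue is bound is lost.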
Error Quick Reference
| Symptom | Root Cause | Fix |
|---|---|---|
| Consumer never receives messages | Exchange name differs between producer and consumer | Compare EXCHANGE_NAME on both sides; check with rabbitmqctl list_exchanges |
| PRECONDITION_FAILED on send (raised by exchangeDeclare) | An exchange with the same name already exists with a different type | Keep exchangeDeclare parameters consistent, or delete the old exchange and recreate it |
| Messages arrive after the consumer starts, but everything is gone after a RabbitMQ restart | The queue is temporary and non-durable | In production, declare a named durable queue (and publish messages as persistent) |
| Producer prints Sent, but no queue holds the message | No queue is bound to the fanout exchange | Start the consumer first so that queueBind completes before publishing |
| Consumer occasionally loses messages | autoAck=true and the process crashes before the callback finishes | Switch to manual ack: basicAck on success, basicNack with requeue on failure |
| Memory usage or backlog grows noticeably once consumption slows down | Push mode has no rate limiting and prefetch is not set | Set basicQos(prefetch) together with manual ack; control concurrency and consumption rate |
| basicGet frequently returns null | The queue is empty or its messages were taken by other consumers | Give the queue a single dedicated consumer, or design a polling schedule |
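The manual-ack fix from the table (basicAck on success, basicNack with requeue on failure) can be sketched as follows. This is an illustration under stated assumptions, not the client library itself: Acker, MessageHandler and ManualAckSketch are hypothetical names, and Acker only mirrors the signatures of Channel.basicAck and Channel.basicNack so the decision logic can run without a broker.

```java
import java.io.IOException;

/** Hypothetical subset of the channel API; both methods mirror Channel.basicAck / Channel.basicNack. */
interface Acker {
    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
}

/** Hypothetical business callback; throwing signals that processing failed. */
interface MessageHandler {
    void handle(byte[] body) throws Exception;
}

class ManualAckSketch {
    /** Acknowledges only after the handler returns; negatively acknowledges with requeue if it throws. */
    static void process(Acker acker, long deliveryTag, byte[] body, MessageHandler handler)
            throws IOException {
        boolean succeeded;
        try {
            handler.handle(body);
            succeeded = true;
        } catch (Exception e) {
            succeeded = false;
        }
        if (succeeded) {
            acker.basicAck(deliveryTag, false);        // multiple=false: this delivery only
        } else {
            acker.basicNack(deliveryTag, false, true); // requeue=true: broker redelivers it
        }
    }

    public static void main(String[] args) throws IOException {
        // Printing fake used only for demonstration.
        Acker printing = new Acker() {
            @Override public void basicAck(long tag, boolean multiple) {
                System.out.println("ack " + tag);
            }
            @Override public void basicNack(long tag, boolean multiple, boolean requeue) {
                System.out.println("nack " + tag + " requeue=" + requeue);
            }
        };
        process(printing, 1L, "ok".getBytes(), body -> { });
        process(printing, 2L, "bad".getBytes(), body -> { throw new IllegalStateException("boom"); });
    }
}
```

In ReceiveLogs this would mean passing autoAck=false to basicConsume and calling channel.basicAck or channel.basicNack with delivery.getEnvelope().getDeliveryTag() inside the callback. Note that requeueing a message that can never be processed leads to endless redelivery, so a real consumer would likely cap retries.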