TL;DR

  • Scenario: RabbitMQ fanout broadcast publish-subscribe, complete Java producer/consumer run-through, explaining push/pull consumption differences
  • Conclusion: Default push mode (basicConsume) is better for conventional real-time consumption; pull mode (basicGet) for conditional/batch/rate-limiting scenarios
  • Output: Directly reusable EmitLog/ReceiveLogs code + rabbitmqctl binding verification method + common fault quick reference

Message Push/Pull Mode Details

Push Mode (Push) Implementation

1. Inherit DefaultConsumer Base Class

This is the standard implementation provided by RabbitMQ Java client, need to override handleDelivery method to process received messages.

Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope,
                             AMQP.BasicProperties properties, byte[] body) {
        String message = new String(body, "UTF-8");
        // Message processing logic
    }
};
channel.basicConsume(queueName, true, consumer);

2. Using Spring AMQP’s SimpleMessageListenerContainer

This is a higher-level封装 provided by Spring framework, supports auto-declaration of queues, exchanges and bindings, message conversion, error handling and other advanced features.

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory());
    container.setQueueNames("queueName");
    container.setMessageListener(messageListenerAdapter);
    return container;
}

Push Mode Applicable Scenarios

Push mode is the most commonly used consumption mode, especially suitable for:

  • Scenarios with high real-time requirements
  • Stable consumer processing capacity
  • Scenarios requiring continuous message processing

Pull Mode (Pull) Necessity

In the following scenarios, push mode may not be suitable, pull mode is needed:

  1. Conditional Consumption: Consumer can only process messages when specific conditions are met
  2. Batch Processing Requirements: Need to pull multiple messages at once for batch processing
GetResponse response = channel.basicGet(queueName, false);
if (response != null) {
    // Process message
    channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
}
  1. Resource Constrained Scenarios: Consumer processing capacity is limited, need to control message pull rate
  2. Special Business Requirements: Need precise control over message pull timing

Push/Pull Mode Selection Suggestions

  1. Default to Push Mode: Simple implementation, better performance, suitable for most conventional scenarios
  2. Consider Pull Mode When: Need precise control over message consumption timing, batch processing messages, consumer resources constrained, implementing special business logic
  3. Hybrid Mode: Use push mode most of the time, switch to pull mode under specific conditions

EmitLog Code

package icu.wzk.demo;

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String message = (args.length < 1)
                    ? "info: Hello World!"
                    : String.join(" ", args);

            channel.basicPublish(
                    EXCHANGE_NAME,
                    "",
                    null,
                    message.getBytes(StandardCharsets.UTF_8)
            );

            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

ReceiveLogs Code

package icu.wzk.demo;

public class ReceiveLogs {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        String queueName = channel.queueDeclare().getQueue();

        channel.queueBind(queueName, EXCHANGE_NAME, "");

        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + message + "'");
        };

        boolean autoAck = true;
        channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});
    }
}

Startup Test

View binding relationships:

rabbitmqctl list_bindings

Formatted output:

rabbitmqctl list_bindings --formatter pretty_table

Error Quick Reference

SymptomRoot CauseFix
Consumer never receives messagesExchange name inconsistencyCompare producer/consumer EXCHANGE_NAME; rabbitmqctl list_exchanges
PRECONDITION_FAILED on sendExchange already exists but type differentKeep exchangeDeclare parameters consistent; delete old exchange and recreate
Can receive after consumer starts, all gone after RabbitMQ restartUsing temporary queue and non-durableProduction change to declare named durable queue
Producer shows Sent, but queue has no messagesNo queue bound to fanout exchangeStart consumer first to complete queueBind
Consumer occasionally loses messagesautoAck=true, callback not complete before process crashChange to manual ack: success basicAck, fail basicNack/requeue
Memory/backlog明显 after consumption slowsPush mode no rate limiting, prefetch not setSet basicQos(prefetch), control concurrency and consumption rate
basicGet frequently returns nullQueue empty or taken by other consumersEnsure queue exclusive or design scheduling strategy