TL;DR

  • Scenario: Learning the RocketMQ Java API locally or in Docker, covering four ways to write Producers and Consumers
  • Conclusion: Producer focuses on namesrvAddr/encoding/send semantics; Consumer focuses on group/offset/retry and thread lifecycle
  • Output: Directly runnable sync/async producer + Pull/Poll consumer + Push concurrent listener examples

RocketMQ API Learning

WzkProducer (Sync Producer)

package icu.wzk;

/**
 * Minimal synchronous producer example.
 *
 * <p>Starts a producer in group {@code wzk-icu} against the NameServer at
 * {@code localhost:9876}, sends one UTF-8 encoded message to {@code wzk-topic}
 * with tag {@code wzk-tag}, prints the returned {@code SendResult}, and shuts
 * the producer down.
 */
public class WzkProducer {

    /**
     * Entry point: send a single message and print the broker's reply.
     *
     * @param args unused
     * @throws Exception if starting the producer or sending the message fails
     */
    public static void main(String[] args) throws Exception {
        // The producer group is the logical grouping of producers of the same type.
        DefaultMQProducer mqProducer = new DefaultMQProducer("wzk-icu");
        mqProducer.setNamesrvAddr("localhost:9876");
        mqProducer.start();

        try {
            // send(Message) is the synchronous variant: it hands back the SendResult directly.
            SendResult ack = mqProducer.send(buildGreeting());
            System.out.println(ack);
        } finally {
            // Runs whether or not the send succeeded, so the producer is always shut down.
            mqProducer.shutdown();
        }
    }

    /** Builds the demo message; the body is encoded explicitly as UTF-8. */
    private static Message buildGreeting() {
        byte[] payload = "Hello RocketMQ!".getBytes(StandardCharsets.UTF_8);
        return new Message("wzk-topic", "wzk-tag", payload);
    }
}

WzkAsyncProducer (Async Producer)

package icu.wzk;

/**
 * Asynchronous producer example.
 *
 * <p>Sends 100 UTF-8 encoded messages to {@code wzk-topic} using the
 * callback-based {@code send} overload and waits up to 10 seconds for every
 * callback (success or failure) to fire before shutting the producer down.
 */
public class WzkAsyncProducer {

    /**
     * Entry point: fire the async sends, then wait for their callbacks.
     *
     * @param args unused
     * @throws Exception if starting the producer, submitting a send, or waiting on the latch fails
     */
    public static void main(String[] args) throws Exception {
        DefaultMQProducer mqProducer = new DefaultMQProducer("wzk-icu");
        mqProducer.setNamesrvAddr("127.0.0.1:9876");
        mqProducer.start();

        final int messageCount = 100;
        // One count per message; each callback releases exactly one count.
        CountDownLatch pending = new CountDownLatch(messageCount);

        try {
            for (int seq = 0; seq < messageCount; seq++) {
                byte[] payload = ("Hello RocketMQ " + seq).getBytes(StandardCharsets.UTF_8);
                Message outgoing = new Message("wzk-topic", "wzk-tag", payload);

                // Async send: the outcome is reported through the callback, not a return value.
                mqProducer.send(outgoing, reportingCallback(pending));
            }

            // Give the callbacks a bounded amount of time to complete.
            if (!pending.await(10, TimeUnit.SECONDS)) {
                System.out.println("Wait for callback timeout: some messages still haven't returned sendResult");
            }
        } finally {
            mqProducer.shutdown();
        }
    }

    /**
     * Creates a callback that prints the outcome of one send and then counts
     * the latch down, regardless of whether the send succeeded or failed.
     *
     * @param pending latch tracking the number of outstanding callbacks
     * @return a fresh callback bound to {@code pending}
     */
    private static SendCallback reportingCallback(final CountDownLatch pending) {
        return new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Send success " + sendResult);
                pending.countDown();
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("Send failed " + throwable.getMessage());
                throwable.printStackTrace();
                pending.countDown();
            }
        };
    }
}

WzkPullConsumer (Pull Consumer)

package icu.wzk;

/**
 * Pull-style consumer example built on {@code DefaultLitePullConsumer}.
 *
 * <p>Subscribes to every tag of {@code wzk-topic} in consumer group
 * {@code wzk-icu}, enables auto-commit of offsets, and then polls in an
 * endless loop, printing each received message.
 */
public class WzkPullConsumer {

    /**
     * Entry point: configure the consumer and poll forever.
     *
     * @param args unused
     * @throws Exception if starting the consumer or polling fails
     */
    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePull = new DefaultLitePullConsumer("wzk-icu");
        litePull.setNamesrvAddr("localhost:9876");
        // "*" is the tag expression: accept every tag on the topic.
        litePull.subscribe("wzk-topic", "*");
        litePull.setAutoCommit(true);
        litePull.start();

        try {
            // The loop has no exit condition; it only ends if something throws.
            for (;;) {
                List<MessageExt> batch = litePull.poll(3000);
                for (MessageExt received : batch) {
                    printMessage(received);
                }
            }
        } finally {
            litePull.shutdown();
        }
    }

    /** Decodes the body as UTF-8 and prints one line describing the message. */
    private static void printMessage(MessageExt received) {
        String text = new String(received.getBody(), StandardCharsets.UTF_8);
        System.out.printf(
                "msgId=%s, queueId=%d, reconsumeTimes=%d, body=%s%n",
                received.getMsgId(),
                received.getQueueId(),
                received.getReconsumeTimes(),
                text
        );
    }
}

WzkPushConsumer (Push Consumer)

package icu.wzk;

/**
 * Push-style consumer example with a concurrent message listener.
 *
 * <p>Subscribes to every tag of {@code wzk-topic} in consumer group
 * {@code wzk-icu}. The listener prints the source queue and each message;
 * if handling any message throws, it asks for the batch to be redelivered.
 */
public class WzkPushConsumer {

    /**
     * Entry point: register the listener, start the consumer, and keep the main thread parked.
     *
     * @param args unused
     * @throws Exception if starting the consumer fails or the main thread is interrupted
     */
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("wzk-icu");
        pushConsumer.setNamesrvAddr("localhost:9876");
        // "*" is the tag filter: accept every tag on the topic.
        pushConsumer.subscribe("wzk-topic", "*");

        pushConsumer.setMessageListener((MessageListenerConcurrently) (batch, ctx) -> {
            MessageQueue sourceQueue = ctx.getMessageQueue();
            System.out.println("MQ = " + sourceQueue);

            for (MessageExt delivered : batch) {
                // Stop at the first failure and request redelivery.
                if (!tryPrint(delivered)) {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        pushConsumer.start();

        System.out.println("WzkPushConsumer started.");
        // The main thread joins on itself, so it waits here instead of returning from main.
        Thread.currentThread().join();
    }

    /**
     * Decodes the body as UTF-8 and prints one line describing the message.
     *
     * @param delivered the message to print
     * @return {@code true} if printing completed, {@code false} if an exception
     *         was thrown (its stack trace is printed)
     */
    private static boolean tryPrint(MessageExt delivered) {
        try {
            String text = new String(delivered.getBody(), StandardCharsets.UTF_8);

            System.out.printf(
                    "msgId=%s topic=%s tags=%s keys=%s reconsumeTimes=%d body=%s%n",
                    delivered.getMsgId(),
                    delivered.getTopic(),
                    delivered.getTags(),
                    delivered.getKeys(),
                    delivered.getReconsumeTimes(),
                    text
            );
            return true;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
}

Error Quick Reference

| Symptom | Root Cause | Fix |
| --- | --- | --- |
| Producer/Consumer can’t connect to NameServer | namesrvAddr written as 0.0.0.0, or port unreachable | Use localhost:9876 or 127.0.0.1:9876 when running locally |
| Occasional send failure/timeout | Routing not ready, broker unreachable, network jitter | Wait for routing to stabilize after startup |
| Consumer “appears to run but receives no messages” | Subscribed topic/tag doesn’t match | Check the subscribe expression |
| PushConsumer hits a business exception but the message is not retried | Exception was caught but CONSUME_SUCCESS was still returned | Return RECONSUME_LATER on exception |
| PullConsumer restart causes duplicate or lost processing | AutoCommit=true, so a business failure can’t be rolled back | When controllable semantics are needed, use setAutoCommit(false) |
| Program flashes and exits / Consumer exits quickly | main thread ends, causing the JVM to exit | Use join/block to keep the process alive |
| Chinese/special characters are garbled | Body encoding not explicitly specified | Use StandardCharsets.UTF_8 consistently |