TL;DR

  • 场景: 本地或 Docker 学习 RocketMQ Java API,覆盖 Producer/Consumer 四种写法
  • 结论: Producer 关注 namesrvAddr/编码/发送语义;Consumer 关注 group/offset/重试与线程生命周期
  • 产出: 可直接运行的同步/异步生产者 + Pull/Poll 消费者 + Push 并发监听示例

RocketMQ API 学习

WzkProducer (同步生产者)

package icu.wzk;

public class WzkProducer {
    public static void main(String[] args) throws Exception {
        // 1) Producer Group:同一类生产者的逻辑分组
        DefaultMQProducer producer = new DefaultMQProducer("wzk-icu");

        // 2) NameServer 地址
        producer.setNamesrvAddr("localhost:9876");

        // 3) 启动 producer
        producer.start();

        try {
            // 4) 构造消息
            String topic = "wzk-topic";
            String tag = "wzk-tag";
            String body = "Hello RocketMQ!";

            Message message = new Message(
                    topic,
                    tag,
                    body.getBytes(StandardCharsets.UTF_8)
            );

            // 5) 同步发送:阻塞等待 broker 返回结果
            SendResult sendResult = producer.send(message);

            // 6) 打印结果
            System.out.println(sendResult);

        } finally {
            // 7) 关闭 producer
            producer.shutdown();
        }
    }
}

WzkAsyncProducer (异步生产者)

package icu.wzk;

public class WzkAsyncProducer {
    public static void main(String[] args) throws Exception {
        // 1) Producer Group
        DefaultMQProducer producer = new DefaultMQProducer("wzk-icu");

        // 2) NameServer
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 3) 启动 producer
        producer.start();

        final int total = 100;
        CountDownLatch latch = new CountDownLatch(total);

        try {
            for (int i = 0; i < total; i++) {
                String topic = "wzk-topic";
                String tag = "wzk-tag";
                String body = "Hello RocketMQ " + i;

                Message msg = new Message(
                        topic,
                        tag,
                        body.getBytes(StandardCharsets.UTF_8)
                );

                // 4) 异步发送:立即返回;结果通过回调线程通知
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.println("发送成功 " + sendResult);
                        latch.countDown();
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        System.out.println("发送失败 " + throwable.getMessage());
                        throwable.printStackTrace();
                        latch.countDown();
                    }
                });
            }

            // 5) 等待所有回调结束
            boolean finished = latch.await(10, TimeUnit.SECONDS);
            if (!finished) {
                System.out.println("等待回调超时:仍有部分消息未返回 sendResult");
            }
        } finally {
            // 6) 关闭 producer
            producer.shutdown();
        }
    }
}

WzkPullConsumer (拉取消费者)

package icu.wzk;

public class WzkPullConsumer {
    public static void main(String[] args) throws Exception {
        // 1) Consumer Group
        DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("wzk-icu");

        // 2) NameServer
        consumer.setNamesrvAddr("localhost:9876");

        // 3) 订阅:topic + tag expression
        consumer.subscribe("wzk-topic", "*");

        // 4) 自动提交 offset
        consumer.setAutoCommit(true);

        // 5) 启动 consumer
        consumer.start();

        try {
            while (true) {
                // 6) poll 拉取
                List<MessageExt> msgs = consumer.poll(3000);

                // 7) 业务处理
                for (MessageExt msg : msgs) {
                    String body = new String(msg.getBody(), StandardCharsets.UTF_8);
                    System.out.printf(
                            "msgId=%s, queueId=%d, reconsumeTimes=%d, body=%s%n",
                            msg.getMsgId(),
                            msg.getQueueId(),
                            msg.getReconsumeTimes(),
                            body
                    );
                }
            }
        } finally {
            // 8) 关闭 consumer
            consumer.shutdown();
        }
    }
}

WzkPushConsumer (推送消费者)

package icu.wzk;

public class WzkPushConsumer {
    public static void main(String[] args) throws Exception {
        // 1) Consumer Group
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wzk-icu");

        // 2) NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");

        // 3) 订阅 Topic + Tag 过滤
        consumer.subscribe("wzk-topic", "*");

        // 4) 注册并发消费监听器
        consumer.setMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            MessageQueue mq = context.getMessageQueue();
            System.out.println("MQ = " + mq);

            for (MessageExt msg : msgs) {
                try {
                    String body = new String(msg.getBody(), StandardCharsets.UTF_8);

                    System.out.printf(
                            "msgId=%s topic=%s tags=%s keys=%s reconsumeTimes=%d body=%s%n",
                            msg.getMsgId(),
                            msg.getTopic(),
                            msg.getTags(),
                            msg.getKeys(),
                            msg.getReconsumeTimes(),
                            body
                    );
                } catch (Exception e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }

            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });

        // 5) 启动 consumer
        consumer.start();

        System.out.println("WzkPushConsumer started.");
        Thread.currentThread().join();
    }
}

错误速查

症状根因定位修复
Producer/Consumer 连不上 NameServernamesrvAddr 写成 0.0.0.0 或端口不通本机用 localhost/127.0.0.1:9876
发送偶发失败/超时路由未就绪、broker 不可达、网络抖动启动后先等待路由稳定
消费端”看起来跑着但没消息”订阅的 topic/tag 不匹配检查 subscribe 表达式
PushConsumer 业务异常但消息不再重试捕获异常后仍返回 CONSUME_SUCCESS异常时返回 RECONSUME_LATER
PullConsumer 重启后重复或丢处理AutoCommit=true,业务失败无法回滚需要可控语义时 setAutoCommit(false)
程序一闪而过/Consumer 很快退出main 线程结束导致 JVM 退出用 join/阻塞确保进程存活
中文/特殊字符乱码body 编码未显式指定统一 StandardCharsets.UTF_8