TL;DR
- 场景: 本地或 Docker 学习 RocketMQ Java API,覆盖 Producer/Consumer 四种写法
- 结论: Producer 关注 namesrvAddr/编码/发送语义;Consumer 关注 group/offset/重试与线程生命周期
- 产出: 可直接运行的同步/异步生产者 + Pull/Poll 消费者 + Push 并发监听示例
RocketMQ API 学习
WzkProducer (同步生产者)
package icu.wzk;
public class WzkProducer {
public static void main(String[] args) throws Exception {
// 1) Producer Group:同一类生产者的逻辑分组
DefaultMQProducer producer = new DefaultMQProducer("wzk-icu");
// 2) NameServer 地址
producer.setNamesrvAddr("localhost:9876");
// 3) 启动 producer
producer.start();
try {
// 4) 构造消息
String topic = "wzk-topic";
String tag = "wzk-tag";
String body = "Hello RocketMQ!";
Message message = new Message(
topic,
tag,
body.getBytes(StandardCharsets.UTF_8)
);
// 5) 同步发送:阻塞等待 broker 返回结果
SendResult sendResult = producer.send(message);
// 6) 打印结果
System.out.println(sendResult);
} finally {
// 7) 关闭 producer
producer.shutdown();
}
}
}
WzkAsyncProducer (异步生产者)
package icu.wzk;
public class WzkAsyncProducer {
public static void main(String[] args) throws Exception {
// 1) Producer Group
DefaultMQProducer producer = new DefaultMQProducer("wzk-icu");
// 2) NameServer
producer.setNamesrvAddr("127.0.0.1:9876");
// 3) 启动 producer
producer.start();
final int total = 100;
CountDownLatch latch = new CountDownLatch(total);
try {
for (int i = 0; i < total; i++) {
String topic = "wzk-topic";
String tag = "wzk-tag";
String body = "Hello RocketMQ " + i;
Message msg = new Message(
topic,
tag,
body.getBytes(StandardCharsets.UTF_8)
);
// 4) 异步发送:立即返回;结果通过回调线程通知
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("发送成功 " + sendResult);
latch.countDown();
}
@Override
public void onException(Throwable throwable) {
System.out.println("发送失败 " + throwable.getMessage());
throwable.printStackTrace();
latch.countDown();
}
});
}
// 5) 等待所有回调结束
boolean finished = latch.await(10, TimeUnit.SECONDS);
if (!finished) {
System.out.println("等待回调超时:仍有部分消息未返回 sendResult");
}
} finally {
// 6) 关闭 producer
producer.shutdown();
}
}
}
WzkPullConsumer (拉取消费者)
package icu.wzk;
public class WzkPullConsumer {
public static void main(String[] args) throws Exception {
// 1) Consumer Group
DefaultLitePullConsumer consumer = new DefaultLitePullConsumer("wzk-icu");
// 2) NameServer
consumer.setNamesrvAddr("localhost:9876");
// 3) 订阅:topic + tag expression
consumer.subscribe("wzk-topic", "*");
// 4) 自动提交 offset
consumer.setAutoCommit(true);
// 5) 启动 consumer
consumer.start();
try {
while (true) {
// 6) poll 拉取
List<MessageExt> msgs = consumer.poll(3000);
// 7) 业务处理
for (MessageExt msg : msgs) {
String body = new String(msg.getBody(), StandardCharsets.UTF_8);
System.out.printf(
"msgId=%s, queueId=%d, reconsumeTimes=%d, body=%s%n",
msg.getMsgId(),
msg.getQueueId(),
msg.getReconsumeTimes(),
body
);
}
}
} finally {
// 8) 关闭 consumer
consumer.shutdown();
}
}
}
WzkPushConsumer (推送消费者)
package icu.wzk;
public class WzkPushConsumer {
public static void main(String[] args) throws Exception {
// 1) Consumer Group
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wzk-icu");
// 2) NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
// 3) 订阅 Topic + Tag 过滤
consumer.subscribe("wzk-topic", "*");
// 4) 注册并发消费监听器
consumer.setMessageListener((MessageListenerConcurrently) (msgs, context) -> {
MessageQueue mq = context.getMessageQueue();
System.out.println("MQ = " + mq);
for (MessageExt msg : msgs) {
try {
String body = new String(msg.getBody(), StandardCharsets.UTF_8);
System.out.printf(
"msgId=%s topic=%s tags=%s keys=%s reconsumeTimes=%d body=%s%n",
msg.getMsgId(),
msg.getTopic(),
msg.getTags(),
msg.getKeys(),
msg.getReconsumeTimes(),
body
);
} catch (Exception e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 5) 启动 consumer
consumer.start();
System.out.println("WzkPushConsumer started.");
Thread.currentThread().join();
}
}
错误速查
| 症状 | 根因定位 | 修复 |
|---|
| Producer/Consumer 连不上 NameServer | namesrvAddr 写成 0.0.0.0 或端口不通 | 本机用 localhost/127.0.0.1:9876 |
| 发送偶发失败/超时 | 路由未就绪、broker 不可达、网络抖动 | 启动后先等待路由稳定 |
| 消费端”看起来跑着但没消息” | 订阅的 topic/tag 不匹配 | 检查 subscribe 表达式 |
| PushConsumer 业务异常但消息不再重试 | 捕获异常后仍返回 CONSUME_SUCCESS | 异常时返回 RECONSUME_LATER |
| PullConsumer 重启后重复或丢处理 | AutoCommit=true,业务失败无法回滚 | 需要可控语义时 setAutoCommit(false) |
| 程序一闪而过/Consumer 很快退出 | main 线程结束导致 JVM 退出 | 用 join/阻塞确保进程存活 |
| 中文/特殊字符乱码 | body 编码未显式指定 | 统一 StandardCharsets.UTF_8 |