TL;DR
- Scenario: Learning the RocketMQ Java API locally or in Docker, covering four implementation styles (sync/async Producer, Pull/Push Consumer)
- Conclusion: Producer focuses on namesrvAddr/encoding/send semantics; Consumer focuses on group/offset/retry and thread lifecycle
- Output: Directly runnable sync/async producer + Pull/Poll consumer + Push concurrent listener examples
RocketMQ API Learning
WzkProducer (Sync Producer)
package icu.wzk;
/**
 * Minimal synchronous producer demo: starts a producer, sends a single
 * UTF-8 encoded message, prints the send result and shuts the producer down.
 */
public class WzkProducer {
    public static void main(String[] args) throws Exception {
        // Producer group name plus the NameServer endpoint the client resolves routes from.
        DefaultMQProducer mqProducer = new DefaultMQProducer("wzk-icu");
        mqProducer.setNamesrvAddr("localhost:9876");
        mqProducer.start();
        try {
            // Encode explicitly as UTF-8 so the payload does not depend on the platform charset.
            byte[] payload = "Hello RocketMQ!".getBytes(StandardCharsets.UTF_8);
            Message outgoing = new Message("wzk-topic", "wzk-tag", payload);
            // Synchronous send: the call returns the SendResult, which is printed as-is.
            System.out.println(mqProducer.send(outgoing));
        } finally {
            // Runs whether the send succeeded or threw.
            mqProducer.shutdown();
        }
    }
}
WzkAsyncProducer (Async Producer)
package icu.wzk;
/**
 * Asynchronous producer demo: fires 100 messages with a callback, waits up to
 * ten seconds for every callback to report back, then shuts the producer down.
 */
public class WzkAsyncProducer {
    public static void main(String[] args) throws Exception {
        final int messageCount = 100;
        // One count per message; each callback (success or failure) releases one.
        final CountDownLatch pending = new CountDownLatch(messageCount);

        DefaultMQProducer asyncProducer = new DefaultMQProducer("wzk-icu");
        asyncProducer.setNamesrvAddr("127.0.0.1:9876");
        asyncProducer.start();

        // The callback keeps no per-message state, so one instance serves every send.
        final SendCallback reportAndCount = new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("Send success " + sendResult);
                pending.countDown();
            }

            @Override
            public void onException(Throwable throwable) {
                System.out.println("Send failed " + throwable.getMessage());
                throwable.printStackTrace();
                // Count failures too, otherwise the wait below could never complete early.
                pending.countDown();
            }
        };

        try {
            for (int seq = 0; seq < messageCount; seq++) {
                byte[] payload = ("Hello RocketMQ " + seq).getBytes(StandardCharsets.UTF_8);
                // Async send: the outcome is delivered to the callback instead of being returned.
                asyncProducer.send(new Message("wzk-topic", "wzk-tag", payload), reportAndCount);
            }
            // Bounded wait so the demo cannot hang forever on missing callbacks.
            if (!pending.await(10, TimeUnit.SECONDS)) {
                System.out.println("Wait for callback timeout: some messages still haven't returned sendResult");
            }
        } finally {
            asyncProducer.shutdown();
        }
    }
}
WzkPullConsumer (Pull Consumer)
package icu.wzk;
/**
 * Lite pull consumer demo: subscribes to every tag of {@code wzk-topic} and
 * polls in an endless loop, printing each received message.
 */
public class WzkPullConsumer {
    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePull = new DefaultLitePullConsumer("wzk-icu");
        litePull.setNamesrvAddr("localhost:9876");
        // "*" is the tag expression: no tag filtering.
        litePull.subscribe("wzk-topic", "*");
        litePull.setAutoCommit(true);
        litePull.start();
        try {
            // Intentionally endless; the loop is only left when something throws.
            for (;;) {
                List<MessageExt> batch = litePull.poll(3000);
                for (MessageExt received : batch) {
                    String msgId = received.getMsgId();
                    int queueId = received.getQueueId();
                    int reconsumeTimes = received.getReconsumeTimes();
                    // Decode with the same charset the producers in this article encode with.
                    String text = new String(received.getBody(), StandardCharsets.UTF_8);
                    System.out.printf(
                            "msgId=%s, queueId=%d, reconsumeTimes=%d, body=%s%n",
                            msgId, queueId, reconsumeTimes, text);
                }
            }
        } finally {
            // Reached when poll() or the processing above throws.
            litePull.shutdown();
        }
    }
}
WzkPushConsumer (Push Consumer)
package icu.wzk;
/**
 * Push consumer demo: registers a concurrent listener on {@code wzk-topic},
 * prints every delivered message and keeps the process alive until it is
 * terminated from outside.
 *
 * <p>A JVM shutdown hook closes the consumer on Ctrl-C / SIGTERM, because the
 * main thread never returns on its own.
 */
public class WzkPushConsumer {
    public static void main(String[] args) throws Exception {
        // 1) Consumer Group
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("wzk-icu");
        // 2) NameServer address
        consumer.setNamesrvAddr("localhost:9876");
        // 3) Subscribe Topic + Tag filter ("*" = every tag)
        consumer.subscribe("wzk-topic", "*");
        // 4) Register concurrent consumption listener
        consumer.setMessageListener((MessageListenerConcurrently) (msgs, context) -> {
            MessageQueue mq = context.getMessageQueue();
            System.out.println("MQ = " + mq);
            for (MessageExt msg : msgs) {
                try {
                    String body = new String(msg.getBody(), StandardCharsets.UTF_8);
                    System.out.printf(
                            "msgId=%s topic=%s tags=%s keys=%s reconsumeTimes=%d body=%s%n",
                            msg.getMsgId(),
                            msg.getTopic(),
                            msg.getTags(),
                            msg.getKeys(),
                            msg.getReconsumeTimes(),
                            body
                    );
                } catch (Exception e) {
                    e.printStackTrace();
                    // Returning here ends the loop, so the whole delivered batch is
                    // reported as RECONSUME_LATER, not just the failing message.
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        // 5) Start consumer
        consumer.start();
        // 6) Close the client when the JVM is asked to exit. Without this hook
        //    shutdown() is never invoked, since main blocks forever below.
        //    NOTE(review): shutdown() presumably also flushes consumption progress;
        //    confirm against the RocketMQ client documentation.
        Runtime.getRuntime().addShutdownHook(
                new Thread(consumer::shutdown, "wzk-push-consumer-shutdown"));
        System.out.println("WzkPushConsumer started.");
        // 7) A thread joining itself never returns: parks main so the process stays alive.
        Thread.currentThread().join();
    }
}
Error Quick Reference
| Symptom | Root Cause | Fix |
|---|---|---|
| Producer/Consumer can’t connect to NameServer | namesrvAddr written as 0.0.0.0 or port unreachable | Use localhost/127.0.0.1:9876 on local |
| Occasional send failure/timeout | Routing not ready, broker unreachable, network jitter | Wait for routing to stabilize after startup |
| Consumer “appears to run but no messages” | Subscribed topic/tag doesn’t match | Check subscribe expression |
| PushConsumer business exception but message not retrying | Caught exception but still returned CONSUME_SUCCESS | Return RECONSUME_LATER on exception |
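| PullConsumer restart causes duplicate or lost processing | AutoCommit=true commits offsets independently of whether business processing succeeded, so a failure cannot be rolled back | When you need control over commit semantics, call setAutoCommit(false) and commit offsets manually after successful processing |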
| Program flashes and exits/Consumer exits quickly | main thread ends causing JVM exit | Use join/block to ensure process stays alive |
| Chinese/special character garbled | body encoding not explicitly specified | Unify StandardCharsets.UTF_8 |