TL;DR
- Scenario: Route messages by log level or business type, and avoid the unnecessary consumption that a fanout broadcast causes
- Conclusion: A direct exchange does exact matching only: a message reaches a queue when routingKey == bindingKey; one queue can bind multiple keys
- Output: direct_logs example (producer and consumers), multi-binding distribution model, quick reference for common pitfalls
RabbitMQ Routing Mode
A direct exchange lets each consumer receive only the messages it has asked for:
1. Exchange Declaration and Configuration
- Declare an exchange of type direct
- This exchange routes messages strictly by RoutingKey
- Only queues whose binding key exactly equals the message's RoutingKey receive the message
2. Producer Sends Messages
- The producer sets a RoutingKey on every message it publishes
- The RoutingKey identifies the category of the message
- Example:
  - “error.log” for error logs
  - “info.log” for info logs
  - “warning.log” for warning logs
3. Consumer Binding Configuration
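- Each consumer needs to:
  - Declare its own queue
  - Bind the queue to the exchange
  - Specify the RoutingKey (binding key) it wants
- Example bindings:
  - QueueA binds RoutingKey=“error.log” (only receives error logs)
  - QueueB binds “error.log”, “info.log”, and “warning.log”, one binding per key (receives all logs). A direct exchange does not expand wildcards such as “*.log”; pattern matching requires a topic exchange.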
4. Practical Application Scenarios
- Log system:
  - Error log consumer: binds “error.log” and receives only error messages, which it writes to a file
  - Console consumer: binds every level key (“error.log”, “info.log”, “warning.log”) and prints all logs to the console. A pattern such as “#.log” works only on a topic exchange, not on a direct exchange.
- Order system:
  - Payment queue: binds “order.payment”
  - Shipping queue: binds “order.shipment”
  - Each queue processes only its own kind of business message
5. Difference from Fanout Mode
- fanout: broadcast mode, ignores the RoutingKey
- direct: routes by exact RoutingKey match
- Selection basis:
  - Use fanout when every queue should receive every message
  - Use direct when consumers need selective consumption
6. Advanced Usage
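- Multi-binding: one queue can bind multiple RoutingKeys
- Combined usage: multiple exchanges can be used together for more complex routing
- Priority: message priority is a separate feature (a queue declared with the x-max-priority argument plus a priority property on each message); it is not determined by the RoutingKey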
This pattern is especially suitable for scenarios requiring differentiated processing based on message types, effectively achieving message classification and precise delivery.
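The exact-match rule can be sketched as a plain in-memory model with no broker involved. The class `DirectMatch` and its methods are hypothetical names used only for illustration; they are not part of the RabbitMQ client. The sketch assumes keys are compared as literal strings, which is why a binding key written as `*.log` would match only a routing key that is literally `*.log`, and why a queue that wants every level binds each key separately.

```java
import java.util.List;

// Hypothetical in-memory model of direct-exchange matching; not the RabbitMQ client API.
final class DirectMatch {
    private DirectMatch() {}

    // A direct exchange compares the two keys as plain strings.
    static boolean matches(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    // A queue receives the message if any one of its binding keys matches.
    static boolean queueReceives(List<String> bindingKeys, String routingKey) {
        for (String bindingKey : bindingKeys) {
            if (matches(bindingKey, routingKey)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> errorsOnly = List.of("error.log");
        List<String> allLevels = List.of("error.log", "info.log", "warning.log");
        List<String> wildcard = List.of("*.log");

        System.out.println(queueReceives(errorsOnly, "error.log")); // true
        System.out.println(queueReceives(allLevels, "info.log"));   // true
        System.out.println(queueReceives(wildcard, "info.log"));    // false: no wildcard expansion
    }
}
```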
Binding Queues
In the previous mode, the queue was bound to the exchange like this:
channel.queueBind(queueName, EXCHANGE_NAME, "");
The binding call takes a third parameter, the routingKey. On a binding it is usually called the bindingKey, to distinguish it from the routingKey carried by a published message:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
The effect of the bindingKey depends on the exchange type. A fanout exchange ignores this parameter entirely.
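To make the difference concrete, here is a small sketch that routes the same bindings through a fanout and a direct exchange. `BindingKeyDemo`, `ExchangeType`, and `Binding` are hypothetical illustration types, not RabbitMQ classes, and the queue names `q1` and `q2` are made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model contrasting how fanout and direct exchanges treat the binding key.
final class BindingKeyDemo {
    enum ExchangeType { FANOUT, DIRECT }

    // One binding: a queue name plus the key passed as the third argument of queueBind.
    static final class Binding {
        final String queue;
        final String bindingKey;

        Binding(String queue, String bindingKey) {
            this.queue = queue;
            this.bindingKey = bindingKey;
        }
    }

    static List<String> route(ExchangeType type, List<Binding> bindings, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Binding b : bindings) {
            // fanout ignores both keys; direct requires them to be equal.
            boolean deliver = type == ExchangeType.FANOUT || b.bindingKey.equals(routingKey);
            if (deliver && !targets.contains(b.queue)) {
                targets.add(b.queue);
            }
        }
        return targets;
    }

    public static void main(String[] args) {
        List<Binding> bindings = List.of(new Binding("q1", ""), new Binding("q2", "black"));
        System.out.println(route(ExchangeType.FANOUT, bindings, "white")); // [q1, q2]
        System.out.println(route(ExchangeType.DIRECT, bindings, "black")); // [q2]
        System.out.println(route(ExchangeType.DIRECT, bindings, "white")); // []
    }
}
```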
Direct Exchange
A distributed system runs many applications that an operations platform has to monitor, and server logs are one of its most important inputs. Records at different log levels need to be delivered to different applications for processing, which is a natural fit for a direct exchange.
A fanout exchange cannot treat messages differently, because it blindly broadcasts every message to every bound queue. A direct exchange uses a simple routing algorithm: a message is pushed to a queue only when the message's RoutingKey exactly equals that queue's BindingKey.
Direct Exchange Working Principle
In the diagram, exchange X is of type direct and has two bound queues: Q1 is bound with bindingKey orange, and Q2 is bound with bindingKeys black and green.
- A message with routingKey orange is routed to Q1
- A message with routingKey black or green is routed to Q2
- A message with any other routingKey is discarded
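The routing table from the diagram can be reproduced with a few lines of plain Java. This is only an in-memory stand-in: `DiagramRouting` and its methods are hypothetical names, and the routing key `purple` is an invented example of a key that no queue has bound.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory stand-in for the direct exchange X in the diagram.
final class DiagramRouting {
    // bindingKey -> queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queue);
    }

    // Returns the queues that receive the message; an empty set means it is discarded.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Set.of());
    }

    // Builds the bindings shown in the diagram.
    static DiagramRouting diagram() {
        DiagramRouting x = new DiagramRouting();
        x.bind("Q1", "orange");
        x.bind("Q2", "black");
        x.bind("Q2", "green");
        return x;
    }

    public static void main(String[] args) {
        DiagramRouting x = diagram();
        for (String key : List.of("orange", "black", "green", "purple")) {
            Set<String> targets = x.route(key);
            System.out.println(key + " -> " + (targets.isEmpty() ? "discarded" : targets));
        }
    }
}
```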
Multiple Bindings
Several queues can also be bound with the same bindingKey. The exchange then delivers a copy of each matching message to every one of those queues, so for that key the behavior resembles a fanout broadcast.
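A short sketch of that case, again as a hypothetical in-memory model (`SameKeyBindings` is not a RabbitMQ class): each queue keeps its own list of messages, and publishing with a key that two queues share stores one copy in each.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: every queue bound with a matching key gets its own copy of the message.
final class SameKeyBindings {
    private final Map<String, List<String>> queuesByKey = new LinkedHashMap<>();     // bindingKey -> queue names
    private final Map<String, List<String>> messagesByQueue = new LinkedHashMap<>(); // queue name -> stored messages

    void bind(String queue, String bindingKey) {
        List<String> queues = queuesByKey.computeIfAbsent(bindingKey, k -> new ArrayList<>());
        if (!queues.contains(queue)) {
            queues.add(queue); // binding the same queue and key twice has no extra effect
        }
        messagesByQueue.putIfAbsent(queue, new ArrayList<>());
    }

    // Copies the message into each queue whose binding key equals the routing key.
    // Returns the number of queues that received it.
    int publish(String routingKey, String body) {
        List<String> targets = queuesByKey.getOrDefault(routingKey, List.of());
        for (String queue : targets) {
            messagesByQueue.get(queue).add(body);
        }
        return targets.size();
    }

    List<String> messagesIn(String queue) {
        return messagesByQueue.getOrDefault(queue, List.of());
    }

    public static void main(String[] args) {
        SameKeyBindings x = new SameKeyBindings();
        x.bind("Q1", "black");
        x.bind("Q2", "black");
        System.out.println(x.publish("black", "m1"));                      // 2
        System.out.println(x.messagesIn("Q1") + " " + x.messagesIn("Q2")); // [m1] [m1]
    }
}
```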
In this example, the log level serves as the routingKey.
EmitLogsDirect (Producer Code)
package icu.wzk.demo;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * EmitLogsDirect: publisher (producer)
 *
 * Goal:
 * - Declare the direct exchange direct_logs
 * - Publish logs with routingKey = severity (info/warn/error)
 *
 * direct semantics:
 * - The routingKey takes part in routing
 * - A queue is bound to the exchange with a bindingKey
 * - A queue receives a message when the message's routingKey exactly equals the bindingKey
 *
 * Example effect:
 * - A message with routingKey="error" enters only queues bound with bindingKey="error"
 * - A queue can also bind several keys at once (e.g., warn and error)
 */
public class EmitLogsDirect {
    // Exchange name: must be the same in the producer and the consumers
    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1) Configure the connection factory (adjust these parameters to your broker)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("secret");
        factory.setPort(5672);

        // 2) Open the connection and channel; try-with-resources closes both on exit
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            /*
             * 3) Declare the direct exchange
             * - Created if it does not exist
             * - If it exists, the broker verifies that type and parameters match
             */
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

            /*
             * 4) Send 100 log messages; severity cycles through info/warn/error.
             *    routingKey = severity is what drives direct routing.
             */
            for (int i = 0; i < 100; i++) {
                String severity = pickSeverity(i);
                // Message body: business content
                String logStr = "This is message for 【" + severity + "】";

                /*
                 * 5) Publish the message
                 * basicPublish(exchange, routingKey, props, body)
                 * - exchange: direct_logs
                 * - routingKey: severity (info/warn/error)
                 * - props: null (no additional message properties)
                 * - body: UTF-8 encoded byte array
                 */
                channel.basicPublish(
                        EXCHANGE_NAME,
                        severity,
                        null,
                        logStr.getBytes(StandardCharsets.UTF_8)
                );
                System.out.println(logStr);
            }
        }
    }

    /**
     * Selects the severity from i.
     * i % 3 can only be 0, 1, or 2 for non-negative i.
     */
    private static String pickSeverity(int i) {
        int mod = i % 3;
        if (mod == 0) return "info";
        if (mod == 1) return "warn";
        return "error";
    }
}
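As a quick check on what each consumer should expect, the producer's loop can be replayed without a broker to count messages per routing key. `SeverityCount` is a hypothetical helper written for this illustration; its `pickSeverity` copies the rule from `EmitLogsDirect`.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper that replays the producer's loop without a broker.
final class SeverityCount {
    // Same rule as EmitLogsDirect.pickSeverity.
    static String pickSeverity(int i) {
        int mod = i % 3;
        if (mod == 0) return "info";
        if (mod == 1) return "warn";
        return "error";
    }

    // Counts how many of the first `total` messages carry each routing key.
    static Map<String, Integer> count(int total) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i < total; i++) {
            counts.merge(pickSeverity(i), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(count(100)); // {error=33, info=34, warn=33}
    }
}
```

Under this rule, a queue bound only with `error` should receive 33 of the 100 messages, and a queue bound with both `warn` and `info` should receive 67. This assumes the consumers are already running and bound when the producer starts, because a message that matches no binding at publish time is not kept for later.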
ReceiveErrorLogsDirect (Consumer - Only Receive Error)
package icu.wzk.demo;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class ReceiveErrorLogsDirect {
    // Same connection settings as the producer
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String VHOST = "/";
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "secret";
    private static final String EXCHANGE_NAME = "direct_logs";
    private static final String KEY_ERROR = "error";

    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setVirtualHost(VHOST);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        // The connection and channel stay open so the consumer keeps running
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // Declare the direct exchange (must match the producer's declaration)
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        // Declare a temporary queue with a broker-generated name
        String queueName = channel.queueDeclare().getQueue();

        // Bind the queue to the exchange (only receive error)
        channel.queueBind(queueName, EXCHANGE_NAME, KEY_ERROR);

        // Message callback: print the routing key and the UTF-8 decoded body
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String routingKey = delivery.getEnvelope().getRoutingKey();
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        };

        // Start consuming (autoAck=true: messages are acknowledged on delivery)
        boolean autoAck = true;
        channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});
    }
}
ReceiveWarnInfoLogsDirect (Consumer - Receive Warn + Info)
package icu.wzk.demo;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * ReceiveWarnInfoLogsDirect: consumer
 *
 * Goal:
 * - Subscribe to the direct exchange direct_logs
 * - Only receive messages with routingKey = "warn" or "info"
 *
 * direct semantics (key point):
 * - The producer publishes each message with a routingKey (severity: info/warn/error)
 * - The consumer specifies a bindingKey via queueBind(...)
 * - A message is routed to the queue when its routingKey exactly equals a bindingKey
 *
 * This class binds two keys:
 * - queueBind(..., "warn")
 * - queueBind(..., "info")
 * so the same queue receives messages with either routingKey.
 */
public class ReceiveWarnInfoLogsDirect {
    // Connection settings aligned with the producer so all three classes reach the same broker
    private static final String HOST = "localhost";
    private static final int PORT = 5672;
    private static final String VHOST = "/";
    private static final String USERNAME = "admin";
    private static final String PASSWORD = "secret";
    private static final String EXCHANGE_NAME = "direct_logs";
    // Severities this consumer cares about
    private static final String KEY_WARN = "warn";
    private static final String KEY_INFO = "info";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1) Configure connection parameters
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(HOST);
        factory.setPort(PORT);
        factory.setVirtualHost(VHOST);
        factory.setUsername(USERNAME);
        factory.setPassword(PASSWORD);

        // 2) Open the connection and channel (a consumer usually stays resident and does not close them)
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        // 3) Declare the direct exchange, consistent with the producer: direct_logs + DIRECT
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        /*
         * 4) Declare a temporary (anonymous) queue
         * - Random queue name assigned by the broker
         * - exclusive + autoDelete, suitable for demos and temporary subscriptions
         */
        String queueName = channel.queueDeclare().getQueue();

        /*
         * 5) Bind the queue to the exchange with several bindingKeys.
         *    One queue can be bound to the same exchange multiple times, one key per binding.
         *    Result: this queue receives both warn and info messages.
         */
        channel.queueBind(queueName, EXCHANGE_NAME, KEY_WARN);
        channel.queueBind(queueName, EXCHANGE_NAME, KEY_INFO);

        /*
         * 6) Message callback
         * - routingKey: shows which category the message belongs to (warn/info)
         * - body: decoded as UTF-8
         */
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String routingKey = delivery.getEnvelope().getRoutingKey();
            String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
        };

        /*
         * 7) Start consuming
         *    autoAck=true: a message counts as acknowledged once delivered, so an exception
         *    in the callback or a process crash can lose messages.
         */
        boolean autoAck = true;
        channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});
        // The process keeps running and continues to consume
    }
}
Key Points Summary
| Feature | Description |
|---|---|
| Routing Method | Direct Exchange - Exact Match |
| Matching Rules | routingKey == bindingKey (exact match) |
| Multi-binding | One queue can bind multiple bindingKeys |
| Broadcast Behavior | When multiple queues bind the same key, each queue receives a copy, similar to a fanout broadcast |
| Applicable Scenarios | Log classification, order distribution, message classification processing |