RabbitMQ Tutorials (Routing)

Routing(路由)

上一节,我们创建了一个日志系统,我们能够分发log信息给每个订阅者。

这一节,我们在其上添加额外的功能——只订阅log信息的一个子集。例如:我们只把至关重要的错误日志信息,记录到文件,而所有的日志信息都可以在控制带输出。

Bindings

上一节,我们已经定义了绑定信息,你可以回想一下代码:

1
channel.queueBind(queueName, EXCHANGE_NAME, "");

binding是exchange与queue之间的关系,简单的来说就是:queue对来自指定的exchange的消息感兴趣。

Binding可以指定routingKey参数。为了避免和BasicPublish参数疑惑,我们可以叫它 binding key。因此我们可以创建一个带key的bingding。

1
channel.queueBind(queueName, EXCHANGE_NAME, "black");

bingding key的意义取决于 exchange的类型。fanout类型的exchange会忽略这个值。

Direct exchange

上一节的日志系统只能把素有的消息广播给所有的消费者。我们想根据message的log lever来过滤message。例如,我们只想把错误的日志信息写到磁盘里,警告和信息日志不写到磁盘里。

但是fanout类型exchange 不够灵活,它只能盲目的进行广播。

因此这里我们使用 direct类型的exchange来替代。direct 类型exchange背后的算法很简单——一个消息只会发送给queue的bingding key 完全匹配message的routing key的队列。

大体结构如下所示:

我们看到 direct类型的exchange X 有两个queue绑定到它。第一个 bingding key是orange。第二个有两个bingding Key:black和green。

因此,如果一个message的routing key是orange会发送给Q1队列,如果是blcak或green则会发送给Q2,其他的消息则会被丢弃掉。

Multiple bindings

多个队列绑定同样的key是合法的。我们可以给Q1绑定一个black的key,在这种情况下,direct 类型的exchage的行为和fanout表现的一样。一个消息将会路由给Q1和Q2。

Emitting logs

我们将使用这个模型作为我们的日志系统。我们通过使用一个direct exchange类型来发送消息,代替之前的fanout类型。我们通过日志的级别来作为routing key。订阅者可以通过选择severity来得到他们想要接收的信息。我们首先关注emitting logs。

和之前一样,我们需要先创建一个交换机:

1
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

接着我们发送一条消息:

1
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

为了简化,我们的serverity 可以是”info”,”warning”,”error”。

Subscribing(订阅)

接收消息的工作和之前的教程一样,只有一处不一样-我们会循环severity来创建绑定信息。

1
2
3
4
5
String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

Putting it all together

EmitLogDirect.java 类的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class EmitLogDirect {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] argv)
throws java.io.IOException {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

String severity = getSeverity(argv);
String message = getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

channel.close();
connection.close();
}
//..
}

ReceiveLogsDirect.java类的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsDirect {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();

if (argv.length < 1){
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}

for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}

编译还是和以前一样,为了方便我们使用环境变量$CP(window使用%CP%)作为例子的运行路径:

如果你只想保存 ‘warning’和’error’(不需要’info’)日志信息到文件中,打开控制台并制定类型:

1
$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

如果你想在你的屏幕上查看所有的日志信息,打开一个新的终端输入以下信息:

1
2
$ java -cp $CP ReceiveLogsDirect info warning error
[*] Waiting for logs. To exit press CTRL+C

我们发送一个错误日志信息试试:

1
2
$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
[x] Sent 'error':'Run. Run. Or it will explode.'

(完整的代码 (EmitLogDirect.java source) and (ReceiveLogsDirect.java source))