RabbitMQ Tutorials (Routing)
Routing(路由)
上一节,我们创建了一个日志系统,我们能够分发log信息给每个订阅者。
这一节,我们在其上添加额外的功能——只订阅log信息的一个子集。例如:我们只把至关重要的错误日志信息,记录到文件,而所有的日志信息都可以在控制带输出。
Bindings
上一节,我们已经定义了绑定信息,你可以回想一下代码:
1 | channel.queueBind(queueName, EXCHANGE_NAME, ""); |
binding是exchange与queue之间的关系,简单的来说就是:queue对来自指定的exchange的消息感兴趣。
Binding可以指定routingKey参数。为了避免和BasicPublish参数疑惑,我们可以叫它 binding key。因此我们可以创建一个带key的bingding。
1 | channel.queueBind(queueName, EXCHANGE_NAME, "black"); |
bingding key的意义取决于 exchange的类型。fanout类型的exchange会忽略这个值。
Direct exchange
上一节的日志系统只能把素有的消息广播给所有的消费者。我们想根据message的log lever来过滤message。例如,我们只想把错误的日志信息写到磁盘里,警告和信息日志不写到磁盘里。
但是fanout类型exchange 不够灵活,它只能盲目的进行广播。
因此这里我们使用 direct类型的exchange来替代。direct 类型exchange背后的算法很简单——一个消息只会发送给queue的bingding key 完全匹配message的routing key的队列。
大体结构如下所示:
我们看到 direct类型的exchange X 有两个queue绑定到它。第一个 bingding key是orange。第二个有两个bingding Key:black和green。
因此,如果一个message的routing key是orange会发送给Q1队列,如果是blcak或green则会发送给Q2,其他的消息则会被丢弃掉。
Multiple bindings
多个队列绑定同样的key是合法的。我们可以给Q1绑定一个black的key,在这种情况下,direct 类型的exchage的行为和fanout表现的一样。一个消息将会路由给Q1和Q2。
Emitting logs
我们将使用这个模型作为我们的日志系统。我们通过使用一个direct exchange类型来发送消息,代替之前的fanout类型。我们通过日志的级别来作为routing key。订阅者可以通过选择severity来得到他们想要接收的信息。我们首先关注emitting logs。
和之前一样,我们需要先创建一个交换机:
1 | channel.exchangeDeclare(EXCHANGE_NAME, "direct"); |
接着我们发送一条消息:
1 | channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); |
为了简化,我们的serverity 可以是”info”,”warning”,”error”。
Subscribing(订阅)
接收消息的工作和之前的教程一样,只有一处不一样-我们会循环severity来创建绑定信息。
1 | String queueName = channel.queueDeclare().getQueue(); |
Putting it all together
EmitLogDirect.java 类的代码:
1 | public class EmitLogDirect { |
ReceiveLogsDirect.java类的代码:
1 | import com.rabbitmq.client.*; |
编译还是和以前一样,为了方便我们使用环境变量$CP(window使用%CP%)作为例子的运行路径:
如果你只想保存 ‘warning’和’error’(不需要’info’)日志信息到文件中,打开控制台并制定类型:
1 | $ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log |
如果你想在你的屏幕上查看所有的日志信息,打开一个新的终端输入以下信息:
1 | $ java -cp $CP ReceiveLogsDirect info warning error |
我们发送一个错误日志信息试试:
1 | $ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode." |
(完整的代码 (EmitLogDirect.java source) and (ReceiveLogsDirect.java source))