RabbitMQ Tutorials (Publish/Subscribe)
发布/订阅
在上一节我们创建了一个工作队列。工作队列会按顺序分配给每一个工作者。在这一节里我们会把一条消息给多个消费者。这种模式叫做”发布/订阅”。
为了阐明这种模式,我们建立一个日志系统。它包含两个程序 – 一个发出日志消息,另一个接收并打印日志消息。
在这个日志系统中,每一个receiver程序都会获得日志信息。因此我们可以设置一个接收者直接保存日志信息到硬盘,另一个接收者打印日志信息到屏幕。
就是说,发布的日志消息会广播给所有的接收者。
Exchanges(交换机)
上一节我们从一个队列里发送和接收消息。现在我们完整的讲解Rabbit的消息发送模型。
我们快速的回顾一下之前的接收流程:
- 一个生产者通过程序发送消息
- 一个队列作为缓冲区存储消息
- 一个消费者通过程序接收消息
RabbitMQ的消息模型核心思想是生产者不会直接向队列发送任何消息。实际上,生产者甚至不知道一个消息将传递给哪个队列。
事实上,生产者只能发送消息给一个exchange。exchange 很简单。一方面它接收来自生产者的消息,另一方面它把消息推送给队列。因此exchange要知道怎么处理接收到的message。是把message发给一个特定的队列?还是发给多个队列?或者丢弃?这个规则是由 exchange type 定义的。
exchange 有以下几种:direct, topic, headers 和 fanout。我们主要使用最后一种——fanout。下面我们定义一个 名字叫logs,类型为fanout的exchange:
1 | channel.exchangeDeclare("logs", "fanout"); |
fanout exchange很简单。它会广播所有收到的message传递给它知道的queue。这正是我们需要的日志记录方式。
Listing exchanges
你可以在服务器上面使用rabbitmqctl命令查看所有的交换机信息:
1 | $ sudo rabbitmqctl list_exchanges |
In this list there are some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you’ll need to use them at the moment.
这个列表信息里有一些amq.*开头的是默认的交换机。他们默认就被创建好了。
Nameless exchange
在之前章节的教程中,我们不知道exchange,但是我们仍然能够把message传递给queue,这是因为我们使用了默认的exchange。我们使用了空字符串作为交换机信息。
之前我们发布message,用的代码如下所示:
1 | channel.basicPublish("", "hello", null, message.getBytes()); |
第一参数代表exchange的名字,这里使用空字符串表示用默认的或nameless exchange。如果队列存在的话,message会被发送给routingKey指定的队列。
接下来,我们把它替换成发布到我们命名的exchange:
1 | channel.basicPublish( "logs", "", null, message.getBytes()); |
Temporary queues(临时队列)
之前我们使用的queue都有一个特定的名字(hello 或者 task_queue?)。当你想在生产者和消费者之间共享队列,给queue命名是至关重要的。
但是 这种情况对于我们的日志系统是不适合的。我们想看到所有的日志信息,而不是仅仅是他的一个子集。我们也只对当前流动的感兴趣而不是旧的消息。为了解决这个问题我们需要做两件事:
首先,我们需要一个新的,空的队列,不管什么时候我们连接到Rabbit。这就需要,我们每次连接rabbti都要创建一个名字随机的队列,或者让服务器选择一个名字随机的队列给我们。
其次,一旦我们consumer断开与queue的连接,queue应该自动删除。
在Java client 我们提供了一个无参的 queueDeclare()方法,使用它,我们可以创建一个 不持久化,名字唯一,自动删除的队列。
1 | String queueName = channel.queueDeclare().getQueue(); |
这里queueName是随机生成的队列的名字。例如amq.gen-JzTY20BRgKO-HjmUJj0wLg.
Bindings(绑定)
我们已经创建了一个 fanout类型的exchange和所需的queue。现在,我们就需要告诉 exchange 发送messages 到我们指定的 queue。这里,exchange和queue的关系我们叫做binding(绑定)。
1 | channel.queueBind(queueName, "logs", ""); |
现在这个logs exchange将把消息添加到我们指定的队列中。
Listing bindings
你可以通过rabbitmqctl list_bindings命令,查看已经存在的bingding。
Putting it all together
这个生产程序里,发送日志消息,和之前的教程有所区别。主要的变化是我们通过指定的logs exchange去发布消息。以前我们需要通过一个routingKey去发送,这里我们通过fanout exchanges忽略了这个值。这里是EmitLog.java的程序:
1 | import java.io.IOException; |
正如你看到的,在建立连接后我们声明了一个交换机。这一步是必须的,禁止发布消息到一个不存在的交换机中。
如果交换机没有和队列进行绑定,消息将会丢失,但是这种情况是允许的;如果没有消费者监听,我们可以安全的丢弃消息。
ReceiveLogs.java的代码:
1 | import com.rabbitmq.client.*; |
编译我们写的代码。
1 | $ javac -cp .:rabbitmq-client.jar EmitLog.java ReceiveLogs.java |
如果我们想保存日志信息到文件,只需要打开控制台指定ReceiveLogs类型:
1 | $ java -cp .:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log |
如果你想在屏幕上查看日志信息,新打开一个终端运行:
1 | $ java -cp .:rabbitmq-client.jar ReceiveLogs |
最后,发送日志:
1 | $ java -cp .:rabbitmq-client.jar EmitLog |
使用rabbitmqctl list_bindings命令可以验证创建的bindings and queues信息,有2个ReceiveLogs.java programs 运行中,你会看到这样的信息:
1 | $ sudo rabbitmqctl list_bindings |
结果很简单,数据从logs交换机传递给了两个队列,这正是我们想要的结果。