RabbitMQ Tutorials (Publish/Subscribe)

发布/订阅

在上一节我们创建了一个工作队列。工作队列会按顺序分配给每一个工作者。在这一节里我们会把一条消息给多个消费者。这种模式叫做”发布/订阅”。

为了阐明这种模式,我们建立一个日志系统。它包含两个程序 – 一个发出日志消息,另一个接收并打印日志消息。

在这个日志系统中,每一个receiver程序都会获得日志信息。因此我们可以设置一个接收者直接保存日志信息到硬盘,另一个接收者打印日志信息到屏幕。

就是说,发布的日志消息会广播给所有的接收者。

Exchanges(交换机)

上一节我们从一个队列里发送和接收消息。现在我们完整的讲解Rabbit的消息发送模型。
我们快速的回顾一下之前的接收流程:

  • 一个生产者通过程序发送消息
  • 一个队列作为缓冲区存储消息
  • 一个消费者通过程序接收消息

RabbitMQ的消息模型核心思想是生产者不会直接向队列发送任何消息。实际上,生产者甚至不知道一个消息将传递给哪个队列。

事实上,生产者只能发送消息给一个exchange。exchange 很简单。一方面它接收来自生产者的消息,另一方面它把消息推送给队列。因此exchange要知道怎么处理接收到的message。是把message发给一个特定的队列?还是发给多个队列?或者丢弃?这个规则是由 exchange type 定义的。

exchange 有以下几种:direct, topic, headers 和 fanout。我们主要使用最后一种——fanout。下面我们定义一个 名字叫logs,类型为fanout的exchange:

1
channel.exchangeDeclare("logs", "fanout");

fanout exchange很简单。它会广播所有收到的message传递给它知道的queue。这正是我们需要的日志记录方式。

Listing exchanges
你可以在服务器上面使用rabbitmqctl命令查看所有的交换机信息:

1
2
3
4
5
6
7
8
9
10
11
12
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
logs fanout
...done.

In this list there are some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you’ll need to use them at the moment.
这个列表信息里有一些amq.*开头的是默认的交换机。他们默认就被创建好了。

Nameless exchange
在之前章节的教程中,我们不知道exchange,但是我们仍然能够把message传递给queue,这是因为我们使用了默认的exchange。我们使用了空字符串作为交换机信息。

之前我们发布message,用的代码如下所示:

1
channel.basicPublish("", "hello", null, message.getBytes());

第一参数代表exchange的名字,这里使用空字符串表示用默认的或nameless exchange。如果队列存在的话,message会被发送给routingKey指定的队列。

接下来,我们把它替换成发布到我们命名的exchange:

1
channel.basicPublish( "logs", "", null, message.getBytes());

Temporary queues(临时队列)

之前我们使用的queue都有一个特定的名字(hello 或者 task_queue?)。当你想在生产者和消费者之间共享队列,给queue命名是至关重要的。

但是 这种情况对于我们的日志系统是不适合的。我们想看到所有的日志信息,而不是仅仅是他的一个子集。我们也只对当前流动的感兴趣而不是旧的消息。为了解决这个问题我们需要做两件事:

首先,我们需要一个新的,空的队列,不管什么时候我们连接到Rabbit。这就需要,我们每次连接rabbti都要创建一个名字随机的队列,或者让服务器选择一个名字随机的队列给我们。

其次,一旦我们consumer断开与queue的连接,queue应该自动删除。

在Java client 我们提供了一个无参的 queueDeclare()方法,使用它,我们可以创建一个 不持久化,名字唯一,自动删除的队列。

1
String queueName = channel.queueDeclare().getQueue();

这里queueName是随机生成的队列的名字。例如amq.gen-JzTY20BRgKO-HjmUJj0wLg.

Bindings(绑定)

我们已经创建了一个 fanout类型的exchange和所需的queue。现在,我们就需要告诉 exchange 发送messages 到我们指定的 queue。这里,exchange和queue的关系我们叫做binding(绑定)。

1
channel.queueBind(queueName, "logs", "");

现在这个logs exchange将把消息添加到我们指定的队列中。

Listing bindings

你可以通过rabbitmqctl list_bindings命令,查看已经存在的bingding。

Putting it all together

这个生产程序里,发送日志消息,和之前的教程有所区别。主要的变化是我们通过指定的logs exchange去发布消息。以前我们需要通过一个routingKey去发送,这里我们通过fanout exchanges忽略了这个值。这里是EmitLog.java的程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] argv)
throws java.io.IOException {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

String message = getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

channel.close();
connection.close();
}
//...
}

(EmitLog.java source)

正如你看到的,在建立连接后我们声明了一个交换机。这一步是必须的,禁止发布消息到一个不存在的交换机中。

如果交换机没有和队列进行绑定,消息将会丢失,但是这种情况是允许的;如果没有消费者监听,我们可以安全的丢弃消息。

ReceiveLogs.java的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}

(ReceiveLogs.java source)

编译我们写的代码。

1
$ javac -cp .:rabbitmq-client.jar EmitLog.java ReceiveLogs.java

如果我们想保存日志信息到文件,只需要打开控制台指定ReceiveLogs类型:

1
$ java -cp .:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log

如果你想在屏幕上查看日志信息,新打开一个终端运行:

1
$ java -cp .:rabbitmq-client.jar ReceiveLogs

最后,发送日志:

1
$ java -cp .:rabbitmq-client.jar EmitLog

使用rabbitmqctl list_bindings命令可以验证创建的bindings and queues信息,有2个ReceiveLogs.java programs 运行中,你会看到这样的信息:

1
2
3
4
5
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.

结果很简单,数据从logs交换机传递给了两个队列,这正是我们想要的结果。