RabbitMQ Tutorials (Hello World)

介绍

RabbitMQ是一个消息代理。它的核心原理非常简单:接收和发送消息。你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员。

RabbitMQ和邮局的主要区别是,它处理的不是纸,而是接收、存储和发送二进制的数据——消息。

一般提到RabbitMQ和消息,都用到一些专有名词。

  • 生产(Producing)意思就是发送。发送消息的程序就是一个生产者(producer)。我们一般用”P”来表示:
    生产
  • 队列(queue)就是邮箱的名称。消息通过你的应用程序和RabbitMQ进行传输,它们能够只存储在一个队列(queue)中。队列(queue)没有任何限制,你要存储多少消息都可以——基本上是一个无限的缓冲。多个生产者(producers)能够把消息发送给同一个队列,同样,多个消费者(consumers)也能够从一个队列(queue)中获取数据。队列可以用下图标识:
    此处输入图片的描述
  • 消费(Consuming)和获取消息是一样的意思。一个消费者(consumer)就是一个等待获取消息的程序。我们把它画作”C”:
    消费

注意:一般生产者,消费者和代理不必部署在同一台机子上。

“Hello World”

(using the Java Client)
我们将在这个章节里创建2个java程序;一个生产者发送一条消息,一个消费者接受消息并且打印输出。我们跳过Java API的细节,从最简单的事情开始说起。通过一条 “Hello World”作为消息。

在下面的图表中, “P” 是我们的生产者,”C”是我们的消费者。中间是一个队列 - 作为RabbitMQ的消息缓冲区提供给消费者。

The Java client library

RabbitMQ 支持多种协议。该指南使用的是AMQP 0-9-1协议,这是一个开放的,通用的消息协议。RabbitMQ 提供了许多客户端语言的支持。我们将使用Java client 提供商。
下载客户端依赖包,检查签名信息。解压到你的工作目录:

1
2
$ unzip rabbitmq-java-client-bin-*.zip
$ cp rabbitmq-java-client-bin-*/*.jar ./

(这个客户端你也可以从Maven的中心仓库去下载,groupId= com.rabbitmq,artifactId=amqp-client)
现在我们有了客户端的依赖,就可以编写一些代码了。

Sending 发送端


我们将执行我们的消息发送端发送给我们的消息接收端。发送者将连接到RabbitMQ,发送一条消息,然后退出。
在Send.java文件中,我们需要引入一些依赖:

1
2
3
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

在类里面设置队列的名称:

1
2
3
4
5
6
7
8
public class Send {
private final static String QUEUE_NAME = "hello";

public static void main(String[] argv)
throws java.io.IOException {
...
}
}

我们创建一个连接到服务器:

1
2
3
4
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

这里创建了一个Socket连接,负责协议版本协商和身份验证等等,这里由于连接的是本地机子,所以取值localhost。如果我们想连接到其他机器的代理服务,我们只需要添加IP地址就可以了。

接下来,创建一个channel(通道),大多数任务都是在这里完成的。

要发送消息,我们必须首先定义一个queue(队列),然后我们才成把消息发送给queue:

1
2
3
4
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

queue 的定义具有幂等性,如果不存在就会被创建。消息的内容是一个字节数组,所以你可以对内容进行编码。

最后,我们关闭通道和连接。

1
2
channel.close();
connection.close();

整个Send.java类在这里查看。
Sending doesn’t work!
发送端不工作的情况
如果你第一次使用RabbitMQ并且没有看到发送的消息,你可以不知道是什么原因导致的错误,也许是没有足够的磁盘空间(默认需要1Gb的空间).配置文档会告诉你怎么设置disk_free_limit。

Receiving 接收者

我们的接收者会从RabbitMQ拉取消息,不像发送者发送消息那样,接收端会保持监听消息,然后打印输出。

在Recv.java里也需要引入一些依赖:

1
2
3
4
5
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

DefaultConsumer类实现了Consumer接口,从服务端拉取消息。

设置上和发送端一样;打开一个连接和一个通道,并且声明一个队列进行消费。这里要和发送端的队列保持一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Recv {
private final static String QUEUE_NAME = "hello";

public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
...
}
}

也许你已经发现,我们在接收端也定义了hello队列。这是为了确保,如果接收端先启动的时候,队列已经存在。

接下来,我们就要告诉服务器来交付消息。由于推送消息是一个异步操作,因此我们使用回调函数,DefaultConsumer.handleDelivery来处理。

1
2
3
4
5
6
7
8
9
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);

整个Recv.java类在这里查看。

Putting it all together

通过javac对2个文件进行编译操作:

1
$ javac -cp rabbitmq-client.jar Send.java Recv.java

然后运行他们,你需要将rabbitmq-client.jar放到classpath下,在一个终端里运行发送端:

1
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send

然后运行接收端:

1
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv

接收端从RabbitMQ获取消息并打印,接收端会一直保持运行,等待消息到来(使用Ctrl-C 停止运行),你可以尝试运行发送端。

如果你想检查这个队列,使用 rabbitmqctl list_queues.
hello
移动到第二章节怎么构建一个简单的工作队列。