RabbitMQ Tutorials (Hello World)
介绍
RabbitMQ是一个消息代理。它的核心原理非常简单:接收和发送消息。你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员。
RabbitMQ和邮局的主要区别是,它处理的不是纸,而是接收、存储和发送二进制的数据——消息。
一般提到RabbitMQ和消息,都用到一些专有名词。
- 生产(Producing)意思就是发送。发送消息的程序就是一个生产者(producer)。我们一般用”P”来表示:
- 队列(queue)就是邮箱的名称。消息通过你的应用程序和RabbitMQ进行传输,它们能够只存储在一个队列(queue)中。队列(queue)没有任何限制,你要存储多少消息都可以——基本上是一个无限的缓冲。多个生产者(producers)能够把消息发送给同一个队列,同样,多个消费者(consumers)也能够从一个队列(queue)中获取数据。队列可以用下图标识:
- 消费(Consuming)和获取消息是一样的意思。一个消费者(consumer)就是一个等待获取消息的程序。我们把它画作”C”:
注意:一般生产者,消费者和代理不必部署在同一台机子上。
“Hello World”
(using the Java Client)
我们将在这个章节里创建2个java程序;一个生产者发送一条消息,一个消费者接受消息并且打印输出。我们跳过Java API的细节,从最简单的事情开始说起。通过一条 “Hello World”作为消息。
在下面的图表中, “P” 是我们的生产者,”C”是我们的消费者。中间是一个队列 - 作为RabbitMQ的消息缓冲区提供给消费者。
The Java client library
RabbitMQ 支持多种协议。该指南使用的是AMQP 0-9-1协议,这是一个开放的,通用的消息协议。RabbitMQ 提供了许多客户端语言的支持。我们将使用Java client 提供商。
下载客户端依赖包,检查签名信息。解压到你的工作目录:
1 | $ unzip rabbitmq-java-client-bin-*.zip |
(这个客户端你也可以从Maven的中心仓库去下载,groupId= com.rabbitmq,artifactId=amqp-client)
现在我们有了客户端的依赖,就可以编写一些代码了。
Sending 发送端
我们将执行我们的消息发送端发送给我们的消息接收端。发送者将连接到RabbitMQ,发送一条消息,然后退出。
在Send.java文件中,我们需要引入一些依赖:
1 | import com.rabbitmq.client.ConnectionFactory; |
在类里面设置队列的名称:
1 | public class Send { |
我们创建一个连接到服务器:
1 | ConnectionFactory factory = new ConnectionFactory(); |
这里创建了一个Socket连接,负责协议版本协商和身份验证等等,这里由于连接的是本地机子,所以取值localhost。如果我们想连接到其他机器的代理服务,我们只需要添加IP地址就可以了。
接下来,创建一个channel(通道),大多数任务都是在这里完成的。
要发送消息,我们必须首先定义一个queue(队列),然后我们才成把消息发送给queue:
1 | channel.queueDeclare(QUEUE_NAME, false, false, false, null); |
queue 的定义具有幂等性,如果不存在就会被创建。消息的内容是一个字节数组,所以你可以对内容进行编码。
最后,我们关闭通道和连接。
1 | channel.close(); |
整个Send.java类在这里查看。
Sending doesn’t work!
发送端不工作的情况
如果你第一次使用RabbitMQ并且没有看到发送的消息,你可以不知道是什么原因导致的错误,也许是没有足够的磁盘空间(默认需要1Gb的空间).配置文档会告诉你怎么设置disk_free_limit。
Receiving 接收者
我们的接收者会从RabbitMQ拉取消息,不像发送者发送消息那样,接收端会保持监听消息,然后打印输出。
在Recv.java里也需要引入一些依赖:
1 | import com.rabbitmq.client.ConnectionFactory; |
DefaultConsumer类实现了Consumer接口,从服务端拉取消息。
设置上和发送端一样;打开一个连接和一个通道,并且声明一个队列进行消费。这里要和发送端的队列保持一致。
1 | public class Recv { |
也许你已经发现,我们在接收端也定义了hello队列。这是为了确保,如果接收端先启动的时候,队列已经存在。
接下来,我们就要告诉服务器来交付消息。由于推送消息是一个异步操作,因此我们使用回调函数,DefaultConsumer.handleDelivery来处理。
1 | Consumer consumer = new DefaultConsumer(channel) { |
整个Recv.java类在这里查看。
Putting it all together
通过javac对2个文件进行编译操作:
1 | $ javac -cp rabbitmq-client.jar Send.java Recv.java |
然后运行他们,你需要将rabbitmq-client.jar放到classpath下,在一个终端里运行发送端:
1 | $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send |
然后运行接收端:
1 | $ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv |
接收端从RabbitMQ获取消息并打印,接收端会一直保持运行,等待消息到来(使用Ctrl-C 停止运行),你可以尝试运行发送端。
如果你想检查这个队列,使用 rabbitmqctl list_queues.
hello
移动到第二章节怎么构建一个简单的工作队列。