服务下载

| Description | Download |
| ——| —— | :—-: |
| RPM for Fedora / RHEL / CentOS Linux (from rabbitmq.com) | rabbitmq-server-3.6.5-1.noarch.rpm |
| RPM for openSUSE Linux (from rabbitmq.com) | rabbitmq-server-3.6.5-1.suse.noarch.rpm |
| RPM for Fedora / RHEL / CentOS Linux (from github.com) | rabbitmq-server-3.6.5-1.noarch.rpm |

概述

rabbitmq-server已经包含在Fedora系统中,但是,这个版本往往比较旧。通过我们的站点安装.rpm格式的文件会获得更好的结果。检查Fedora 包详情获得服务器版本信息。

安装Erlang

在安装RabbitMQ之前,你必须安装Erlang环境。我们建议使用一个打包版本。这里有三种来源方式:

  • Erlang 方案 通常是最新的安装包,提供两种方式,一种通过yum 仓库进行安装,另一种通过手动下载安装。
  • 我们提供了一个适合运行RabbitMQ服务的依赖包,这个包不是最新的,但是能够很好的解决依赖问题。
  • EPEL (“Extra Packages for Enterprise Linux”); 有一部分的 Red Hat / Fedora 机构,提供了一些额外的包,包含了 Erlang. 他们都是正式的安装包,并被分成了很多个小包,但不是最新的。

从Erlang官网的仓库进行安装

  1. 可以通过使用仓库来安装Erlang。

或者下载单独的包进行安装

  1. 下载安装 esl-erlang RPM进行安装。
  2. 下载安装 esl-erlang-compat RPM兼容包。

或者从RabbitMQ安装零依赖的Erlang

  1. 下载安装零依赖的Erlang RPM 包.

或者从EPEL仓库安装Erlang

  1. 在你的机器上激活EPEL。
  2. 通过root账户执行以下命令:
    1
    yum install erlang

安装RabbitMQ服务

下载rpm文件,通过root账户执行以下命令:

1
2
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc
yum install rabbitmq-server-3.6.5-1.noarch.rpm

我们的公钥信息也可以从这里下载.

Using PackageCloud RPM Repository

PackageCloud installs packages via HTTPS and signs them using their GPG key. There are multiple ways to install:

Provided installation scripts
Using PackageCloud Chef cookbook
Using PackageCloud Puppet module
Manually
See PackageCloud RabbitMQ repository instructions.

运行RabbitMQ服务

自定义RabbitMQ环境变量
  服务启动采用的是默认配置,你可以自定义RabbitMQ环境变量,这里告诉你这样配置组件
启动服务
  The server is not started as a daemon by default when the RabbitMQ server package is installed. To start the daemon by default when the system boots, as an administrator run chkconfig rabbitmq-server on.这个服务在安装以后不会以守护进程运行。默认情况下守护进程会随着系统启动而运行,通过管理员账号运行以下命令:

1
chkconfig rabbitmq-server on

通过管理员账号可以启动和停止该服务:

1
/sbin/service rabbitmq-server stop/start/etc.

备注:这个服务作为系统用户运行,如果你改变了Mnesia数据库或者日志
,你必须确保这些文件是该用户的。

访问端口

SELinux and similar 机器也许会阻止RabbitMQ绑定端口。这种情况下,RabbitMQ将会启动失败。确保以下端口是被允许打开的:

1
2
3
4
5
4369 (epmd), 25672 (Erlang distribution)
5672, 5671 (AMQP 0-9-1 without and with TLS)
15672 (if management plugin is enabled)
61613, 61614 (if STOMP is enabled)
1883, 8883 (if MQTT is enabled)

可以配置RabbitMQ使用其他端口。

默认用户访问

这个服务创建了一个用户名guest密码为guest的用户。没有配置的客户端将采用这个用户。默认情况下这个凭证只能用于连接本机。

查看文档中访问控制信息。了解如何创建更多的用户、删除guest用户,或者允许guest用户进行远程访问。

管理服务

怎么停止服务或者检查它的状态等,你可以调用sbin/rabbitmqctl(作为运行rabbitmq-server的用户来操作)。如果没有服务在运行rabbitmqctl命令将报告节点不在线的信息。

  • 调用 rabbitmqctl stop 命令停止服务.
  • 调用 rabbitmqctl status 检查服务运行状态.

更多的信息查看rabbitmqctl

日志

服务输出的信息会发送到RABBITMQ_NODENAME.log文件中并保存在RABBITMQ_LOG_BASE 目录。附加的日志信息会写到RABBITMQ_NODENAME-sasl.log文件中。

控制系统的限制

RabbitMQ安装运行在生产环境的工作负载情况下需要对系统的内核参数限制进行调整,保证一个合适的数量来处理并发的连接和队列。这里主要设置打开文件的最大数量,可以使用这个命令 ulimit -n。许多操作系统作为一个消息代理来说这个默认值太低了(大部分的linux默认值都是1024)。我们推荐使用最少65536的文件描述符作为用户rabbitmq的生产环境,4096 基本上满足大多数开发工作负载。

这里有2个限制参数:最大允许打开的文件数(fs.file-max on Linux, kern.maxfilesperproc on OS X and FreeBSD)和per-user 限制数(ulimit -n)。前者必须高于后者。

在分布式系统里面,通过配置/etc/systemd/system/rabbitmq-server.service.d/limits.conf文件来控制,例如:

1
2
[Service]
LimitNOFILE=300000

还有种最简单的方法来限制每个用户,就是在启动服务之前调用以下命令:

1
ulimit -S -n 4096

这个软限制不能够操作硬限制(许多分布式环境中默认为4096)。这个硬限制也可以在/etc/security/limits.conf中修改。必须要激活pam_limits.so模块并重新登陆或者重启系统。

Note that limits cannot be changed for running OS processes.

For more information about controlling fs.file-max with sysctl, please refer to the excellent Riak guide on open file limit tuning.

验证限制

UI管理页面 在概述这个选项卡里显示了可用的文件描述符数量。

1
rabbitmqctl status

上面的信息也包含了同样的值。
下面的命令:

1
cat /proc/$RABBITMQ_BEAM_PROCESS_PID/limits

用于显示运行进程的有效限制数。$RABBITMQ_BEAM_PROCESS_PID 是系统的PID,作为在Erlang VM运行的RabbitMQ。通过rabbitmqctl status返回。

##配置管理工具
配置管理工具(比如:Chef, Puppet, BOSH)提供了对系统限制的调整。我们的开发工具指南里面列出了相关的模块和项目。

概述

一个RabbitMQ服务作为一个逻辑组或者几个Erlang节点,所有运行的Rabbitmq应用程序共享用户信息,虚拟主机,队列,交换器,绑定信息,运行参数。我们把引用的节点集合作为一个集群。

什么是复制?

对于在RabbitMQ服务中操作的所有数据和状态信息在所有的节点中都被复制,具有可靠性和伸缩性,满足ACID要求。但是存在一个例外是针对 message queue的,其默认是仅存在于创建它的那个 node 上面,尽管其同时对于所有其他 node 是可见和可达的。为了在 cluster 中的所有 node 上复制某个 queue 的内容,参考 high availability 相关文档(首先你可能需要一个可用的集群环境) 。

主机名解析要求

RabbitMQ的节点地址信息使用的是域名,短的或者完整的有效域名。因此集群中的所有成员都必须能够解析来自集群中节点的主机名信息,这样才能够在任何一台机子上使用命令行工具rabbitmqctl。

主机名解析可以使用任何一种标准的方式:

  1. DNS records
  2. Local host files (e.g. /etc/hosts)

In more restrictive environments, where DNS record or hosts file modification is restricted, impossible or undesired, Erlang VM can be configured to use alternative hostname resolution methods, such as an alternative DNS server, a local file, a non-standard hosts file location, or a mix of methods. Those methods can work in concert with the standard OS hostname resolution methods.

使用FQDN的话,在配置指南中查看RABBITMQ_USE_LONGNAME属性。

集群构成

集群可以通过以下的任何一种方式进行构成:

  1. 通过rabbitmqctl手动操作(比如在开发环境中)
  2. 配置文件中声明集群节点列表
  3. 使用rabbitmq-autocluster方式 (一个插件)

集群的组成可以动态的改变。所有 RabbitMQ broker 在最初启动时都是从单独的一个节点开始的。 这些 node 可以加入到同一个 cluster 中,之后还可以重新退回成单个节点运行。

故障处理

RabbitMQ服务允许容忍单个节点的失效。节点可以随时启动和停止,只要他们可以在停止的时候通知一个已知的群集成员节点。

RabbitMQ有几种方式去处理网络分区问题,主要面向一致性。集群环境主要被使用在LAN(局域网)环境。不建议在WAN(广域网)环境下运行集群环境。通过使用Shovel或者Federation插件可以很好的在WAN环境中连接服务。Note that Shovel and Federation are not equivalent to clustering.

磁盘和内存节点

一个节点可以是一个磁盘节点或者一个内存节点。(备注:磁盘之间可以进行相互转换)。在大多数情况下你希望你的所有节点都是磁盘节点;内存节点是在特殊情况下为了提升集群中队列,交换器,绑定等性能使用的,如果你有疑问的话,可以只使用磁盘节点。

集群操作示范

下面是一份建立和操控 RabbitMQ cluster 的示范。其中包括 3 台机器 - rabbit1,rabbit2,rabbit3.

我们假设用户能够登陆到这三台机器上,RabbitMQ也已经安装在机器上了,并且rabbitmq-server和rabbitmqctl脚本命令在用户的PATH环境变量中配置好了。

这个示例可以被修改来在单个主机上运行,详细的细节在后面说明。

节点(和CLI工具)怎么认证其他节点: 使用ErlangCookie方式

RabbitMQ节点和CLI工具(比如:rabbitmqctl)使用cookie值来确定 node 间是否允许相互通信,两个 node 能够相互通信的前提是他们必须拥有相同的 Erlang cookie值。 这个cookie是一个字母数字组成的字符串。它的长度可以是长的或者短的。
每个集群节点必须具有相同的cookie。

在RabbitMQ服务启动的时候Erlang虚拟机会自动的创建一个随机的cookie文件。并且把它复制到集群环境里的所有其他节点中。

在Unix系统中,这个cookie文件通常位于 /var/lib/rabbitmq/.erlang.cookie 或者 $HOME/.erlang.cookie.

在Windows系统中,这个文件路径在 C:\Users\Current User.erlang.cookie (%HOMEDRIVE% + %HOMEPATH%.erlang.cookie) 或者 C:\Documents and Settings\Current User.erlang.cookie, and C:\Windows.erlang.cookie for RabbitMQ Windows service. 如果windows服务启动了,那么cookie应该在两个地方都有。

另一种方法, 你可以使用在脚本命令 rabbitmq-server 和 rabbitmqctl 中使用选项 “ -setcookie cookie” 来设置cookie.

当cookie配置错误 (例如,不完全相同), RabbitMQ 会记录错误的日志信息像这样 “Connection attempt from disallowed node” and “Could not auto-cluster”.

启动独立节点

要想建立一个 Cluster ,你就必须对每一个已经存在的 RabbitMQ node 按照 cluster 配置的方式重新进行配置。所以第一步要做的就是在每一个 node 上都常规启动 RabbitMQ 服务:

1
2
3
rabbit1$ rabbitmq-server -detached
rabbit2$ rabbitmq-server -detached
rabbit3$ rabbitmq-server -detached

这样就创建了 3 个独立的 RabbitMQ broker ,在每一个 node 上面,可以通过 cluster_status 命令来确认:

1
2
3
4
5
6
7
8
9
10
11
12
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ...
[{nodes,[{disc,[rabbit@rabbit1]}]},{running_nodes,[rabbit@rabbit1]}]
...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ...
[{nodes,[{disc,[rabbit@rabbit2]}]},{running_nodes,[rabbit@rabbit2]}]
...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ...
[{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}]
...done.

通过 rabbitmq-server 脚本命令创建的 RabbitMQ broker 对应的 node 的名字是 rabbit@shorthostname 样式,其中 short node 名字在 Linux 下是小写字母形式(如 rabbit@rabbit1)。如果您是在 Windows 上使用 rabbitmq-server.bat 批处理来执行的上述命令,short node 名字会是大写字母形式(如 rabbit@RABBIT1)。所以, 当你要使用 node 名字时,要注意大小写的问题,因为匹配时要求完全一致。

创建集群

为了将我们创建的 3 个 node 连接成一个 cluster ,需要将其中两个 node(rabbit@rabbit2 和 rabbit@rabbit3)加入到第三个 node( rabbit@rabbit1)。

我们首先把rabbit@rabbit2加入到rabbit@rabbit1集群环境中。停止rabbit@rabbit2机器的RabbitMQ应用并且加入到rabbit@rabbit1集群中,然后重启RabbitMQ应用。注意:加入 cluster 的过程隐式包含了重置 node 的动作,即移除了当前 node 上之前存放的所的资源和数据。

1
2
3
4
5
6
rabbit2$ rabbitmqctl stop_app
Stopping node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl join_cluster rabbit@rabbit1
Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
rabbit2$ rabbitmqctl start_app
Starting node rabbit@rabbit2 ...done.

我们可以从 rabbit@rabbit1 或者 rabbit@rabbit2 上通过命令 cluster_status 看到两个 node 已经加入到同一个 cluster 中了:

1
2
3
4
5
6
7
8
9
10
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]},
{running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}]
...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]},
{running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}]
...done.

现在我们把rabbit@rabbit3加入到相同的集群环境中,步骤和上面的相同。这次我们将加入 rabbit2 所在的 cluster (其实也是 rabbit1 所在的 cluster)以证明在这种情况下通过哪一个 node 加入 cluster 都是一样的。即只要我们提供了处于某个 cluster 中的可被其他人访问的 node ,那么该 node 所在的 cluster 就可以被其他 node 加入。

1
2
3
4
5
6
rabbit3$ rabbitmqctl stop_app
Stopping node rabbit@rabbit3 ...done.
rabbit3$ rabbitmqctl join_cluster rabbit@rabbit2
Clustering node rabbit@rabbit3 with rabbit@rabbit2 ...done.
rabbit3$ rabbitmqctl start_app
Starting node rabbit@rabbit3 ...done.

我们可以从任意一个 node 上通过命令 cluster_status 看到三个 node 已经加入到同一个 cluster 中了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
{running_nodes,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}]
...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
{running_nodes,[rabbit@rabbit3,rabbit@rabbit1,rabbit@rabbit2]}]
...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ...
[{nodes,[{disc,[rabbit@rabbit3,rabbit@rabbit2,rabbit@rabbit1]}]},
{running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}]
...done.

按照上面的步骤,我们可以在任意时间添加新的 node 到 cluster 中,只要 cluster 处于运行状态。

重启集群节点

cluster 中的 node 在任何时候都可以被停止。 同样地如果他们崩溃了也是没有任何问题的。在上述两种情况中,cluster 中的其他 node 都可以不受任何影响的继续运行,这些“非正常” node 重新启动后会自动地与 cluster 中的其他 node 取得联系。

我们手动关闭 rabbit@rabbit1 和 rabbit@rabbit3 后,通过命令查看 cluster 的状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
rabbit1$ rabbitmqctl stop
Stopping and halting node rabbit@rabbit1 ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
{running_nodes,[rabbit@rabbit3,rabbit@rabbit2]}]
...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
{running_nodes,[rabbit@rabbit2,rabbit@rabbit3]}]
...done.
rabbit3$ rabbitmqctl stop
Stopping and halting node rabbit@rabbit3 ...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
{running_nodes,[rabbit@rabbit2]}]
...done.

现在我们重新启动 node ,并查看 cluster 的状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
rabbit1$ rabbitmq-server -detached
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
{running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}]
...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
{running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}]
...done.
rabbit3$ rabbitmq-server -detached
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
{running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}]
...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
{running_nodes,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]
...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2,rabbit@rabbit3]}]},
{running_nodes,[rabbit@rabbit2,rabbit@rabbit1,rabbit@rabbit3]}]
...done.

有一些重要的注意事项:

  • 当整个 cluster 不能工作了,最后一个失效的 node 必须是第一个重新开始工作的那一个。如果这种情况得不到满足,所有 node 将会为最后一个磁盘 node 的恢复等待 30 秒。如果最后一个离线的 node 无法重新上线,我们可以通过命令 forget_cluster_node 将其从 cluster 中移除 - 具体参考 rabbitmqctl 的使用手册。
  • 如果所有的节点在不受控的情况下停止(比如断电),在这种情况下,可以使用force_boot命令使它再次启动 - 具体参考 rabbitmqctl 的使用手册。

拆分集群

当 node 不应该继续存在于一个 cluster 中时,我们需要显式的将这些 node 移除。我们首先从 cluster 中移除 rabbit@rabbit3 ,将其还原为独立运行状态。具体做法为,在 rabbit@rabbit3 上先停止 RabbitMQ 应用,再重置 node ,最后重新启动RabbitMQ 应用。

1
2
3
4
5
6
rabbit3$ rabbitmqctl stop_app
Stopping node rabbit@rabbit3 ...done.
rabbit3$ rabbitmqctl reset
Resetting node rabbit@rabbit3 ...done.
rabbit3$ rabbitmqctl start_app
Starting node rabbit@rabbit3 ...done.

值得注意的是,此时仍旧可以通过 list 命令发现 rabbit@rabbit3 仍然作为 node 显示出来。

在 node 上运行 cluster_status 命令可以发现 rabbit@rabbit3 已经不再是 cluster 中的一员,且已经处于独立运行状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]},
{running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}]
...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ...
[{nodes,[{disc,[rabbit@rabbit1,rabbit@rabbit2]}]},
{running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}]
...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ...
[{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}]
...done.

我们还可以利用远端移除 node 的操作,这在有些情况下是很有用的,比如对无任何反应的 node 的 处理 。例如,我们可以在 rabbit@rabbit2 上执行移除 rabbit@rabbit1 的操作。

1
2
3
4
5
rabbit1$ rabbitmqctl stop_app
Stopping node rabbit@rabbit1 ...done.
rabbit2$ rabbitmqctl forget_cluster_node rabbit@rabbit1
Removing node rabbit@rabbit1 from cluster ...
...done.

注意到,rabbit1 仍旧会认为自己与 rabbit2 处于同一个 cluster 中,但是此时在 rabbit1 上执行 start_app 操作会提示相应错误信息。我们可以将 rabbit1 重置后让它重新运行。

1
2
3
4
5
6
7
8
rabbit1$ rabbitmqctl start_app
Starting node rabbit@rabbit1 ...
Error: inconsistent_cluster: Node rabbit@rabbit1 thinks it's clustered with node rabbit@rabbit2, but rabbit@rabbit2 disagrees
rabbit1$ rabbitmqctl reset
Resetting node rabbit@rabbit1 ...done.
rabbit1$ rabbitmqctl start_app
Starting node rabbit@mcnulty ...
...done.

此时执行 cluster_status 命令可以显示出当前所有 3 个 node 均是作为独立的 RabbitMQ broker 处于运行状态:

1
2
3
4
5
6
7
8
9
10
11
12
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ...
[{nodes,[{disc,[rabbit@rabbit1]}]},{running_nodes,[rabbit@rabbit1]}]
...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ...
[{nodes,[{disc,[rabbit@rabbit2]}]},{running_nodes,[rabbit@rabbit2]}]
...done.
rabbit3$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit3 ...
[{nodes,[{disc,[rabbit@rabbit3]}]},{running_nodes,[rabbit@rabbit3]}]
...done.

注意到 rabbit@rabbit2 会保有 cluster 的残余状态信息,而 rabbit@rabbit1 和 rabbit@rabbit3 却可以看成是新初始化的 RabbitMQ broker 。如果我们想要重新初始化 rabbit@rabbit2 ,我们可以按照下面的方式执行:

1
2
3
4
5
6
rabbit2$ rabbitmqctl stop_app
Stopping node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl reset
Resetting node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl start_app
Starting node rabbit@rabbit2 ...done.

升级集群

当RabbitMQ从一个版本升级到另一个版本的时候(比如:从3.0.x到3.1.x,或者从2.x.x到3.x.x),或者升级Erlang环境,整个集群必须停止后进行升级(集群不能够运行混合版本)。这里指的不是升级补丁版本的情况(比如:从3.0.x到3.0.y),除非在发行版本中有特殊说明,这些版本是可以在一个集群中混合使用。因此在升级之前强烈建议查看发行版本的说明。

Some patch releases known to require a cluster-wide restart:

  • 3.0.0 cannot be mixed with later versions from the 3.0.x series
  • 3.6.6 and later cannot be mixed with earlier versions from the 3.6.x series

当 RabbitMQ 从一个版本升级到另一个版本时,如果必要,RabbitMQ 会自动升级持久化数据结构。在 cluster 中,上述工作会由第一个被启动的磁盘 node 进行(即“负责升级的” node )。所以,当你升级一个 RabbitMQ cluster 的时候,不可以首先启动任何内存 node ,任何内存 node 的启动将产生一条错误消息并且启动失败。

尽管不是一定必要,但是建议你事先决定好使用哪个磁盘 node 作为升级点(upgrader),然后在升级过程中,最后停止那个 node ,最先启动那个 node 。否则,在 升级点 node 停止和最后停止的 node 之间所做的对于 cluster 配置的修改将会被丢失掉。

自动升级的功能仅在 RabbitMQ 2.1.1 和之后的版本中才具有。如果你使用了更早版本的 cluster ,你讲需要通过重新构建的方式来升级。

单机上的集群

在一些情况下,在单机上运行 RabbitMQ node 的 cluster 可能对你很有实用价值。其中之一是,你可以在你的台式机或者笔记本上运行 cluster 而不用额外跑多个虚拟机。

为了在单个主机上运行多个RabbitMQ节点,确保节点具有不同的节点名称,数据存储位置,日志文件位置,并且绑定到不同的端口,包含那些使用的插件,在配置指南中查看RABBITMQ_NODENAME, RABBITMQ_NODE_PORT, 和 RABBITMQ_DIST_PORT的介绍,以及在文件和目录位置指南中的查看RABBITMQ_MNESIA_DIR, RABBITMQ_CONFIG_FILE, and RABBITMQ_LOG_BASE描述。

你可以反复调用rabbitmq-server脚本在同一个主机上启动多个节点(Windows里面是rabbitmq-server.bat文件),比如:

1
2
3
4
5
$ RABBITMQ_NODE_PORT=5672 RABBITMQ_NODENAME=rabbit rabbitmq-server -detached
$ RABBITMQ_NODE_PORT=5673 RABBITMQ_NODENAME=hare rabbitmq-server -detached
$ rabbitmqctl -n hare stop_app
$ rabbitmqctl -n hare join_cluster rabbit@`hostname -s`
$ rabbitmqctl -n hare start_app

上述步骤将创建2个节点的集群,二者都是作为磁盘节点,注意 如果你想开放其他端口,你可以通过配置那些不冲突的端口运行,通过命令完成:

1
2
$ RABBITMQ_NODE_PORT=5672 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15672}]" RABBITMQ_NODENAME=rabbit rabbitmq-server -detached
$ RABBITMQ_NODE_PORT=5673 RABBITMQ_SERVER_START_ARGS="-rabbitmq_management listener [{port,15673}]" RABBITMQ_NODENAME=hare rabbitmq-server -detached

上述命令同样建立了两个 node 的 cluster ,但是使用了管理插件。

改变主机名

RabbitMQ 节点通过主机名来进行通信。因此,所有的节点必须能够解析来自集群中的其他节点。rabbitmqctl也是在这种情况使用。

除此之外,数据库目录使用当前的主机名在系统中使用。如果主机名发生了改变,一个新的空数据库将被创建。为了避免数据丢失创建一个固定的主机名是很重要的。当主机名发生了改变你需要重启RabbitMQ:

1
$ /etc/init.d/rabbitmq-server restart

A similar effect can be achieved by using rabbit@localhost as the broker nodename. The impact of this solution is that clustering will not work, because the chosen hostname will not resolve to a routable address from remote hosts. The rabbitmqctl command will similarly fail when invoked from a remote host. A more sophisticated solution that does not suffer from this weakness is to use DNS, e.g. Amazon Route 53 if running on EC2. If you want to use the full hostname for your nodename (RabbitMQ defaults to the short name), and that full hostname is resolveable using DNS, you may want to investigate setting the environment variable RABBITMQ_USE_LONGNAME=true.

查看hostname resolution guide获取更多的信息。

防火墙后的节点

这种情况是指数据中心或者可靠网络上的 cluster 中的 node 彼此之间存在防火墙的情况。再一次重申,不建议在 WAN 或者 node 之间的网络连接不可靠的情况下创建 cluster 。

在最常见的配置中,您将需要打开多个标准端口:

  1. 4369 (epmd)
  2. 5672, 5671 (AMQP 0-9-1 and 1.0 without and with TLS)
    1. This port used by Erlang distribution for inter-node and CLI tools communication and is allocated from a dynamic range (limited to a single port by default, computed as AMQP port + 20000). See networking guide for details.
  3. 15672 (if management plugin is enabled)
  4. 61613, 61614 (if STOMP is enabled)
  5. 1883, 8883 (if MQTT is enabled)

查看 RabbitMQ Networking guide获取更详细的信息.

集群中的Erlang版本

所有节点都必须运行在相同版本的erlang环境。

从客户端连接到集群

客户端可以透明地连接到 cluster 中的任意一个 node 上。 如果当前与客户端处于连接状态的那个 node 失效了,但是 cluster 中的其他 node 正常工作,那么客户端应该发现当前连接的关闭,然后应该可以重新连接到 cluster 中的其他正常的 node 上。一般来讲,将 node 的主机名或者 IP 地址 硬编码 到客户端应用程序中是非常不明智的:这会导致各种坑爹问题的出现,因为一旦 cluster 的配置改变或者 cluster 中的 ndoe 数目改变,客户端将面临重新编码、编译和重新发布的问题。作为替代,我们建议一种更加一般化的方式:采用 动态 DNS 服务 ,其具有非常短的 TTL 配置,或者 普通 TCP 负载均衡器 ,或者通过随机行走或者类似技术实现的某种形式的 mobile IP 。通常来讲,关于如何成功连接 cluster 中的 node 已经超出了 RabbitMQ 本身要说明的范畴,我们建议你使用其他的专门用于处理这方面问题的技术来解决这种问题。

带内存节点的集群

RAM nodes keep their metadata only in memory. As RAM nodes don’t have to write to disc as much as disc nodes, they can perform better. However, note that since persistent queue data is always stored on disc, the performance improvements will affect only resource management (e.g. adding/removing queues, exchanges, or vhosts), but not publishing or consuming speed.

RAM nodes are an advanced use case; when setting up your first cluster you should simply not use them. You should have enough disc nodes to handle your redundancy requirements, then if necessary add additional RAM nodes for scale.

A cluster containing only RAM nodes is fragile; if the cluster stops you will not be able to start it again and will lose all data. RabbitMQ will prevent the creation of a RAM-node-only cluster in many situations, but it can’t absolutely prevent it.

The examples here show a cluster with one disc and one RAM node for simplicity only; such a cluster is a poor design choice.

创建内存节点

我们可以声明一个内存节点加入到集群环境中,我们在使用rabbitmqctl join_cluster命令之,加上 –ram 标签即可:

1
2
3
4
5
6
rabbit2$ rabbitmqctl stop_app
Stopping node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl join_cluster --ram rabbit@rabbit1
Clustering node rabbit@rabbit2 with [rabbit@rabbit1] ...done.
rabbit2$ rabbitmqctl start_app
Starting node rabbit@rabbit2 ...done.

内存节点在cluster status中显示效果如下:

1
2
3
4
5
6
7
8
9
10
rabbit1$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit1 ...
[{nodes,[{disc,[rabbit@rabbit1]},{ram,[rabbit@rabbit2]}]},
{running_nodes,[rabbit@rabbit2,rabbit@rabbit1]}]
...done.
rabbit2$ rabbitmqctl cluster_status
Cluster status of node rabbit@rabbit2 ...
[{nodes,[{disc,[rabbit@rabbit1]},{ram,[rabbit@rabbit2]}]},
{running_nodes,[rabbit@rabbit1,rabbit@rabbit2]}]
...done.

改变节点类型

我们可以改变 node 的类型,如磁盘 node 到内存 node ,或者相反。比如将 rabbit@rabbit2 和 rabbit@rabbit3 的 node 类型都变成和之前不同的种类。我们可以使用命令 change_cluster_node_type 来进行转换,但是首先需要将 node 停止。

1
2
3
4
5
6
7
8
9
10
11
12
rabbit2$ rabbitmqctl stop_app
Stopping node rabbit@rabbit2 ...done.
rabbit2$ rabbitmqctl change_cluster_node_type disc
Turning rabbit@rabbit2 into a disc node ...
...done.
Starting node rabbit@rabbit2 ...done.
rabbit1$ rabbitmqctl stop_app
Stopping node rabbit@rabbit1 ...done.
rabbit1$ rabbitmqctl change_cluster_node_type ram
Turning rabbit@rabbit1 into a ram node ...
rabbit1$ rabbitmqctl start_app
Starting node rabbit@rabbit1 ...done.

服务下载

| Description | Download |
| ——| —— | :—-: |
| Generic Unix release (tar.xz, from rabbitmq.com) | rabbitmq-server-generic-unix-3.6.5.tar.xz |

Generic Unix or Linux (BSD, Mac OS X)

服务安装

安装最新的Erlang版本。
从上面的连接下载rabbitmq-server-generic-unix-3.6.5.tar.xz。

在这个压缩文件中有一个目录名为rabbitmq_server-3.6.5。对该文件进行解压操作,我们在rabbitmq_server-3.6.5将会找到sbin目录。

运行RabbitMQ服务

启动服务
  调用sbin/rabbitmq-server shell脚本。这里会显示一条banner消息内容
,结束的时候会显示”completed with [n] plugins.”的内容,这表明RabbitMQ已经启动成功。
  你也可以在后台运行服务,使用命令rabbitmq-server -detached,这种情况下服务进程会在后台运行。
配置服务
你可以在$RABBITMQ_HOME/etc/rabbitmq/rabbitmq-env.conf文件中设置RabbitMQ的环境变量信息。服务器组件也能够配置,这个RabbitMQ配置文件路径在$RABBITMQ_HOME/etc/rabbitmq/rabbitmq.config。安装后这两个文件本身是不存在的。

文件位置

这个通用的Unix版本被设计成可以直接运行,不需要改变配置或者指定权限。这个目录和文件信息默认都是安装在rabbitmq_server-3.6.5目录中,在脚本文件中使用$RABBITMQ_HOME环境变量来代替这个路径。

如果你希望安装的RabbitMQ服务是使用传统的系统目录方式来管理配置,数据库,日志文件,插件配置等,这也是允许的。

找到这行配置信息:

1
SYS_PREFIX=${RABBITMQ_HOME}

在sbin/rabbitmq-defaults文件中去改变这行:

1
SYS_PREFIX=

不要在这个文件中修改其他行的内容。

备注:在修改了这个默认的目录后也许需要不同的权限才能够工作。RABBITMQ_MNESIA_BASE和RABBITMQ_LOG_BASE需要创建权限(服务器在启动的时候会创建),RABBITMQ_ENABLED_PLUGINS_FILE 需要写权限(针对rabbitmq-plugins)。这个配置文件可以在/etc/rabbitmq/里面进行查看。

访问端口

SELinux and similar 机器也许会阻止RabbitMQ绑定端口。这种情况下,RabbitMQ将会启动失败。确保以下端口是被允许打开的:

1
2
3
4
5
4369 (epmd), 25672 (Erlang distribution)
5672, 5671 (AMQP 0-9-1 without and with TLS)
15672 (if management plugin is enabled)
61613, 61614 (if STOMP is enabled)
1883, 8883 (if MQTT is enabled)

可以配置RabbitMQ使用其他端口。

默认用户访问

这个服务创建了一个用户名guest密码为guest的用户。没有配置的客户端将采用这个用户。默认情况下这个凭证只能用于连接本机。

查看文档中访问控制信息。了解如何创建更多的用户、删除guest用户,或者允许guest用户进行远程访问。

管理服务

怎么停止服务或者检查它的状态等,你可以调用sbin/rabbitmqctl(作为运行rabbitmq-server的用户来操作)。如果没有服务在运行rabbitmqctl命令将报告节点不在线的信息。

  • 调用 rabbitmqctl stop 命令停止服务.
  • 调用 rabbitmqctl status 检查服务运行状态.

更多的信息查看rabbitmqctl

控制系统的限制

RabbitMQ安装运行在生产环境的工作负载情况下需要对系统的内核参数限制进行调整,保证一个合适的数量来处理并发的连接和队列。这里主要设置打开文件的最大数量,可以使用这个命令 ulimit -n。许多操作系统作为一个消息代理来说这个默认值太低了(大部分的linux默认值都是1024)。我们推荐使用最少65536的文件描述符作为用户rabbitmq的生产环境,4096 基本上满足大多数开发工作负载。

这里有2个限制参数:最大允许打开的文件数(fs.file-max on Linux, kern.maxfilesperproc on OS X and FreeBSD)和per-user 限制数(ulimit -n)。前者必须高于后者。

更多关于控制系统限制的信息,请参考 Riak guide on open file limit tuning.

验证限制

UI管理页面 在概述这个选项卡里显示了可用的文件描述符数量。

1
rabbitmqctl status

上面的信息也包含了同样的值。
这个命令:

1
ulimit -a

能够显示当前用户的限制数。There may be more convenient OS-specific ways of doing that for a running process, such as the /proc filesystem on Linux.

##配置管理工具
配置管理工具(比如:Chef, Puppet, BOSH)提供了对系统限制的调整。我们的开发工具指南里面列出了相关的模块和项目。

远程过程调用 (RPC)

在第二篇教程中我们介绍了如何使用工作队列(work queue)在多个工作者(woker)中间分发耗时的任务。

可是如果我们需要将一个函数运行在远程计算机上并且等待从那儿获取结果时,该怎么办呢?这就是另外的故事了。这种模式通常被称为远程过程调用(Remote Procedure Call)或者RPC。

这篇教程中,我们会使用RabbitMQ来构建一个RPC系统:包含一个客户端和一个RPC服务器。现在的情况是,我们没有一个值得被分发的足够耗时的任务,所以接下来,我们会创建一个模拟RPC服务来返回斐波那契数列。

客户端接口

为了展示RPC服务如何使用,我们创建了一个简单的客户端类。它会暴露出一个名为“call”的方法用来发送一个RPC请求,并且在收到回应前保持阻塞。

1
2
3
FibonacciRpcClient fibonacciRpc = new FibonacciRpcClient();   
String result = fibonacciRpc.call("4");
System.out.println( "fib(4) is " + result);

关于RPC的注意事项:

尽管RPC在计算领域是一个常用模式,但它也经常被诟病。当一个问题被抛出的时候,程序员往往意识不到这到底是由本地调用还是由较慢的RPC调用引起的。同样的困惑还来自于系统的不可预测性和给调试工作带来的不必要的复杂性。跟软件精简不同的是,滥用RPC会导致不可维护的代码.

考虑到这一点,牢记以下建议:

  • 确保能够明确的搞清楚哪个函数是本地调用的,哪个函数是远程调用的。
  • 给你的系统编写文档。保持各个组件间的依赖明确。处理错误案例。
  • 明了客户端改如何处理RPC服务器的宕机和长时间无响应情况。

当对避免使用RPC有疑问的时候。如果可以的话,你应该尽量使用异步管道来代替RPC类的阻塞。结果被异步地推送到下一个计算场景。

回调队列

一般来说通过RabbitMQ来实现RPC是很容易的。一个客户端发送请求信息,服务器端将其应用到一个回复信息中。为了接收到回复信息,客户端需要在发送请求的时候同时发送一个回调队列(callback queue)的地址。我们使用默认的队列。来尝试一下:

1
2
3
4
5
6
7
8
9
10
callbackQueueName = channel.queueDeclare().getQueue();

BasicProperties props = new BasicProperties
.Builder()
.replyTo(callbackQueueName)
.build();

channel.basicPublish("", "rpc_queue", props, message.getBytes());

// ... then code to read a response message from the callback_queue ...

消息属性

AMQP协议给消息预定义了一系列的14个属性。大多数属性很少会用到,除了以下几个:

  • delivery_mode(投递模式):将消息标记为持久的(值为2)或暂存的(除了2之外的其他任何值)。第二篇教程里接触过这个属性,记得吧?
  • content_type(内容类型):用来描述编码的mime-type。例如在实际使用中常常使用application/json来描述JOSN编码类型。
  • reply_to(回复目标):通常用来命名回调队列。
  • correlation_id(关联标识):用来将RPC的响应和请求关联起来。

我们需要导入这个类:

1
import com.rabbitmq.client.AMQP.BasicProperties;

关联标识

上边介绍的方法中,我们建议给每一个RPC请求新建一个回调队列。这不是一个高效的做法,幸好这儿有一个更好的办法 —— 我们可以为每个客户端只建立一个独立的回调队列。

这就带来一个新问题,当此队列接收到一个响应的时候它无法辨别出这个响应是属于哪个请求的。correlation_id 就是为了解决这个问题而来的。我们给每个请求设置一个独一无二的值。稍后,当我们从回调队列中接收到一个消息的时候,我们就可以查看这条属性从而将响应和请求匹配起来。如果我们接手到的消息的correlation_id是未知的,那就直接销毁掉它,因为它不属于我们的任何一条请求。

你也许会问,为什么我们接收到未知消息的时候不抛出一个错误,而是要将它忽略掉?这是为了解决服务器端有可能发生的竞争情况。尽管可能性不大,但RPC服务器还是有可能在已将应答发送给我们但还未将确认消息发送给请求的情况下死掉。如果这种情况发生,RPC在重启后会重新处理请求。这就是为什么我们必须在客户端优雅的处理重复响应,同时RPC也需要尽可能保持幂等性。

总结

我们的RPC如此工作:

  • 当客户端启动的时候,它创建一个匿名独享的回调队列。
  • 在RPC请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
  • 将请求发送到一个 rpc_queue 队列中。
  • RPC工作者(又名:服务器)等待请求发送到这个队列中来。当请求出现的时候,它执行他的工作并且将带有执行结果的消息发送给reply_to字段指定的队列。
  • 客户端等待回调队列里的数据。当有消息出现的时候,它会检查correlation_id属性。如果此属性的值与请求匹配,将它返回给应用。

完整的代码

这是斐波那契方法:

1
2
3
4
5
private static int fib(int n) throws Exception {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}

我们声明了fibonacci函数,我们假设输入了一个有效的整数值。(别指望这个函数能处理很大的数值,会使得这个递归非常的慢)。

我们的RPC服务程序RPCServer.java如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
private static final String RPC_QUEUE_NAME = "rpc_queue";

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);

System.out.println(" [x] Awaiting RPC requests");

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();

BasicProperties props = delivery.getProperties();
BasicProperties replyProps = new BasicProperties
.Builder()
.correlationId(props.getCorrelationId())
.build();

String message = new String(delivery.getBody());
int n = Integer.parseInt(message);

System.out.println(" [.] fib(" + message + ")");
String response = "" + fib(n);

channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

服务器端代码相当简单:

  • 像往常一样,我们建立连接,声明队列。
  • 或许我们希望能在服务器上多开几个线程。为了能将负载平均地分摊到多个服务器,我们需要将 prefetch_count 设置好。
  • 我们为 basic_consume 声明了一个回调函数,这是RPC服务器端的核心。它执行实际的操作并且作出响应。

RPC客户端程序 RPCClient.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;

public RPCClient() throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
connection = factory.newConnection();
channel = connection.createChannel();

replyQueueName = channel.queueDeclare().getQueue();
consumer = new QueueingConsumer(channel);
channel.basicConsume(replyQueueName, true, consumer);
}

public String call(String message) throws Exception {
String response = null;
String corrId = java.util.UUID.randomUUID().toString();

BasicProperties props = new BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();

channel.basicPublish("", requestQueueName, props, message.getBytes());

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
response = new String(delivery.getBody());
break;
}
}

return response;
}

public void close() throws Exception {
connection.close();
}

客户端代码稍微有点难懂:

  • 我们建立了一个连接和通道,声明了一个回调队列作为回复。
  • 我们订阅这个回调队列,以便接收RPC的响应。
  • 我们定义我们的回调方法call。它执行真正的RPC请求。
  • 在这个方法中,首先我们生成一个唯一的 correlation_id 值并且保存起来,回调函数会用它来获取符合要求的响应。
  • 接下来,我们将带有 replyTo 和 correlationId 属性的消息发布出去。
  • 现在我们可以坐下来,等待正确的响应到来。
  • 最后,我们将响应返回给用户。

客户端的请求:

1
2
3
4
5
6
7
RPCClient fibonacciRpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");
String response = fibonacciRpc.call("30");
System.out.println(" [.] Got '" + response + "'");

fibonacciRpc.close();

现在我们来看看完整的源码RPCClient.java and RPCServer.java.

和之前一样编译程序:

1
$ javac -cp rabbitmq-client.jar RPCClient.java RPCServer.java

我们的RPC服务已经准备就绪了,现在启动服务器端:

1
2
$ java -cp $CP RPCServer
[x] Awaiting RPC requests

运行客户端,请求一个fibonacci队列:

1
2
$ java -cp $CP RPCClient
[x] Requesting fib(30)

此处呈现的设计并不是实现RPC服务的唯一方式,但是他有一些重要的优势:

  • 如果RPC服务器运行的过慢的时候,你可以通过运行另外一个服务器端轻松扩展它。试试在控制台中运行第二个 RPCServer 。
  • 在客户端,RPC请求只发送或接收一条消息。不需要像 queue_declare 这样的异步调用。所以RPC客户端的单个请求只需要一个网络往返。

我们的代码依旧非常简单,而且没有试图去解决一些复杂(但是重要)的问题,如:

  • 当没有服务器运行时,客户端如何作出反映。
  • 客户端是否需要实现类似RPC超时的东西。
  • 如果服务器发生故障,并且抛出异常,应该被转发到客户端吗?
  • 在处理前,防止混入无效的信息(例如检查边界)

如果你想做一些实验,你会发现rabbitmq-management plugin在观测队列方面是很有用处的。

Topics(主题)

上一篇教程里,我们改进了我们的日志系统。我们使用直连交换机替代了扇型交换机,从只能盲目的广播消息改进为可以选择性的接收日志。

尽管直连交换机能够改善我们的系统,但是它也有它的限制 —— 没办法基于多个标准执行路由操作。

在我们的日志系统中,我们不只希望订阅基于严重程度的日志,同时还希望订阅基于发送来源的日志。Unix工具syslog就是同时基于严重程度-severity (info/warn/crit…) 和 设备-facility (auth/cron/kern…)来路由日志的。

如果这样的话,将会给予我们非常大的灵活性,我们既可以监听来源于“cron”的严重程度为“critical errors”的日志,也可以监听来源于“kern”的所有日志。

为了实现这个目的,接下来我们学习如何使用另一种更复杂的交换机 —— 主题交换机。

Topic exchange

发送到主题交换机(topic exchange)的消息不可以携带随意什么样子的路由键(routing_key),它的路由键必须是一个由.分隔开的词语列表。这些单词随便是什么都可以,但是最好是跟携带它们的消息有关系的词汇。以下是几个推荐的例子:”stock.usd.nyse”, “nyse.vmw”, “quick.orange.rabbit”。词语的个数可以随意,但是不要超过255字节。

绑定键也必须拥有同样的格式。主题交换机背后的逻辑跟直连交换机很相似 —— 一个携带着特定路由键的消息会被主题交换机投递给绑定键与之想匹配的队列。但是它的绑定键和路由键有两个特殊应用方式:

  • *(星号) 用来表示一个单词.
  • #(井号) 用来表示任意数量(零个或多个)单词。

用图来表示:

这个例子里,我们发送的所有消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.分割开。路由键里的第一个单词描述的是动物的敏捷,第二个单词是动物的颜色,第三个是动物的种类。所以它看起来是这样的: ..

我们创建了三个绑定:Q1的绑定键为 *.orange.*,Q2的绑定键为 ..rabbit 和 lazy.# 。

这三个绑定键被可以总结为:

  • Q1 对所有的桔黄色动物都感兴趣。
  • Q2 则是对所有的兔子和所有懒惰的动物感兴趣

一个携带有 quick.orange.rabbit 的消息将会被分别投递给这两个队列。携带着 lazy.orange.elephant 的消息同样也会给两个队列都投递过去。另一方面携带有 quick.orange.fox 的消息会投递给第一个队列,携带有 lazy.brown.fox 的消息会投递给第二个队列。携带有 lazy.pink.rabbit 的消息只会被投递给第二个队列一次,即使它同时匹配第二个队列的两个绑定。携带着 quick.brown.fox 的消息不会投递给任何一个队列。

如果我们违反约定,发送了一个携带有一个单词或者四个单词(”orange” or “quick.orange.male.rabbit”)的消息时,发送的消息不会投递给任何一个队列,而且会丢失掉。

但是另一方面,即使 “lazy.orange.male.rabbit” 有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。

主题交换机

主题交换机是很强大的,它可以表现出跟其他交换机类似的行为

当一个队列的绑定键为 “#”(井号) 的时候,这个队列将会无视消息的路由键,接收所有的消息。

当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时主题交换机就拥有的直连交换机的行为。

Putting it all together

接下来我们会将主题交换机应用到我们的日志系统中。在开始工作前,我们假设日志的路由键由两个单词组成,路由键看起来是这样的:.

代码跟上一篇教程差不多。

EmitLogTopic.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class EmitLogTopic {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] argv)
throws Exception {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

String routingKey = getRouting(argv);
String message = getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

connection.close();
}
//...
}

ReceiveLogsTopic.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsTopic {
private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();

if (argv.length < 1) {
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}

for (String bindingKey : argv) {
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}

Run the following examples, including the classpath as in Tutorial 1 - on Windows, use %CP%.
To receive all the logs:
运行这个例子,和之前一样使用了%CP%环境变量。
接收所有的日志信息写法:

1
$ java -cp $CP ReceiveLogsTopic "#"

接收来自”kern“设备的日志:

1
$ java -cp $CP ReceiveLogsTopic "kern.*"

只接收严重程度为”critical“的日志:

1
$ java -cp $CP ReceiveLogsTopic "*.critical"

建立多个绑定:

1
$ java -cp $CP ReceiveLogsTopic "kern.*" "*.critical"

发送路由键为 “kern.critical” 的日志:

1
$ java -cp $CP EmitLogTopic "kern.critical" "A critical kernel error"

执行上边命令试试看效果吧。另外,上边代码不会对路由键和绑定键做任何假设,所以你可以在命令中使用超过两个路由键参数。

思考以下问题:

  • 绑定键为 * 的队列会取到一个路由键为空的消息吗?
    不会
    1
    2
    ./ReceiveLogsTopic "*"
    ./EmitLogTopic ""
  • 绑定键为 #.* 的队列会获取到一个名为..的路由键的消息吗?
    不会
    1
    2
    ./ReceiveLogsTopic "#.*"
    ./EmitLogTopic ".."
  • 它会取到一个路由键为单个单词的消息吗?
    1
    2
    ./ReceiveLogsTopic "#.*"
    ./EmitLogTopic "a"
  • a.*.# 和 a.#的区别在哪儿?

‘a.*.#’匹配2个单词以上的情况,并且第一个单词是’a’.开头的。’a.#’匹配的是一个单词以上的,第一个单词是’a’.

1
2
3
4
./ReceiveLogsTopic "a.*.#"
./EmitLogTopic "a.b"
./ReceiveLogsTopic "a.#"
./EmitLogTopic "a.b"

(完整的代码 EmitLogTopic.java and ReceiveLogsTopic.java)

Routing(路由)

上一节,我们创建了一个日志系统,我们能够分发log信息给每个订阅者。

这一节,我们在其上添加额外的功能——只订阅log信息的一个子集。例如:我们只把至关重要的错误日志信息,记录到文件,而所有的日志信息都可以在控制带输出。

Bindings

上一节,我们已经定义了绑定信息,你可以回想一下代码:

1
channel.queueBind(queueName, EXCHANGE_NAME, "");

binding是exchange与queue之间的关系,简单的来说就是:queue对来自指定的exchange的消息感兴趣。

Binding可以指定routingKey参数。为了避免和BasicPublish参数疑惑,我们可以叫它 binding key。因此我们可以创建一个带key的bingding。

1
channel.queueBind(queueName, EXCHANGE_NAME, "black");

bingding key的意义取决于 exchange的类型。fanout类型的exchange会忽略这个值。

Direct exchange

上一节的日志系统只能把素有的消息广播给所有的消费者。我们想根据message的log lever来过滤message。例如,我们只想把错误的日志信息写到磁盘里,警告和信息日志不写到磁盘里。

但是fanout类型exchange 不够灵活,它只能盲目的进行广播。

因此这里我们使用 direct类型的exchange来替代。direct 类型exchange背后的算法很简单——一个消息只会发送给queue的bingding key 完全匹配message的routing key的队列。

大体结构如下所示:

我们看到 direct类型的exchange X 有两个queue绑定到它。第一个 bingding key是orange。第二个有两个bingding Key:black和green。

因此,如果一个message的routing key是orange会发送给Q1队列,如果是blcak或green则会发送给Q2,其他的消息则会被丢弃掉。

Multiple bindings

多个队列绑定同样的key是合法的。我们可以给Q1绑定一个black的key,在这种情况下,direct 类型的exchage的行为和fanout表现的一样。一个消息将会路由给Q1和Q2。

Emitting logs

我们将使用这个模型作为我们的日志系统。我们通过使用一个direct exchange类型来发送消息,代替之前的fanout类型。我们通过日志的级别来作为routing key。订阅者可以通过选择severity来得到他们想要接收的信息。我们首先关注emitting logs。

和之前一样,我们需要先创建一个交换机:

1
channel.exchangeDeclare(EXCHANGE_NAME, "direct");

接着我们发送一条消息:

1
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());

为了简化,我们的serverity 可以是”info”,”warning”,”error”。

Subscribing(订阅)

接收消息的工作和之前的教程一样,只有一处不一样-我们会循环severity来创建绑定信息。

1
2
3
4
5
String queueName = channel.queueDeclare().getQueue();

for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

Putting it all together

EmitLogDirect.java 类的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class EmitLogDirect {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] argv)
throws java.io.IOException {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

String severity = getSeverity(argv);
String message = getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

channel.close();
connection.close();
}
//..
}

ReceiveLogsDirect.java类的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogsDirect {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();

if (argv.length < 1){
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}

for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}

编译还是和以前一样,为了方便我们使用环境变量$CP(window使用%CP%)作为例子的运行路径:

如果你只想保存 ‘warning’和’error’(不需要’info’)日志信息到文件中,打开控制台并制定类型:

1
$ java -cp $CP ReceiveLogsDirect warning error > logs_from_rabbit.log

如果你想在你的屏幕上查看所有的日志信息,打开一个新的终端输入以下信息:

1
2
$ java -cp $CP ReceiveLogsDirect info warning error
[*] Waiting for logs. To exit press CTRL+C

我们发送一个错误日志信息试试:

1
2
$ java -cp $CP EmitLogDirect error "Run. Run. Or it will explode."
[x] Sent 'error':'Run. Run. Or it will explode.'

(完整的代码 (EmitLogDirect.java source) and (ReceiveLogsDirect.java source))

发布/订阅

在上一节我们创建了一个工作队列。工作队列会按顺序分配给每一个工作者。在这一节里我们会把一条消息给多个消费者。这种模式叫做”发布/订阅”。

为了阐明这种模式,我们建立一个日志系统。它包含两个程序 – 一个发出日志消息,另一个接收并打印日志消息。

在这个日志系统中,每一个receiver程序都会获得日志信息。因此我们可以设置一个接收者直接保存日志信息到硬盘,另一个接收者打印日志信息到屏幕。

就是说,发布的日志消息会广播给所有的接收者。

Exchanges(交换机)

上一节我们从一个队列里发送和接收消息。现在我们完整的讲解Rabbit的消息发送模型。
我们快速的回顾一下之前的接收流程:

  • 一个生产者通过程序发送消息
  • 一个队列作为缓冲区存储消息
  • 一个消费者通过程序接收消息

RabbitMQ的消息模型核心思想是生产者不会直接向队列发送任何消息。实际上,生产者甚至不知道一个消息将传递给哪个队列。

事实上,生产者只能发送消息给一个exchange。exchange 很简单。一方面它接收来自生产者的消息,另一方面它把消息推送给队列。因此exchange要知道怎么处理接收到的message。是把message发给一个特定的队列?还是发给多个队列?或者丢弃?这个规则是由 exchange type 定义的。

exchange 有以下几种:direct, topic, headers 和 fanout。我们主要使用最后一种——fanout。下面我们定义一个 名字叫logs,类型为fanout的exchange:

1
channel.exchangeDeclare("logs", "fanout");

fanout exchange很简单。它会广播所有收到的message传递给它知道的queue。这正是我们需要的日志记录方式。

Listing exchanges
你可以在服务器上面使用rabbitmqctl命令查看所有的交换机信息:

1
2
3
4
5
6
7
8
9
10
11
12
$ sudo rabbitmqctl list_exchanges
Listing exchanges ...
direct
amq.direct direct
amq.fanout fanout
amq.headers headers
amq.match headers
amq.rabbitmq.log topic
amq.rabbitmq.trace topic
amq.topic topic
logs fanout
...done.

In this list there are some amq.* exchanges and the default (unnamed) exchange. These are created by default, but it is unlikely you’ll need to use them at the moment.
这个列表信息里有一些amq.*开头的是默认的交换机。他们默认就被创建好了。

Nameless exchange
在之前章节的教程中,我们不知道exchange,但是我们仍然能够把message传递给queue,这是因为我们使用了默认的exchange。我们使用了空字符串作为交换机信息。

之前我们发布message,用的代码如下所示:

1
channel.basicPublish("", "hello", null, message.getBytes());

第一参数代表exchange的名字,这里使用空字符串表示用默认的或nameless exchange。如果队列存在的话,message会被发送给routingKey指定的队列。

接下来,我们把它替换成发布到我们命名的exchange:

1
channel.basicPublish( "logs", "", null, message.getBytes());

Temporary queues(临时队列)

之前我们使用的queue都有一个特定的名字(hello 或者 task_queue?)。当你想在生产者和消费者之间共享队列,给queue命名是至关重要的。

但是 这种情况对于我们的日志系统是不适合的。我们想看到所有的日志信息,而不是仅仅是他的一个子集。我们也只对当前流动的感兴趣而不是旧的消息。为了解决这个问题我们需要做两件事:

首先,我们需要一个新的,空的队列,不管什么时候我们连接到Rabbit。这就需要,我们每次连接rabbti都要创建一个名字随机的队列,或者让服务器选择一个名字随机的队列给我们。

其次,一旦我们consumer断开与queue的连接,queue应该自动删除。

在Java client 我们提供了一个无参的 queueDeclare()方法,使用它,我们可以创建一个 不持久化,名字唯一,自动删除的队列。

1
String queueName = channel.queueDeclare().getQueue();

这里queueName是随机生成的队列的名字。例如amq.gen-JzTY20BRgKO-HjmUJj0wLg.

Bindings(绑定)

我们已经创建了一个 fanout类型的exchange和所需的queue。现在,我们就需要告诉 exchange 发送messages 到我们指定的 queue。这里,exchange和queue的关系我们叫做binding(绑定)。

1
channel.queueBind(queueName, "logs", "");

现在这个logs exchange将把消息添加到我们指定的队列中。

Listing bindings

你可以通过rabbitmqctl list_bindings命令,查看已经存在的bingding。

Putting it all together

这个生产程序里,发送日志消息,和之前的教程有所区别。主要的变化是我们通过指定的logs exchange去发布消息。以前我们需要通过一个routingKey去发送,这里我们通过fanout exchanges忽略了这个值。这里是EmitLog.java的程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

public class EmitLog {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] argv)
throws java.io.IOException {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

String message = getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

channel.close();
connection.close();
}
//...
}

(EmitLog.java source)

正如你看到的,在建立连接后我们声明了一个交换机。这一步是必须的,禁止发布消息到一个不存在的交换机中。

如果交换机没有和队列进行绑定,消息将会丢失,但是这种情况是允许的;如果没有消费者监听,我们可以安全的丢弃消息。

ReceiveLogs.java的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import com.rabbitmq.client.*;

import java.io.IOException;

public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}

(ReceiveLogs.java source)

编译我们写的代码。

1
$ javac -cp .:rabbitmq-client.jar EmitLog.java ReceiveLogs.java

如果我们想保存日志信息到文件,只需要打开控制台指定ReceiveLogs类型:

1
$ java -cp .:rabbitmq-client.jar ReceiveLogs > logs_from_rabbit.log

如果你想在屏幕上查看日志信息,新打开一个终端运行:

1
$ java -cp .:rabbitmq-client.jar ReceiveLogs

最后,发送日志:

1
$ java -cp .:rabbitmq-client.jar EmitLog

使用rabbitmqctl list_bindings命令可以验证创建的bindings and queues信息,有2个ReceiveLogs.java programs 运行中,你会看到这样的信息:

1
2
3
4
5
$ sudo rabbitmqctl list_bindings
Listing bindings ...
logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
...done.

结果很简单,数据从logs交换机传递给了两个队列,这正是我们想要的结果。

工作队列

(使用java客户端)

在第一节的教程里,我们创建了一个程序,发送和接收消息,从一个named queue(命名队列 )。本节,我们会创建一个 Work Queue(工作队列),用来分发耗时任务给多个Workers(工人)。

使用Work Queues(别名:Task Queue)是为了避免立即做一个资源密集型任务,而不得不等待它完成。我们可以把这个耗时的任务封装提取起来作为message,发送给一个queue。一个Worker 后台进程会获取task,然后执行他。当有多个Workers 时,他们平分这些task。

准备

上一节的教程,我们发送已一条包含“Hello World”的消息。这一节我们要发送一个复杂的任务。我们不做真实的任务,采用Thread.Sleep()来模拟我们的程序很忙。我们以程序中发送的字符串的点数来作为复杂性。每一个点占用1秒的时间,比如,一个任务的内容是Hello…将会耗时三秒钟。

我们修改上一节的Send.java的代码,允许命令行发送任意消息,这个程序会将任务分配给我们的工作队列,我们叫它NewTask.java:

1
2
3
4
String message = getMessage(argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

从命令行获取消息信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static String getMessage(String[] strings){
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}

我们的Recv.java程序也必须做改变:它需要处理每个消息的内容。我们叫它Worker.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
}
}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

我们的工作任务模拟执行时间:

1
2
3
4
5
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}

在工作目录中编译这2个文件:

1
$ javac -cp rabbitmq-client.jar NewTask.java Worker.java

循环调度

使用Task Queue的一个优点就是可以很容易的平均分配任务。如果queue里有堆积过多的任务,我们可以添加更多的Worker就行了。规模很容易扩大。

接下来,我们同时运行2个工作实例,他们将从队列中获取消息,但怎么去获取?我们一起看看。

你需要打开三个控制台。2个运行工作程序,这些控制台作为我们的2个消费者 - C1和C2.

1
2
3
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
1
2
3
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C

在第三个控制台发布新的任务。你可以发布少量的消息给消费者消费:

1
2
3
4
5
6
7
8
9
10
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....

我们看看什么内容给了我们的工作者:

1
2
3
4
5
6
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'
1
2
3
4
5
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'

默认RabbitMQ会顺序的,平均的把任务发给每个consumer,到最后每个Consumer会得到相同数量的任务。这种分配方式我们称为round-robin。可以尝试第三个或者更多的工作者。

Message acknowledgment

执行一个耗时的任务,你可能会想知道任务的执行情况。是否有Consumer开始执行任务了?是否任务执行到一半死机了?
当前我们上面的代码,一旦RabbitMQ分发message给Custoerm,它就会立刻从内存删除。这种情况下,如果你关闭一个Worker,我们就会丢失他正在执行的消息。同样,我们也会丢失之前分发给他,还没有来的及执行的消息。

但是我们不想丢失任何task。如果一个Worker死了,我们想把任务分发给其他的Worker。

为了确保message不丢失,RabbitMQ 提供了 message acknowledgments。Ack是consumer 发送给RabbitMQ的,告诉它,task 已经接受,并处理了,RabbitMQ 可以删除它了。

如果一个consumer死机了(channel closed,connection closed or Tcp connection lost),没有返回ack,RabbitMQ就会知道task 没有处理完,该task就会重新排队。如果这时候有另外一个Consumer在线,RabbitMQ 就会把它分发给他。

所有的消息都不会超时,当消费者挂掉后RabbitMQ将会重新发送消息,处理一条消息有可能会很长时间,但是总是能处理掉。

默认Message acknowLedgments 是打开的,之前的例子里我们显示的设置了autoAck=true,现在我们设置成false,一旦我们处理了一条任务,我们就发送一个正确的确认信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
channel.basicQos(1); // accept only one unack-ed message at a time (see below)

final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

运行上面的代码,如果我们使用CTRL+C kill一个worker,消息也不会被丢失,很快这个消息就会被再次投递给其他工作者。

Forgotten acknowledgment

丢失BasicAck是很常见的错误,尽管这个错很小,但后果很严重。当Client quit,Messages 会重新分发,但是RabbitMQ 由于不能释放掉那些unacked message ,所以会消耗越来越多的内存。

为了 调试这种错误, 你可以使用rabbitmqctl来打印出 messages_unacknowledged 的message信息:

1
2
3
4
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.

消息持久化

通过上面的ACK配置,当consumer 死亡的时候,task 不会丢失。但是如果RabbitMQ服务停了,task 仍然会丢失。
这里我们就要持久化 task的信息了。

当RabbitMQ停止或者宕机了,如果你没有告诉它怎么处理的话,队列和消息也会丢失。需要做两步操作来确保消息不被丢失:我们需要标记队列和消息是持久化的。

首先,我们需要保证队列在RabbitMQ中不会丢失。我们在代码里声明持久化:

1
2
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

尽管我们定义名字叫hello 队列要持久化,但是仍然不会生效。这是因为我们已经定义了一个没有持久化的名字叫hello 队列。RabbitMQ 不允许重新定义(用不同的参数)一个已经存在的队列,会报错。因此这里我们应该另外定义一个队列,
例如 task_queue:

1
2
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

queue 持久化的修改,producer 和consumer的代码都要修改.

通过上面的代码设置我们的queue,即使RabbitMQ重启也不会丢失。接下来,我们来持久化message。发布消息的时候提供MessageProperties.PERSISTENT_TEXT_PLAIN值即可持久化。

消息持久化备注

尽管我们设置message持久化了,但是这也不能完全保证message不会丢失。
这是由于RabbitMQ保存message到硬盘是需要时间的,如果再此期间RabbitMQ服务挂了,message就丢失了。不过对于一般的程序已经足够了。如果要一个更强壮的方案,你可是使用publisher confirms.

公平调度

也许你已经主要到,上面代码实现的message的调度不是你想要的。例如,假设有两个Worker,所有的奇数的message都是耗时的操作,而偶数的message都是很简单的。你会发现一个Worker很空闲,而另一个Woker累死累活的。然而RabbitMQ不知道,还是不停的给他发任务。

这个情况的发生,是由于RabbitMQ 不看 the number of unacknowledged message,只要message进入队列就分发message。他只是盲目的分发message。

为了解决上面的问题,我们可以使用 basicQos方法 设置 prefetchCount=1。这个设置会告诉RabbitMQ 每次给Workder只分配一个task,只有当task执行完了,才分发下一个任务。

1
2
int prefetchCount = 1;
channel.basicQos(prefetchCount);

NOTE:注意queue的size

如果所有的Worker都很忙,你的队列会填满,因此你需要监测queue的情况,添加更多的worker 或者采用其他的策略。

Putting it all together

最终的代码在我们的NewTask.java类中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] argv)
throws java.io.IOException {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

String message = getMessage(argv);

channel.basicPublish( "", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

channel.close();
connection.close();
}
//...
}

(NewTask.java source)
Worker.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

channel.basicQos(1);

final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
}

private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}

(Worker.java source)

使用message acknowledgments和prefetchCount可以设置一个工作队列。通过durability选项让我们的任务能够在RabbitMQ重启后也能够存在。

关于Channel方法和MessageProperties的更多信息,你可以浏览在线的javadocs.

介绍

RabbitMQ是一个消息代理。它的核心原理非常简单:接收和发送消息。你可以把它想像成一个邮局:你把信件放入邮箱,邮递员就会把信件投递到你的收件人处。在这个比喻中,RabbitMQ是一个邮箱、邮局、邮递员。

RabbitMQ和邮局的主要区别是,它处理的不是纸,而是接收、存储和发送二进制的数据——消息。

一般提到RabbitMQ和消息,都用到一些专有名词。

  • 生产(Producing)意思就是发送。发送消息的程序就是一个生产者(producer)。我们一般用”P”来表示:
    生产
  • 队列(queue)就是邮箱的名称。消息通过你的应用程序和RabbitMQ进行传输,它们能够只存储在一个队列(queue)中。队列(queue)没有任何限制,你要存储多少消息都可以——基本上是一个无限的缓冲。多个生产者(producers)能够把消息发送给同一个队列,同样,多个消费者(consumers)也能够从一个队列(queue)中获取数据。队列可以用下图标识:
    此处输入图片的描述
  • 消费(Consuming)和获取消息是一样的意思。一个消费者(consumer)就是一个等待获取消息的程序。我们把它画作”C”:
    消费

注意:一般生产者,消费者和代理不必部署在同一台机子上。

“Hello World”

(using the Java Client)
我们将在这个章节里创建2个java程序;一个生产者发送一条消息,一个消费者接受消息并且打印输出。我们跳过Java API的细节,从最简单的事情开始说起。通过一条 “Hello World”作为消息。

在下面的图表中, “P” 是我们的生产者,”C”是我们的消费者。中间是一个队列 - 作为RabbitMQ的消息缓冲区提供给消费者。

The Java client library

RabbitMQ 支持多种协议。该指南使用的是AMQP 0-9-1协议,这是一个开放的,通用的消息协议。RabbitMQ 提供了许多客户端语言的支持。我们将使用Java client 提供商。
下载客户端依赖包,检查签名信息。解压到你的工作目录:

1
2
$ unzip rabbitmq-java-client-bin-*.zip
$ cp rabbitmq-java-client-bin-*/*.jar ./

(这个客户端你也可以从Maven的中心仓库去下载,groupId= com.rabbitmq,artifactId=amqp-client)
现在我们有了客户端的依赖,就可以编写一些代码了。

Sending 发送端


我们将执行我们的消息发送端发送给我们的消息接收端。发送者将连接到RabbitMQ,发送一条消息,然后退出。
在Send.java文件中,我们需要引入一些依赖:

1
2
3
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

在类里面设置队列的名称:

1
2
3
4
5
6
7
8
public class Send {
private final static String QUEUE_NAME = "hello";

public static void main(String[] argv)
throws java.io.IOException {
...
}
}

我们创建一个连接到服务器:

1
2
3
4
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

这里创建了一个Socket连接,负责协议版本协商和身份验证等等,这里由于连接的是本地机子,所以取值localhost。如果我们想连接到其他机器的代理服务,我们只需要添加IP地址就可以了。

接下来,创建一个channel(通道),大多数任务都是在这里完成的。

要发送消息,我们必须首先定义一个queue(队列),然后我们才成把消息发送给queue:

1
2
3
4
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

queue 的定义具有幂等性,如果不存在就会被创建。消息的内容是一个字节数组,所以你可以对内容进行编码。

最后,我们关闭通道和连接。

1
2
channel.close();
connection.close();

整个Send.java类在这里查看。
Sending doesn’t work!
发送端不工作的情况
如果你第一次使用RabbitMQ并且没有看到发送的消息,你可以不知道是什么原因导致的错误,也许是没有足够的磁盘空间(默认需要1Gb的空间).配置文档会告诉你怎么设置disk_free_limit。

Receiving 接收者

我们的接收者会从RabbitMQ拉取消息,不像发送者发送消息那样,接收端会保持监听消息,然后打印输出。

在Recv.java里也需要引入一些依赖:

1
2
3
4
5
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;

DefaultConsumer类实现了Consumer接口,从服务端拉取消息。

设置上和发送端一样;打开一个连接和一个通道,并且声明一个队列进行消费。这里要和发送端的队列保持一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Recv {
private final static String QUEUE_NAME = "hello";

public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
...
}
}

也许你已经发现,我们在接收端也定义了hello队列。这是为了确保,如果接收端先启动的时候,队列已经存在。

接下来,我们就要告诉服务器来交付消息。由于推送消息是一个异步操作,因此我们使用回调函数,DefaultConsumer.handleDelivery来处理。

1
2
3
4
5
6
7
8
9
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);

整个Recv.java类在这里查看。

Putting it all together

通过javac对2个文件进行编译操作:

1
$ javac -cp rabbitmq-client.jar Send.java Recv.java

然后运行他们,你需要将rabbitmq-client.jar放到classpath下,在一个终端里运行发送端:

1
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send

然后运行接收端:

1
$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv

接收端从RabbitMQ获取消息并打印,接收端会一直保持运行,等待消息到来(使用Ctrl-C 停止运行),你可以尝试运行发送端。

如果你想检查这个队列,使用 rabbitmqctl list_queues.
hello
移动到第二章节怎么构建一个简单的工作队列。

服务下载

| Description | Download |
| ——| —— | :—-: |
| Installer for Windows systems | rabbitmq-server-3.6.5.exe |

卸载以前的版本

如果你已经安装过Erlang VM,确保是64位的版本。

服务安装

首先,下载并运行Erlang Windows二进制文件。大约需要安装5分钟时间。
然后运行安装程序rabbitmq-server-3.6.5.exe。大约需要安装2分钟,RabbitMQ将被作为一个服务运行,提供默认配置。

运行RabbitMQ服务

定制 RabbitMQ环境变量
  在默认的设置环境里RabbitMQ可以运行的很好,你也可以定制RabbitMQ环境或者编辑配置
运行 RabbitMQ
  RabbitMQ服务是自动启动.你也可以通过开始菜单提供的stop/reinstall/start去管理服务。
管理服务
  你可以通过开始菜单连接到RabbitMQ的安装目录。在开始菜单里提供了一个cmd窗口能够快速连接到sbin目录。这里能够很方便的运行各种命令

访问端口

防火墙和一些安装工具会阻止RabbitMQ绑定的端口,这种情况下,RabbitMQ会启动失败,确保以下端口是被允许绑定的:

1
2
3
4
5
4369 (epmd), 25672 (Erlang distribution)
5672, 5671 (AMQP 0-9-1 without and with TLS)
15672 (if management plugin is enabled)
61613, 61614 (if STOMP is enabled)
1883, 8883 (if MQTT is enabled)

可以配置RabbitMQ使用其他端口。

默认用户访问

这个服务创建了一个用户名guest密码为guest的用户。没有配置的客户端将采用这个用户。默认情况下这个凭证只能用于连接本机。
查看文档中访问控制信息。了解如何创建更多的用户、删除guest用户,或者允许guest用户进行远程访问。

管理服务

通过sbin目录的rabbitmqctl.bat可以停止服务或者检查服务的状态(作为管理员操作)。
停止服务
  使用 rabbitmqctl stop.
检查服务的状态
  使用 rabbitmqctl status. 得到当前服务的运行信息报告.
更多的命令信息查看 rabbitmqctl

日志

服务器会输出一个日志文件RABBITMQ_NODENAME.log到RABBITMQ_LOG_BASE目录中,额外的日志数据会写到RABBITMQ_NODENAME-sasl.log文件中。
这个服务会追加到日志文件中,保留的是一个完整的日志记录。
你也可以通过命令rabbitmqctl rotate_logs来轮转日志。

Troubleshooting When Running as a Service

In the event that the Erlang VM crashes whilst RabbitMQ is running as a service, rather than writing the crash dump to the current directory (which doesn’t make sense for a service) it is written to an erl_crash.dump file in the base directory of the RabbitMQ server (set by the RABBITMQ_BASE environment variable, defaulting to %APPDATA%%RABBITMQ_SERVICENAME% - typically %APPDATA%\RabbitMQ otherwise).

Windows-specific Issues

We aim to make RabbitMQ a first-class citizen on Windows. However, sometimes there are circumstances beyond our control. Please consult the Windows-specific Issues page.

Getting Help

If you have questions or need help, feel free to ask on RabbitMQ mailing list.

0%