RabbitMQ Tutorials (Work queues)

工作队列

(使用java客户端)

在第一节的教程里,我们创建了一个程序,发送和接收消息,从一个named queue(命名队列 )。本节,我们会创建一个 Work Queue(工作队列),用来分发耗时任务给多个Workers(工人)。

使用Work Queues(别名:Task Queue)是为了避免立即做一个资源密集型任务,而不得不等待它完成。我们可以把这个耗时的任务封装提取起来作为message,发送给一个queue。一个Worker 后台进程会获取task,然后执行他。当有多个Workers 时,他们平分这些task。

准备

上一节的教程,我们发送已一条包含“Hello World”的消息。这一节我们要发送一个复杂的任务。我们不做真实的任务,采用Thread.Sleep()来模拟我们的程序很忙。我们以程序中发送的字符串的点数来作为复杂性。每一个点占用1秒的时间,比如,一个任务的内容是Hello…将会耗时三秒钟。

我们修改上一节的Send.java的代码,允许命令行发送任意消息,这个程序会将任务分配给我们的工作队列,我们叫它NewTask.java:

1
2
3
4
String message = getMessage(argv);

channel.basicPublish("", "hello", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

从命令行获取消息信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private static String getMessage(String[] strings){
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}

我们的Recv.java程序也必须做改变:它需要处理每个消息的内容。我们叫它Worker.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
}
}
};
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

我们的工作任务模拟执行时间:

1
2
3
4
5
private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}

在工作目录中编译这2个文件:

1
$ javac -cp rabbitmq-client.jar NewTask.java Worker.java

循环调度

使用Task Queue的一个优点就是可以很容易的平均分配任务。如果queue里有堆积过多的任务,我们可以添加更多的Worker就行了。规模很容易扩大。

接下来,我们同时运行2个工作实例,他们将从队列中获取消息,但怎么去获取?我们一起看看。

你需要打开三个控制台。2个运行工作程序,这些控制台作为我们的2个消费者 - C1和C2.

1
2
3
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
1
2
3
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C

在第三个控制台发布新的任务。你可以发布少量的消息给消费者消费:

1
2
3
4
5
6
7
8
9
10
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask First message.
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Second message..
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Third message...
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fourth message....
shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
NewTask Fifth message.....

我们看看什么内容给了我们的工作者:

1
2
3
4
5
6
shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'First message.'
[x] Received 'Third message...'
[x] Received 'Fifth message.....'
1
2
3
4
5
shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar
Worker
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'Second message..'
[x] Received 'Fourth message....'

默认RabbitMQ会顺序的,平均的把任务发给每个consumer,到最后每个Consumer会得到相同数量的任务。这种分配方式我们称为round-robin。可以尝试第三个或者更多的工作者。

Message acknowledgment

执行一个耗时的任务,你可能会想知道任务的执行情况。是否有Consumer开始执行任务了?是否任务执行到一半死机了?
当前我们上面的代码,一旦RabbitMQ分发message给Custoerm,它就会立刻从内存删除。这种情况下,如果你关闭一个Worker,我们就会丢失他正在执行的消息。同样,我们也会丢失之前分发给他,还没有来的及执行的消息。

但是我们不想丢失任何task。如果一个Worker死了,我们想把任务分发给其他的Worker。

为了确保message不丢失,RabbitMQ 提供了 message acknowledgments。Ack是consumer 发送给RabbitMQ的,告诉它,task 已经接受,并处理了,RabbitMQ 可以删除它了。

如果一个consumer死机了(channel closed,connection closed or Tcp connection lost),没有返回ack,RabbitMQ就会知道task 没有处理完,该task就会重新排队。如果这时候有另外一个Consumer在线,RabbitMQ 就会把它分发给他。

所有的消息都不会超时,当消费者挂掉后RabbitMQ将会重新发送消息,处理一条消息有可能会很长时间,但是总是能处理掉。

默认Message acknowLedgments 是打开的,之前的例子里我们显示的设置了autoAck=true,现在我们设置成false,一旦我们处理了一条任务,我们就发送一个正确的确认信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
channel.basicQos(1); // accept only one unack-ed message at a time (see below)

final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);

运行上面的代码,如果我们使用CTRL+C kill一个worker,消息也不会被丢失,很快这个消息就会被再次投递给其他工作者。

Forgotten acknowledgment

丢失BasicAck是很常见的错误,尽管这个错很小,但后果很严重。当Client quit,Messages 会重新分发,但是RabbitMQ 由于不能释放掉那些unacked message ,所以会消耗越来越多的内存。

为了 调试这种错误, 你可以使用rabbitmqctl来打印出 messages_unacknowledged 的message信息:

1
2
3
4
$ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged
Listing queues ...
hello 0 0
...done.

消息持久化

通过上面的ACK配置,当consumer 死亡的时候,task 不会丢失。但是如果RabbitMQ服务停了,task 仍然会丢失。
这里我们就要持久化 task的信息了。

当RabbitMQ停止或者宕机了,如果你没有告诉它怎么处理的话,队列和消息也会丢失。需要做两步操作来确保消息不被丢失:我们需要标记队列和消息是持久化的。

首先,我们需要保证队列在RabbitMQ中不会丢失。我们在代码里声明持久化:

1
2
boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);

尽管我们定义名字叫hello 队列要持久化,但是仍然不会生效。这是因为我们已经定义了一个没有持久化的名字叫hello 队列。RabbitMQ 不允许重新定义(用不同的参数)一个已经存在的队列,会报错。因此这里我们应该另外定义一个队列,
例如 task_queue:

1
2
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);

queue 持久化的修改,producer 和consumer的代码都要修改.

通过上面的代码设置我们的queue,即使RabbitMQ重启也不会丢失。接下来,我们来持久化message。发布消息的时候提供MessageProperties.PERSISTENT_TEXT_PLAIN值即可持久化。

消息持久化备注

尽管我们设置message持久化了,但是这也不能完全保证message不会丢失。
这是由于RabbitMQ保存message到硬盘是需要时间的,如果再此期间RabbitMQ服务挂了,message就丢失了。不过对于一般的程序已经足够了。如果要一个更强壮的方案,你可是使用publisher confirms.

公平调度

也许你已经主要到,上面代码实现的message的调度不是你想要的。例如,假设有两个Worker,所有的奇数的message都是耗时的操作,而偶数的message都是很简单的。你会发现一个Worker很空闲,而另一个Woker累死累活的。然而RabbitMQ不知道,还是不停的给他发任务。

这个情况的发生,是由于RabbitMQ 不看 the number of unacknowledged message,只要message进入队列就分发message。他只是盲目的分发message。

为了解决上面的问题,我们可以使用 basicQos方法 设置 prefetchCount=1。这个设置会告诉RabbitMQ 每次给Workder只分配一个task,只有当task执行完了,才分发下一个任务。

1
2
int prefetchCount = 1;
channel.basicQos(prefetchCount);

NOTE:注意queue的size

如果所有的Worker都很忙,你的队列会填满,因此你需要监测queue的情况,添加更多的worker 或者采用其他的策略。

Putting it all together

最终的代码在我们的NewTask.java类中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;

public class NewTask {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] argv)
throws java.io.IOException {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

String message = getMessage(argv);

channel.basicPublish( "", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

channel.close();
connection.close();
}
//...
}

(NewTask.java source)
Worker.java:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import com.rabbitmq.client.*;

import java.io.IOException;

public class Worker {
private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

channel.basicQos(1);

final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");

System.out.println(" [x] Received '" + message + "'");
try {
doWork(message);
} finally {
System.out.println(" [x] Done");
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
boolean autoAck = false;
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
}

private static void doWork(String task) {
for (char ch : task.toCharArray()) {
if (ch == '.') {
try {
Thread.sleep(1000);
} catch (InterruptedException _ignored) {
Thread.currentThread().interrupt();
}
}
}
}
}

(Worker.java source)

使用message acknowledgments和prefetchCount可以设置一个工作队列。通过durability选项让我们的任务能够在RabbitMQ重启后也能够存在。

关于Channel方法和MessageProperties的更多信息,你可以浏览在线的javadocs.