工作队列 (使用java客户端) 在第一节的教程里,我们创建了一个程序,发送和接收消息,从一个named queue(命名队列 )。本节,我们会创建一个 Work Queue(工作队列),用来分发耗时任务给多个Workers(工人)。
使用Work Queues(别名:Task Queue)是为了避免立即做一个资源密集型任务,而不得不等待它完成。我们可以把这个耗时的任务封装提取起来作为message,发送给一个queue。一个Worker 后台进程会获取task,然后执行他。当有多个Workers 时,他们平分这些task。
准备 上一节的教程,我们发送已一条包含“Hello World”的消息。这一节我们要发送一个复杂的任务。我们不做真实的任务,采用Thread.Sleep()来模拟我们的程序很忙。我们以程序中发送的字符串的点数来作为复杂性。每一个点占用1秒的时间,比如,一个任务的内容是Hello…将会耗时三秒钟。
我们修改上一节的Send.java的代码,允许命令行发送任意消息,这个程序会将任务分配给我们的工作队列,我们叫它NewTask.java:
1 2 3 4 String message = getMessage(argv);channel.basicPublish("" , "hello" , null , message.getBytes()); System.out.println(" [x] Sent '" + message + "'" );
从命令行获取消息信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 private static String getMessage (String[] strings) { if (strings.length < 1 ) return "Hello World!" ; return joinStrings(strings, " " ); } private static String joinStrings (String[] strings, String delimiter) { int length = strings.length; if (length == 0 ) return "" ; StringBuilder words = new StringBuilder (strings[0 ]); for (int i = 1 ; i < length; i++) { words.append(delimiter).append(strings[i]); } return words.toString(); }
我们的Recv.java程序也必须做改变:它需要处理每个消息的内容。我们叫它Worker.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 final Consumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String message = new String (body, "UTF-8" ); System.out.println(" [x] Received '" + message + "'" ); try { doWork(message); } finally { System.out.println(" [x] Done" ); } } }; boolean autoAck = true ; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
我们的工作任务模拟执行时间:
1 2 3 4 5 private static void doWork (String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.' ) Thread.sleep(1000 ); } }
在工作目录中编译这2个文件:
1 $ javac -cp rabbitmq-client.jar NewTask.java Worker.java
循环调度 使用Task Queue的一个优点就是可以很容易的平均分配任务。如果queue里有堆积过多的任务,我们可以添加更多的Worker就行了。规模很容易扩大。
接下来,我们同时运行2个工作实例,他们将从队列中获取消息,但怎么去获取?我们一起看看。
你需要打开三个控制台。2个运行工作程序,这些控制台作为我们的2个消费者 - C1和C2.
1 2 3 shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C
1 2 3 shell2$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C
在第三个控制台发布新的任务。你可以发布少量的消息给消费者消费:
1 2 3 4 5 6 7 8 9 10 shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask First message. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Second message.. shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Third message... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fourth message.... shell3$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar NewTask Fifth message.....
我们看看什么内容给了我们的工作者:
1 2 3 4 5 6 shell1$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
1 2 3 4 5 shell2$ java -cp .:commons-io-1.2 .jar:commons-cli-1.1 .jar:rabbitmq-client.jar Worker [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
默认RabbitMQ会顺序的,平均的把任务发给每个consumer,到最后每个Consumer会得到相同数量的任务。这种分配方式我们称为round-robin。可以尝试第三个或者更多的工作者。
Message acknowledgment 执行一个耗时的任务,你可能会想知道任务的执行情况。是否有Consumer开始执行任务了?是否任务执行到一半死机了? 当前我们上面的代码,一旦RabbitMQ分发message给Custoerm,它就会立刻从内存删除。这种情况下,如果你关闭一个Worker,我们就会丢失他正在执行的消息。同样,我们也会丢失之前分发给他,还没有来的及执行的消息。
但是我们不想丢失任何task。如果一个Worker死了,我们想把任务分发给其他的Worker。
为了确保message不丢失,RabbitMQ 提供了 message acknowledgments。Ack是consumer 发送给RabbitMQ的,告诉它,task 已经接受,并处理了,RabbitMQ 可以删除它了。
如果一个consumer死机了(channel closed,connection closed or Tcp connection lost),没有返回ack,RabbitMQ就会知道task 没有处理完,该task就会重新排队。如果这时候有另外一个Consumer在线,RabbitMQ 就会把它分发给他。
所有的消息都不会超时,当消费者挂掉后RabbitMQ将会重新发送消息,处理一条消息有可能会很长时间,但是总是能处理掉。
默认Message acknowLedgments 是打开的,之前的例子里我们显示的设置了autoAck=true,现在我们设置成false,一旦我们处理了一条任务,我们就发送一个正确的确认信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 channel.basicQos(1 ); final Consumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String message = new String (body, "UTF-8" ); System.out.println(" [x] Received '" + message + "'" ); try { doWork(message); } finally { System.out.println(" [x] Done" ); channel.basicAck(envelope.getDeliveryTag(), false ); } } }; boolean autoAck = false ;channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);
运行上面的代码,如果我们使用CTRL+C kill一个worker,消息也不会被丢失,很快这个消息就会被再次投递给其他工作者。
Forgotten acknowledgment 丢失BasicAck是很常见的错误,尽管这个错很小,但后果很严重。当Client quit,Messages 会重新分发,但是RabbitMQ 由于不能释放掉那些unacked message ,所以会消耗越来越多的内存。
为了 调试这种错误, 你可以使用rabbitmqctl来打印出 messages_unacknowledged 的message信息:
1 2 3 4 $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged Listing queues ... hello 0 0 ...done .
消息持久化 通过上面的ACK配置,当consumer 死亡的时候,task 不会丢失。但是如果RabbitMQ服务停了,task 仍然会丢失。 这里我们就要持久化 task的信息了。
当RabbitMQ停止或者宕机了,如果你没有告诉它怎么处理的话,队列和消息也会丢失。需要做两步操作来确保消息不被丢失:我们需要标记队列和消息是持久化的。
首先,我们需要保证队列在RabbitMQ中不会丢失。我们在代码里声明持久化:
1 2 boolean durable = true ;channel.queueDeclare("hello" , durable, false , false , null );
尽管我们定义名字叫hello 队列要持久化,但是仍然不会生效。这是因为我们已经定义了一个没有持久化的名字叫hello 队列。RabbitMQ 不允许重新定义(用不同的参数)一个已经存在的队列,会报错。因此这里我们应该另外定义一个队列, 例如 task_queue:
1 2 boolean durable = true ;channel.queueDeclare("task_queue" , durable, false , false , null );
queue 持久化的修改,producer 和consumer的代码都要修改.
通过上面的代码设置我们的queue,即使RabbitMQ重启也不会丢失。接下来,我们来持久化message。发布消息的时候提供MessageProperties.PERSISTENT_TEXT_PLAIN值即可持久化。
消息持久化备注 尽管我们设置message持久化了,但是这也不能完全保证message不会丢失。 这是由于RabbitMQ保存message到硬盘是需要时间的,如果再此期间RabbitMQ服务挂了,message就丢失了。不过对于一般的程序已经足够了。如果要一个更强壮的方案,你可是使用publisher confirms .
公平调度 也许你已经主要到,上面代码实现的message的调度不是你想要的。例如,假设有两个Worker,所有的奇数的message都是耗时的操作,而偶数的message都是很简单的。你会发现一个Worker很空闲,而另一个Woker累死累活的。然而RabbitMQ不知道,还是不停的给他发任务。
这个情况的发生,是由于RabbitMQ 不看 the number of unacknowledged message,只要message进入队列就分发message。他只是盲目的分发message。 为了解决上面的问题,我们可以使用 basicQos方法 设置 prefetchCount=1。这个设置会告诉RabbitMQ 每次给Workder只分配一个task,只有当task执行完了,才分发下一个任务。
1 2 int prefetchCount = 1 ;channel.basicQos(prefetchCount);
NOTE:注意queue的size
如果所有的Worker都很忙,你的队列会填满,因此你需要监测queue的情况,添加更多的worker 或者采用其他的策略。
Putting it all together 最终的代码在我们的NewTask.java类中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 import java.io.IOException;import com.rabbitmq.client.ConnectionFactory;import com.rabbitmq.client.Connection;import com.rabbitmq.client.Channel;import com.rabbitmq.client.MessageProperties;public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue" ; public static void main (String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("localhost" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true , false , false , null ); String message = getMessage(argv); channel.basicPublish( "" , TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'" ); channel.close(); connection.close(); } }
(NewTask.java source) Worker.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 import com.rabbitmq.client.*;import java.io.IOException;public class Worker { private static final String TASK_QUEUE_NAME = "task_queue" ; public static void main (String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("localhost" ); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true , false , false , null ); System.out.println(" [*] Waiting for messages. To exit press CTRL+C" ); channel.basicQos(1 ); final Consumer consumer = new DefaultConsumer (channel) { @Override public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte [] body) throws IOException { String message = new String (body, "UTF-8" ); System.out.println(" [x] Received '" + message + "'" ); try { doWork(message); } finally { System.out.println(" [x] Done" ); channel.basicAck(envelope.getDeliveryTag(), false ); } } }; boolean autoAck = false ; channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer); } private static void doWork (String task) { for (char ch : task.toCharArray()) { if (ch == '.' ) { try { Thread.sleep(1000 ); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }
(Worker.java source)
使用message acknowledgments和prefetchCount可以设置一个工作队列。通过durability选项让我们的任务能够在RabbitMQ重启后也能够存在。
关于Channel方法和MessageProperties的更多信息,你可以浏览在线的javadocs .