在第一篇咱们写了两个程序经过一个命名的队列分别发送和接收消息。在这一篇,咱们将建立一个工做队列在多个工做线程间分发耗时的工做任务。html
工做队列的核心思想是避免马上处理资源密集型任务致使必须等待其执行完成。相反的,咱们安排这些任务在稍晚的时间完成。咱们将一个任务封装为一个消息并把它发送到队列中。一个后台的工做线程将从队列中取出任务并最终执行。当你运行多个工做线程,这些任务将在这些工做线程间共享。shell
这个概念对于在一个HTTP请求中处理复杂任务的Web应用尤为有用。函数
在前一篇中,咱们发送了一条内容为“Hello World!”的消息。如今,咱们将要发送一些表明复杂任务的字符串。咱们并无诸如改变图片大小或者渲染PDF文件这样的真实的任务,因此假设任务会致使系统的繁忙--经过使用Threed.Sleep()函数。咱们会采用许多的点(.)在字符串中来表达他的复杂性,每个点将消耗一秒钟的工做时间。例如,假设有一个任务“Hello...”将消耗3秒钟。学习
咱们会把上一个例子中的Send.cs文件中的代码稍微调整一下,使得对任意的消息都能经过命令行发送。这个程序将调度任务到咱们的工做队列中,因此让咱们将它命名为NewTask.cs:fetch
1 var message = GetMessage(args); 2 var body = Encoding.UTF8.GetBytes(message); 3 4 var properties = channel.CreateBasicProperties(); 5 properties.SetPersistent(true); 6 7 channel.BasicPublish(exchange: "", 8 routingKey: "task_queue", 9 basicProperties: properties, 10 body: body);
获取命令行消息的帮助方法:spa
1 private static string GetMessage(string[] args) 2 { 3 return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); 4 }
旧有的Receive.cs代码一样须要稍做修改:须要一个为消息中每个点模拟一秒的时间消耗。它将会处理RabbitMQ发布的消息,执行任务,因此咱们称之为Worker.cs。命令行
1 var consumer = new EventingBasicConsumer(channel); 2 consumer.Received += (model, ea) => 3 { 4 var body = ea.Body; 5 var message = Encoding.UTF8.GetString(body); 6 Console.WriteLine(" [x] Received {0}", message); 7 8 int dots = message.Split('.').Length - 1; 9 Thread.Sleep(dots * 1000); 10 11 Console.WriteLine(" [x] Done"); 12 13 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 14 }; 15 channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
模拟虚拟任务的执行时间:线程
1 int dots = message.Split('.').Length - 1; 2 Thread.Sleep(dots * 1000);
像第一篇中那样编译程序:调试
1 $ csc /r:"RabbitMQ.Client.dll" NewTask.cs 2 $ csc /r:"RabbitMQ.Client.dll" Worker.cs
使用工做队列的好处之一是可以很轻松的并行任务。若是咱们要增强对积压工做的处理,只须要按照上面的方法添加更多的Worker,很是容易扩展。code
首先,咱们同时运行两个Worker。它们都会从队列中获取消息,可是到底是怎样作到的呢?让咱们看看。
你须要打开三个控制台。两个运行Worker,这两个控制台程序将充当消费者--C1与C2。
1 shell1$ Worker.exe 2 Worker 3 [*] Waiting for messages. To exit press CTRL+C
1 shell2$ Worker.exe 2 Worker 3 [*] Waiting for messages. To exit press CTRL+C
在第三个控制台程序中,咱们将发布一些新的任务。一旦你已经运行了消费者,你就能够发布新消息了:
1 shell3$ NewTask.exe First message. 2 shell3$ NewTask.exe Second message.. 3 shell3$ NewTask.exe Third message... 4 shell3$ NewTask.exe Fourth message.... 5 shell3$ NewTask.exe Fifth message.....
让咱们看看有什么发送到了Worker端:
1 shell1$ Worker.exe 2 [*] Waiting for messages. To exit press CTRL+C 3 [x] Received 'First message.' 4 [x] Received 'Third message...' 5 [x] Received 'Fifth message.....'
1 shell2$ Worker.exe 2 [*] Waiting for messages. To exit press CTRL+C 3 [x] Received 'Second message..' 4 [x] Received 'Fourth message....'
默认状况下,RabbitMQ会按顺序将消息逐个发送到消费者。平均状况下,每个消费者将会得到相同数量的消息。这种分发消息的方式成为轮转调度。可使用三个以上的Worker试一试。
处理一个任务可能花费数秒钟。你可能会担忧消费者开始一个较长的任务,可是在完成部分以后就出错了。在咱们如今的代码中,一旦RabbitMQ分发了一条消息给消费者它就会立刻在队列中删除这条消息。在这样的状况下,若是你停止某一个Worker,由于消息正在执行中,咱们将丢失该消息。咱们也将丢失全部分发到该Worker可是未被处理的消息。
可是咱们不想丢失任何一个任务。若是一个Worker停止了,咱们但愿这个任务能被分发给其余Worker。
为了确保消息毫不丢失,RabbitMQ提供了消息确认机制。消费者回发一个确认给RabbitMQ,告知某个消息已经被接收、处理,而后RabbitMQ就能够为所欲为的删除它了。
若是一个消费者在没有回发确认就停止了,RabbitMQ会认为该消息没有被彻底的处理,并会将该消息从新分发给其余的消费者。经过这种方式,你能够肯定没有消息会丢失,即便有Worker会不可意料的停止。
没有消息会超时,RabbitMQ仅仅会在Worker的链接停止的时候从新分发消息。即便处理一个消息花费的时间很长很长也不会有什么关系。
消息确认在默认状况下是开启的。在前面的示例中咱们经过将noAck参数设置为true显示的关闭了消息确认。如今是时候移除该标记了,使完成一个任务时发回一个恰当的确认。
1 var consumer = new EventingBasicConsumer(channel); 2 consumer.Received += (model, ea) => 3 { 4 var body = ea.Body; 5 var message = Encoding.UTF8.GetString(body); 6 Console.WriteLine(" [x] Received {0}", message); 7 8 int dots = message.Split('.').Length - 1; 9 Thread.Sleep(dots * 1000); 10 11 Console.WriteLine(" [x] Done"); 12 13 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 14 }; 15 channel.BasicConsume(queue: "task_queue", noAck: false, consumer: consumer);
使用这段代码,咱们能够确保及时在消费者正在执行时你用CTRL+C强制中断了程序,也不会丢失任何消息。在消费者停止后不久,全部为收到确认的消息都将被从新分发。
被遗忘的确认
缺乏BasicAck是一个很是常见的错误。这是一个简单的错误,可是后果却至关严重。当客户端退出的时候,消息会被从新分发(看起来像是随机分发的),可是RabbitMQ会占用愈来愈多的内存由于它不能释放未确认的消息。
为了调试这种错误,你可使用rabbitmqctl打印messages_unacknowledged字段:
1 $ sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged 2 Listing queues ... 3 hello 0 0 4 ...done.
咱们已经学习了如何确保即便消费者停止,任务也不会丢失。可是若是RabbitMQ服务停止的时候,咱们的任务仍是会丢失。
当RabbitMQ退出或者崩溃,它会忘记存在的队列和队列中的消息,除非你告诉它不要这样。确保消息不丢失,有两件事情是必须的:咱们必须同时把队列和消息标记为持久的(durable)。
首先,咱们须要确保RabbitMQ永远不会丢失队列。为了作到这件事,咱们须要将队列申明为持久的:
1 channel.QueueDeclare(queue: "hello", 2 durable: true, 3 exclusive: false, 4 autoDelete: false, 5 arguments: null);
尽管此命令自己是正确的,可是在当前设置下它不会起做用。由于咱们已经定义过一个叫作hello的队列。RabbitMQ不容许使用不一样的参数重定义一个已经存在的队列,任未尝试作这样的事情的程序都将返回一个错误。可是有一个变通的方法--让咱们用不一样的名称申明一个队列,例如task_queue:
1 channel.QueueDeclare(queue: "task_queue", 2 durable: true, 3 exclusive: false, 4 autoDelete: false, 5 arguments: null);
这个队列申明的改变须要被应用于生产者和消费者。
这个时候,咱们能够肯定即便是RabbitMQ重启了,task_queue也不会丢失。如今咱们须要把咱们的消息标记为持久的(persistent),经过把IBasicProperties.SetPersistent设置为true。
1 var properties = channel.CreateBasicProperties(); 2 properties.SetPersistent(true);
消息持久注记
将消息标记为持久的并不能彻底的保证消息不会丢失。尽管告知了RabbitMQ将消息保存在磁盘上,仍旧有很短的时间里RabbitMQ接收到一个消息而且尚未保存。因此RabbitMQ不会对每条消息作fsync--它可能仅仅被存放在Cache中而不是实际写入到磁盘里面。消息的持久化保证并不健壮,可是对于简单的任务队列已经足够。若是你须要一个更加健壮的保证,你可使用发布者确认。
你可能已经注意到调度依旧不能彻底按照咱们指望的方式工做。设想一个有两个Worker的应用场景,当全部奇数消息都很庞大而偶数消息很轻量的时候,一个Worker老是很是的繁忙而另外一个几乎不作什么事情。嗯,RabbitMQ并不会知道这事儿,它依然会平均的分发消息。
出现这种状况是由于RabbitMQ只是在消息进入队列后就将其分发。它并不会去检查每一个消费者所拥有的未肯定消息的数量。它只是不假思索的将第N个消息调度到第N个消费者。
为了应对这种状况,咱们可使用basicQos方法而且把参数prefetchCount设置为1。这将告诉RabbitMQ不要同一时间调度给同一个消费者超过一条消息。或者,当一个消费者正在处理或确认前一个消息时不要将新消息调度给它。相反的,它会把这个消息调度给下一个不忙碌的消费者。
1 channel.BasicQos(0, 1, false);
队列大小注记
若是全部的消费者都很忙碌,你的队列可能被填满。你但愿能盯着这个问题,而且添加更多的消费者,或者使用其余策略。
NewTask.cs类的最终代码以下:
1 using System; 2 using RabbitMQ.Client; 3 using System.Text; 4 5 class NewTask 6 { 7 public static void Main(string[] args) 8 { 9 var factory = new ConnectionFactory() { HostName = "localhost" }; 10 using(var connection = factory.CreateConnection()) 11 using(var channel = connection.CreateModel()) 12 { 13 channel.QueueDeclare(queue: "task_queue", 14 durable: true, 15 exclusive: false, 16 autoDelete: false, 17 arguments: null); 18 19 var message = GetMessage(args); 20 var body = Encoding.UTF8.GetBytes(message); 21 22 var properties = channel.CreateBasicProperties(); 23 properties.SetPersistent(true); 24 25 channel.BasicPublish(exchange: "", 26 routingKey: "task_queue", 27 basicProperties: properties, 28 body: body); 29 Console.WriteLine(" [x] Sent {0}", message); 30 } 31 32 Console.WriteLine(" Press [enter] to exit."); 33 Console.ReadLine(); 34 } 35 36 private static string GetMessage(string[] args) 37 { 38 return ((args.Length > 0) ? string.Join(" ", args) : "Hello World!"); 39 } 40 }
Worker.cs类的最终代码以下:
1 using System; 2 using RabbitMQ.Client; 3 using RabbitMQ.Client.Events; 4 using System.Text; 5 using System.Threading; 6 7 class Worker 8 { 9 public static void Main() 10 { 11 var factory = new ConnectionFactory() { HostName = "localhost" }; 12 using(var connection = factory.CreateConnection()) 13 using(var channel = connection.CreateModel()) 14 { 15 channel.QueueDeclare(queue: "task_queue", 16 durable: true, 17 exclusive: false, 18 autoDelete: false, 19 arguments: null); 20 21 channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); 22 23 Console.WriteLine(" [*] Waiting for messages."); 24 25 var consumer = new EventingBasicConsumer(channel); 26 consumer.Received += (model, ea) => 27 { 28 var body = ea.Body; 29 var message = Encoding.UTF8.GetString(body); 30 Console.WriteLine(" [x] Received {0}", message); 31 32 int dots = message.Split('.').Length - 1; 33 Thread.Sleep(dots * 1000); 34 35 Console.WriteLine(" [x] Done"); 36 37 channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); 38 }; 39 channel.BasicConsume(queue: "task_queue", 40 noAck: false, 41 consumer: consumer); 42 43 Console.WriteLine(" Press [enter] to exit."); 44 Console.ReadLine(); 45 } 46 } 47 }
你可使用消息肯定和BasicQos设置一个工做队列。持久化选项使得消息RabbitMQ重启的时候也得以保全。
要了解关于IModel和IBasicProperties的更多信息,你能够浏览在线的RabbitMQ .NET客户端API引用。
如今咱们能够前进道教程三并了解如何向多个消费者发送相同的消息。
原文连接:http://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html