轻松搞定RabbitMQ(三)——消息应答与消息持久化

       这个官网的第二个例子中的消息应答和消息持久化部分。我把它摘出来做为单独的一起来分享。html


Message acknowledgment(消息应答)

       执行一个任务可能须要花费几秒钟,你可能会担忧若是一个消费者在执行任务过程当中挂掉了。基于如今的代码,一旦RabbitMQ将消息分发给了消费者,就会从内存中删除。在这种状况下,若是杀死正在执行任务的消费者,会丢失正在处理的消息,也会丢失已经分发给这个消费者但还没有处理的消息。
java

       可是,咱们不想丢失任何任务,若是有一个消费者挂掉了,那么咱们应该将分发给它的任务交付给另外一个消费者去处理。
服务器

       为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收而且处理完毕了。RabbitMQ能够删除它了。ide

       若是一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理彻底,而后交给另外一个消费者去从新处理。这样,你就能够确认即便消费者偶尔挂掉也不会不丢失任何消息了。
spa

       没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会从新投递。即便处理一条消息会花费很长的时间。
code

       消息应答是默认打开的。咱们明确地把它们关掉了(autoAck=true)。如今将应答打开,一旦咱们完成任务,消费者会自动发送消息应答。
htm

boolean autoAck = false;
channel.basicConsume(QUEUE_NAME, autoAck, consumer);

       修改一下Worker.javarabbitmq

channel.basicQos(1);//保证一次只分发一个
		// 建立队列消费者
		final Consumer consumer = new DefaultConsumer(channel) {
			  @Override
			  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
			    String message = new String(body, "UTF-8");

			    System.out.println(" [x] Received '" + message + "'");
			    System.out.println(" [x] Proccessing... at " +new Date().toLocaleString());
			    try {
			    	for (char ch: message.toCharArray()) {
				        if (ch == '.') {
				        	Thread.sleep(1000);
				        }
				    }
				} catch (InterruptedException e) {
				} finally {
			      System.out.println(" [x] Done! at " +new Date().toLocaleString());
			      channel.basicAck(envelope.getDeliveryTag(), false);
			    }
			  }
			};
       咱们仍是运行1个生产者,2个消费者,在消息处理过程当中,人为让一个消费者挂掉,而后会看到剩下的任务都会被另外的消费者执行。

       运行结果以下:队列


       若是你关闭了自动消息应答,手动也未设置应答,这是一个很简单错误,可是后果倒是极其严重的。消息在分发出去之后,得不到回应,因此不会在内存中删除,结果RabbitMQ会愈来愈占用内存,最终的结果,你懂得。。。内存


Message durability(消息持久化)

       咱们已经了解了如何确保即便消费者死亡,任务也不会丢失。可是若是RabbitMQ服务器中止,咱们的任务仍将失去!
       当RabbitMQ退出或者崩溃,将会丢失队列和消息。除非你不要队列和消息。两件事儿必须保证消息不被丢失:咱们必须把“队列”和“消息”设为持久化。 
    

boolean durable = true;
channel.queueDeclare("hello", durable, false, false, null);
       尽管这行代码是正确的,但他不会在咱们当前的设置中起做用。由于咱们已经定义了一个名叫hello的未持久化的队列。RabbitMQ不容许使用不一样的参数设定从新定义已经存在的队列,而且会向尝试如此作的程序返回一个错误。一个快速的解决方案——就是声明一个不一样名字的队列,好比task_queue。

       (固然,咱们也能够登陆到RabbitMQ的服务管理页面,RabbitMQ默认的端口是5672,管理页面默认端口是15672,页面地址为:http://localhost:15672,使用是用户名和密码登陆。RabbitMQ的默认密码和用户名都是guest。点开“queue”那栏,能够看到队列列表,点击“hello”杜列,会展开队列的详细信息。把页面拉到最后,有一项“Delete / purge”,点开,点击“Delete”按钮,就能够把队列删除掉了。 而后再运行代码的时候,就会建立一个支持持久化的hello队列。)

       上述的代码须要在生产者和消费者都要做出一样的修改。

       在这一点上咱们确信task_queue的队列不会丢失,即便RabbitMQ服务重启。如今咱们须要将消息标记为持久性的——经过设置 MessageProperties(实现BasicProperties)为PERSISTENT_TEXT_PLAIN。

       如今你能够启动RabbitMQ服务器,执行一次生产者NewTask的程序,而后关闭服务器,再从新启动服务器,运行消费者Work作下实验。能够发现消费者依旧能够读出消息来。说明在RabbitMQ服务器关闭后,消息和队列信息都已经作了持久化。再次启动后,会从新加载到服务器中,消费者运行后,就能够正常的从队列中获取消息了。

相关文章
相关标签/搜索