RabbitMQ是一款使用Erlang开发的开源消息队列。本文假设读者对RabbitMQ是什么已经有了基本的了解,若是你还不知道它是什么以及能够用来作什么,建议先从官网的 RabbitMQ Tutorials 入门教程开始学习。php
本文将会讲解如何使用RabbitMQ实现延时重试和失败消息队列,实现可靠的消息消费,消费失败后,自动延时将消息从新投递,当达到必定的重试次数后,将消息投递到失败消息队列,等待人工介入处理。在这里我会带领你们一步一步的实现一个带有失败重试功能的发布订阅组件,使用该组件后能够很是简单的实现消息的发布订阅,在进行业务开发的时候,业务开发人员能够将主要精力放在业务逻辑实现上,而不须要花费时间去理解RabbitMQ的一些复杂概念。html
本文将会持续修正和更新,最新内容请参考个人 GITHUB 上的 程序猿成长计划 项目,欢迎 Star,更多精彩内容请 follow me。java
咱们将会实现以下功能git
具体流程见下图github
Linus Torvalds 曾经说过json
Talk is cheap. Show me the code
我分别用Java和PHP实现了本文所讲述的方案,读者能够经过参考代码以及本文中的基本步骤来更好的理解服务器
为了实现消息的延时重试和失败存储,咱们须要建立三个Exchange来处理消息。app
全部的Exchange声明(declare)必须使用如下参数ide
参数 | 值 | 说明 |
---|---|---|
exchange | - | Exchange名称 |
type | topic | Exchange 类型 |
passive | false | 若是Exchange已经存在,则返回成功,不存在则建立 |
durable | true | 持久化存储Exchange,这里仅仅是Exchange自己持久化,消息和队列须要单独指定其持久化 |
no-wait | false | 该方法须要应答确认 |
Java代码函数
// 声明Exchange:主体,失败,重试 channel.exchangeDeclare("master", "topic", true); channel.exchangeDeclare("master.retry", "topic", true); channel.exchangeDeclare("master.failed", "topic", true);
PHP代码
// 普通交换机 $this->channel->exchange_declare('master', 'topic', false, true, false); // 重试交换机 $this->channel->exchange_declare('master.retry', 'topic', false, true, false); // 失败交换机 $this->channel->exchange_declare('master.failed', 'topic', false, true, false);
在RabbitMQ的管理界面中,咱们能够看到建立的三个Exchange
消息发布时,使用basic_publish
方法,参数以下
参数 | 值 | 说明 |
---|---|---|
message | - | 发布的消息对象 |
exchange | master | 消息发布到的Exchange |
routing-key | - | 路由KEY,用于标识消息类型 |
mandatory | false | 是否强制路由,指定了该选项后,若是没有订阅该消息,则会返回路由不可达错误 |
immediate | false | 指定了当消息没法直接路由给消费者时如何处理 |
发布消息时,对于message
对象,其内容建议使用json编码后的字符串,同时消息须要标识如下属性
'delivery_mode'=> 2 // 1为非持久化,2为持久化
Java代码
channel.basicPublish( "master", routingKey, MessageProperties.PERSISTENT_BASIC, // delivery_mode message.getBytes() );
PHP代码
$msg = new AMQPMessage($message->serialize(), [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, ]); $this->channel->basic_publish($msg, 'master', $routingKey);
消息订阅的实现相对复杂一些,须要完成队列的声明以及队列和Exchange的绑定。
对于每个订阅消息的服务,都必须建立一个该服务对应的队列,将该队列绑定到关注的路由规则,这样以后,消息生产者将消息投递给Exchange以后,就会按照路由规则将消息分发到对应的队列供消费者消费了。
消费服务须要declare三个队列
[queue_name]
队列名称,格式符合 [服务名称]@订阅服务标识
[queue_name]@retry
重试队列[queue_name]@failed
失败队列订阅服务标识
是客户端本身对订阅的分类标识符,好比用户中心服务(服务名称ucenter),包含两个订阅:user和enterprise,这里两个订阅的队列名称就为ucenter@user
和ucenter@enterprise
,其对应的重试队列为ucenter@user@retry
和ucenter@enterprise@retry
。
Declare队列时,参数规定规则以下
参数 | 值 | 说明 |
---|---|---|
queue | - | 队列名称 |
passive | false | 队列不存在则建立,存在则直接成功 |
durable | true | 队列持久化 |
exclusive | false | 排他,指定该选项为true则队列只对当前链接有效,链接断开后自动删除 |
no-wait | false | 该方法须要应答确认 |
auto-delete | false | 当再也不使用时,是否自动删除 |
对于@retry
重试队列,须要指定额外参数
'x-dead-letter-exchange' => 'master' 'x-dead-letter-routing-key' => [queue_name], 'x-message-ttl' => 30 * 1000 // 重试时间设置为30s
这里的两个header字段的含义是,在队列中延迟30s后,将该消息从新投递到
x-dead-letter-exchange
对应的Exchange中,而且routing key指定为消费队列的名称,这样就能够实现消息只投递给原始出错时的队列,避免消息从新投递给全部关注当前routing key的消费者了。
Java代码
// 声明监听队列 channel.queueDeclare( queueName, // 队列名称 true, // durable false, // exclusive false, // autoDelete null // arguments ); channel.queueDeclare(queueName + "@failed", true, false, false, null); Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", exchangeName()); arguments.put("x-message-ttl", 30 * 1000); arguments.put("x-dead-letter-routing-key", queueName); channel.queueDeclare(queueName + "@retry", true, false, false, arguments);
PHP代码
$this->channel->queue_declare($queueName, false, true, false, false, false); $this->channel->queue_declare($failedQueueName, false, true, false, false, false); $this->channel->queue_declare( $retryQueueName, // 队列名称 false, // passive true, // durable false, // exclusive false, // auto_delete false, // nowait new AMQPTable([ 'x-dead-letter-exchange' => 'master', 'x-dead-letter-routing-key' => $queueName, 'x-message-ttl' => 30 * 1000, ]) );
在RabbitMQ的管理界面中,Queues部分能够看到咱们建立的三个队列
查看队列的详细信息,咱们能够看到 queueName@retry 队列与其它两个队列的不一样
建立完队列以后,须要将队列与Exchange绑定(bind
),不一样队列须要绑定到以前建立的对应的Exchange上面
Queue | Exchange |
---|---|
[queue_name] | master |
[queue_name]@retry | master.retry |
[queue_name]@failed | master.failed |
绑定时,须要提供订阅的路由KEY,该路由KEY与消息发布时的路由KEY对应,区别是这里可使用通配符同时订阅多种类型的消息。
参数 | 值 | 说明 |
---|---|---|
queue | - | 绑定的队列 |
exchange | - | 绑定的Exchange |
routing-key | - | 订阅的消息路由规则 |
no-wait | false | 该方法须要应答确认 |
Java代码
// 绑定监听队列到Exchange channel.queueBind(queueName, "master", routingKey); channel.queueBind(queueName, exchangeName(), queueName); channel.queueBind(queueName + "@failed", "master.failed", queueName); channel.queueBind(queueName + "@retry", "master.retry", queueName);
PHP代码
$this->channel->queue_bind($queueName, 'master', $routingKey); $this->channel->queue_bind($queueName, 'master', $queueName); $this->channel->queue_bind($retryQueueName, 'master.retry', $queueName); $this->channel->queue_bind($failedQueueName, 'master.failed', $queueName);
在RabbitMQ的管理界面中,咱们能够看到该队列与Exchange和routing-key的绑定关系
使用 basic_consume
对消息进行消费的时候,须要注意下面参数
参数 | 值 | 说明 |
---|---|---|
queue | - | 消费的队列名称 |
consumer-tag | - | 消费者标识,留空便可 |
no_local | false | 若是设置了该字段,服务器将不会发布消息到 发布它的客户端 |
no_ack | false | 须要消费确认应答 |
exclusive | false | 排他访问,设置后只容许当前消费者访问该队列 |
nowait | false | 该方法须要应答确认 |
消费端在消费消息时,须要从消息中获取消息被消费的次数,以此判断该消息处理失败时重试仍是发送到失败队列。
Java代码
protected Long getRetryCount(AMQP.BasicProperties properties) { Long retryCount = 0L; try { Map<String, Object> headers = properties.getHeaders(); if (headers != null) { if (headers.containsKey("x-death")) { List<Map<String, Object>> deaths = (List<Map<String, Object>>) headers.get("x-death"); if (deaths.size() > 0) { Map<String, Object> death = deaths.get(0); retryCount = (Long) death.get("count"); } } } } catch (Exception e) {} return retryCount; }
PHP代码
protected function getRetryCount(AMQPMessage $msg): int { $retry = 0; if ($msg->has('application_headers')) { $headers = $msg->get('application_headers')->getNativeData(); if (isset($headers['x-death'][0]['count'])) { $retry = $headers['x-death'][0]['count']; } } return (int)$retry; }
消息消费完成后,须要发送消费确认消息给服务端,使用basic_ack
方法
ack(delivery-tag=消息的delivery-tag标识)
Java代码
// 消息消费处理 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { ... // 注意,因为使用了basicConsume的autoAck特性,所以这里就不须要手动执行 // channel.basicAck(envelope.getDeliveryTag(), false); } }; // 执行消息消费处理 channel.basicConsume( queueName, true, // autoAck consumer );
PHP代码
$this->channel->basic_consume( $queueName, '', // customer_tag false, // no_local false, // no_ack false, // exclusive false, // nowait function (AMQPMessage $msg) use ($queueName, $routingKey, $callback) { ... $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } );
若是消息处理中出现异常,应该将该消息从新投递到重试Exchange,等待下次重试
basic_publish(msg, 'master.retry', queueName) ack(delivery-tag) // 不要忘记了应答消费成功消息
若是判断重试次数大于3次,仍然处理失败,则应该讲消息投递到失败Exchange,等待人工处理
basic_publish(msg, 'master.failed', queueName) ack(delivery-tag) // 不要忘记了应答消费成功消息
必定不要忘记ack消息,由于重试、失败都是经过将消息从新投递到重试、失败Exchange来实现的,若是忘记ack,则该消息在超时或者链接断开后,会从新被从新投递给消费者,若是消费者依旧没法处理,则会形成死循环。
Java代码
try { String message = new String(body, "UTF-8"); // 消息处理函数 handler.handle(message, envelope.getRoutingKey()); } catch (Exception e) { long retryCount = getRetryCount(properties); if (retryCount > 3) { // 重试次数大于3次,则自动加入到失败队列 Map<String, Object> headers = new HashMap<>(); headers.put("x-orig-routing-key", getOrigRoutingKey(properties, envelope.getRoutingKey())); channel.basicPublish('master.failed', queueName, createOverrideProperties(properties, headers), body); } else { // 重试次数小于3,则加入到重试队列,30s后再重试 Map<String, Object> headers = properties.getHeaders(); if (headers == null) { headers = new HashMap<>(); } headers.put("x-orig-routing-key", getOrigRoutingKey(properties, envelope.getRoutingKey())); channel.basicPublish('master.retry', queueName, createOverrideProperties(properties, headers), body); } }
在消息发送到重试队列和失败队列时,咱们在消息的headers中添加了一个名为x-orig-routing-key
的字段,该字段是实现消息重试的关键字段,因为咱们的消息须要在不一样的Exchange,Queue之间流转,为了不消息在从新投递到主Exchange时,被全部的消费者队列从新消费,在重试过程当中,咱们将消息的routing-key修改成队列名称,直接投递给原始消费消息的队列。x-orig-routing-key
用于在以后可以从新获取到最开始的routing-key。
这里的重复消费是指 某个消息被两个消费方A和B消费了,其中A消费失败,B成功,这时候,消息由A消费者从新投递到主Exchange后,B消费队列也会获取到该消息,所以就会致使B消费者重复消费已经消费国的消息
若是任务重试三次仍未成功,则会被投递到失败队列,这时候须要人工处理程序异常,处理完毕后,须要将消息从新投递到队列进行处理,这里惟一须要作的就是从失败队列订阅消息,而后获取到消息后,清空其application_headers
头信息,而后从新投递到master
这个Exchange便可。
Java代码
channel.basicPublish( 'master', envelope.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, body );
PHP代码
$msg->set('application_headers', new AMQPTable([])); $this->channel->basic_publish( $msg, 'master', $msg->get('routing_key') );
队列和Exchange以及发布订阅的关系咱们就说完了,那么使用起来是什么效果呢?这里咱们以Java代码为例
// 发布消息 Publisher publisher = new Publisher(factory.newConnection(), 'master'); publisher.publish("{\"id\":121, \"name\":\"guanyiyao\"}", "user.create"); // 订阅消息 new Subscriber(factory.newConnection(), Main.EXCHANGE_NAME) .init("user-monitor", "user.*") .subscribe((message, routingKey) -> { // TODO 业务逻辑 System.out.printf(" <%s> message consumed: %s\n", routingKey, message); } );
使用RabbitMQ时,实现延时重试和失败队列的方式并不只仅局限于本文中描述的方法,若是读者有更好的实现方案,欢迎拍砖,在这里我也只是抛砖引玉了。本文中讲述的方法还有不少优化空间,读者也能够试着去改进其实现方案,好比本文中使用了三个Exchagne,是否只使用一个Exchange也能实现本文中所讲述的功能。
本文将会持续修正和更新,最新内容请参考个人 GITHUB 上的 程序猿成长计划 项目,欢迎 Star,更多精彩内容请 follow me。