分布式系统消息中间件——RabbitMQ的使用进阶篇

一 mandatory 参数web

上一篇文章中咱们知道,生产者将消息发送到RabbitMQ的交换器中经过RoutingKey与BindingKey的匹配将之路由到具体的队列中以供消费者消费。那么当咱们经过匹配规则找不到队列的时候,消息将何去何从呢?Rabbit给咱们提供了两种方式。mandatory与备份交换器。性能优化

mandatory参数是channel.BasicPublish方法中的参数。其主要功能是消息传递过程当中不可达目的地时将消息返回给生产者。当mandatory 参数设为true 时,交换器没法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ 会调用BasicReturn 命令将消息返回给生产者。当mandatory 参数设置为false 时。则消息直接被丢弃。其运转流程与实现代码以下(以C# RabbitMQ.Client 3.6.9为例):
复制代码

 

 

 

二 备份交换器服务器

 

当消息不能路由到队列时,经过mandatory设置参数,咱们能够将消息返回给生产者处理。但这样会有一个问题,就是生产者须要开一个回调的函数来处理不能路由到的消息,这无疑会增长生产者的处理逻辑。备份交换器(Altemate Exchange)则提供了另外一种方式来处理不能路由的消息。备份交换器能够将未被路由的消息存储在RabbitMQ中,在须要的时候去处理这些消息。其主要实现代码以下:
复制代码

 

 

 

 

备份交换器其实和普通的交换器没有太大的区别,为了方便使用,建议设置为fanout类型,若设置为direct 或者topic的类型。须要注意的是,消息被从新发送到备份交换器时的路由键和从生产者发出的路由键是同样的。考虑这样一种状况,若是备份交换器的类型是direct,而且有一个与其绑定的队列,假设绑定的路由键是key1,当某条携带路由键为key2 的消息被转发到这个备份交换器的时候,备份交换器没有匹配到合适的队列,则消息丢失。若是消息携带的路由键为keyl,则能够存储到队列中。 对于备份交换器,有如下几种特殊状况:网络

1.若是设置的备份交换器不存在,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。数据结构

2.若是备份交换器没有绑定任何队列,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。架构

3.若是备份交换器没有任何匹配的队列,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。并发

4.若是备份交换器和mandatory参数一块儿使用,那么mandatory参数无效。分布式

三 过时时间(TTL)函数

3.1 设置消息的TTL 目前有两种方法能够设置消息的TTL。第一种方法是经过队列属性设置,队列中全部消息都有相同的过时时间。第二种方法是对消息自己进行单独设置,每条消息的TTL能够不一样。若是两种方法一块儿使用,则消息的TTL 以二者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的TTL值时,就会变成"死信" (Dead Message) ,消费者将没法再收到该消息。(有关死信队列请往下看)微服务

经过队列属性设置消息TTL的方法是在channel.QueueDeclare方法中加入x-message-ttl参数实现的,这个参数的单位是毫秒。示例代码下:
复制代码

 

若是不设置TTL.则表示此消息不会过时;若是将TTL设置为0 ,则表示除非此时能够直接将消息投递到消费者,不然该消息会被当即丢弃(或由死信队列来处理)。

 

针对每条消息设置TTL的方法是在channel.BasicPublish方法中加入Expiration的属性参数,单位为毫秒。关键代码以下:
复制代码

 

注意:对于第一种设置队列TTL属性的方法,一旦消息过时,就会从队列中抹去,而在第二种方法中,即便消息过时,也不会立刻从队列中抹去,由于每条消息是否过时是在即将投递到消费者以前断定的。Why?在第一种方法里,队列中己过时的消息确定在队列头部, RabbitMQ 只要按期从队头开始扫描是否有过时的消息便可。而第二种方法里,每条消息的过时时间不一样,若是要删除全部过时消息势必要扫描整个队列,因此不如等到此消息即将被消费时再断定是否过时,若是过时再进行删除便可。

 

3.2 设置队列的TTL 注意,这里和上述经过队列设置消息的TTL不一样。上面删除的是消息,而这里删除的是队列。经过channel.QueueDeclare 方法中的x-expires参数能够控制队列被自动删除前处于未使用状态的时间。这个未使用的意思是队列上没有任何的消费者,队列也没有被从新声明,而且在过时时间段内也未调用过channel.BasicGet命令。

设置队列里的TTL能够应用于相似RPC方式的回复队列,在RPC中,许多队列会被建立出来,可是倒是未被使用的(有关RabbitMQ实现RPC请往下看)。RabbitMQ会确保在过时时间到达后将队列删除,可是不保障删除的动做有多及时。在RabbitMQ 重启后, 持久化的队列的过时时间会被从新计算。用于表示过时时间的x-expires参数以毫秒为单位, 井且服从和x-message-ttl同样的约束条件,不一样的是它不能设置为0(会报错)。
复制代码

示例代码以下:

 

四 死信队列 DLX(Dead-Letter-Exchange)死信交换器,当消息在一个队列中变成死信以后,它能被从新被发送到另外一个交换器中,这个交换器就是DLX ,绑定DLX的队列就称之为死信队列。 消息变成死信主要有如下几种状况:

 

在此我向你们推荐一个架构学习交流群。交流学习群号:874811168 里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多

消息被拒绝(BasicReject/BasicNack) ,井且设置requeue 参数为false;(消费者确认机制将会在下一篇文章中涉及) 消息过时; 队列达到最大长度。 DLX也是一个正常的交换器,和通常的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动地将这个消息从新发布到设置的DLX上去,进而被路由到另外一个队列,即死信队列。能够监听这个队列中的消息、以进行相应的处理。

经过在channel.QueueDeclare 方法中设置x-dead-letter-exchange参数来为这个队列添加DLX。其示例代码以下:
复制代码

 

如下为死信队列的运转流程:

 

 

 

五 延迟队列 RabbitMQ自己并未提供延迟队列的功能。延迟队列是一个逻辑上的概念,能够经过过时时间+死信队列来模拟它的实现。延迟队列的逻辑架构大体以下:

 

 

生产者将消息发送到过时时间为n的队列中,这个队列并未有消费者来消费消息,当过时时间到达时,消息会经过死信交换器被转发到死信队列中。而消费者从死信队列中消费消息。这个时候就达到了生产者发布了消息在讲过了n时间后消费者消费了消息,起到了延迟消费的做用。

 延迟队列在咱们的项目中能够应用于不少场景,如:下单后两个消息取消订单,七天自动收货,七天自动好评,密码冻结后24小时解冻,以及在分布式系统中消息补偿机制(1s后补偿,10s后补偿,5m后补偿......)。
复制代码

 

 

六 优先级队列 就像咱们生活中的“特殊”人士同样,咱们的业务上也存在一些“特殊”消息,可能须要优先进行处理,在生活上咱们可能会对这部分特殊人士开辟一套VIP通道,而Rabbit一样也有这样的VIP通道(前提是在3.5的版本之后),即优先级队列,队列中的消息会有优先级优先级高的消息具有优先被消费的特权。针对这些VIP消息,咱们只需作两件事:

咱们只需作两件事情:

将队列声明为优先级队列,即在建立队列的时候添加参数 x-max-priority 以指定最大的优先级,值为0-255(整数)。 为优先级消息添加优先级。 其示例代码以下:

 

注意:没有指定优先级的消息会将优先级以0对待。 对于超过优先级队列所定最大优先级的消息,优先级以最大优先级对待。对于相同优先级的消息,后进的排在前面。若是在消费者的消费速度大于生产者的速度且Broker 中没有消息堆积的状况下, 对发送的消息设置优先级也就没有什么实际意义。由于生产者刚发送完一条消息就被消费者消费了,那么就至关于Broker 中至多只有一条消息,对于单条消息来讲优先级是没有什么意义的。

 

关于优先级队列,好像违背了队列这种数据结构先进先出的原则,其具体是怎么实现的在这里就不过多讨论。有兴趣的能够本身研究研究。后续可能也会有相关的文章来分析其原理。
复制代码

七 RPC 实现 RPC,是Remote Procedure Call 的简称,即远程过程调用。它是一种经过网络从远程计算机上请求服务,而不须要了解底层网络的技术。RPC 的主要功用是让构建分布式计算更容易,在提供强大的远程调用能力时不损失本地调用的语义简洁性。

有关RPC很少介绍,这里咱们主要介绍RabbitMQ如何实现RPC。RabbitMQ 能够实现很简单的RPC。客户端发送请求消息,服务端回复响应的消息,为了接收响应的消息,咱们须要在请求消息中发送一个回调队列(可使用默认的队列)。其服务器端实现代码以下:
复制代码

 

 

 

客户端实现代码以下:

 

 

以上是Rabbit客户端本身帮咱们封装好的Rpc客户端与服务端的逻辑。固然咱们也能够本身实现,主要是借助于BasicProperties的两个参数。

 

ReplyTo: 一般用来设置一个回调队列。 CorrelationId : 用来关联请求(request) 和其调用RPC 以后的回复(response) 。 其处理流程以下:

 

 

当客户端启动时,建立一个匿名的回调队列。 客户端为RPC 请求设置2个属性: ReplyTo用来告知RPC 服务端回复请求时的目的队列,即回调队列; Correlationld 用来标记一个请求。 请求被发送到RpcQueue队列中。 RPC 服务端监听RpcQueue队列中的请求,当请求到来时,服务端会处理而且把带有结果的消息发送给客户端。接收的队列就是ReplyTo设定的回调队列。 客户端监昕回调队列,当有消息时,检查Correlationld 属性,若是与请求匹配,那就是结果了。 结束语 本篇文章简单介绍了RabbitMQ在咱们项目开发中经常使用的几种特性。这些特性能够帮助咱们更好的将Rabbit用于咱们不一样的业务场景中。这些特性与示例,能够本身在程序中运行一下,而后经过查看Rabbit提供的web管理界面来验证其正确性(关于web管理界面很少介绍,相信你们稍微研究研究就能明白)。固然,关于Rabbit的使用,仍有许多地方在本文中没有说起,如:RabbitMQ的特点——确认机制、持久化......将在下一篇文章中再详细介绍。

出  处:www.cnblogs.com/hunternet

相关文章
相关标签/搜索