我是在解决分布式事务的一致性问题时了解到RabbitMQ的,当时主要是要基于RabbitMQ来实现咱们分布式系统之间对有事务可靠性要求的系统间通讯的。关于分布式事务一致性问题及其常见的解决方案,能够看我另外一篇博客。提到RabbitMQ,不难想到的几个关键字:消息中间件、消息队列。而消息队列不禁让我想到,当时在大学学习操做系统这门课,消息队列不难想到生产者消费者模式。(PS:操做系统这门课程真的很好也很重要,其中的一些思想在我工做的很长一段一时间内给了我很大帮助和启发,给我提供了许多解决问题的思路。强烈建议每个程序员都去学一学操做系统!)html
1.1 简介程序员
消息中间件也能够称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通讯来进行分布式系统的集成。经过提供消息传递和消息队列模型,能够在分布式环境下扩展进程的通讯。当下主流的消息中间件有RabbitMQ、Kafka、ActiveMQ、RocketMQ等。其能在不一样平台之间进行通讯,经常使用来屏蔽各类平台协议之间的特性,实现应用程序之间的协同。其优势在于可以在客户端和服务器之间进行同步和异步的链接,而且在任什么时候刻均可以将消息进行传送和转发。是分布式系统中很是重要的组件,主要用来解决应用耦合、异步通讯、流量削峰等问题。面试
1.2 做用docker
消息中间件几大主要做用以下:数据库
1.3 消息中间件的两种模式json
1.3.1 P2P模式数组
P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每一个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。安全
P2P的特色:性能优化
1.3.2 Pub/Sub模式服务器
Pub/Sub模式包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
Pub/Sub的特色
1.4 经常使用中间件介绍与对比
RabbitMQ比Kafka可靠,kafka更适合IO高吞吐的处理,通常应用在大数据日志处理或对实时性(少许延迟),可靠性(少许丢数据)要求稍低的场景使用,好比ELK日志收集。
2.1 简介
RabbitMQ是流行的开源消息队列系统。RabbitMQ是AMQP(高级消息队列协议)的标准实现。支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。是使用Erlang编写的一个开源的消息队列,自己支持不少的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的很是重量级,更适合于企业级的开发。同时实现了一个Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)或者数据持久化都有很好的支持。其主要特色以下:
2.2 概念
RabbitMQ从总体上来看是一个典型的生产者消费者模型,主要负责接收、存储和转发消息。其总体模型架构以下图所示:
咱们先来看一个RabbitMQ的运转流程,稍后会对这个流程中所涉及到的一些概念进行详细的解释。
生产者:
(1)生产者链接到RabbitMQ Broker,创建一个链接( Connection)开启一个信道(Channel)
(2)生产者声明一个交换器,并设置相关属性,好比交换机类型、是否持久化等
(3)生产者声明一个队列井设置相关属性,好比是否排他、是否持久化、是否自动删除等
(4)生产者经过路由键将交换器和队列绑定起来
(5)生产者发送消息至RabbitMQ Broker,其中包含路由键、交换器等信息。
(6)相应的交换器根据接收到的路由键查找相匹配的队列。
(7)若是找到,则将从生产者发送过来的消息存入相应的队列中。
(8)若是没有找到,则根据生产者配置的属性选择丢弃仍是回退给生产者
(9)关闭信道。
(10)关闭链接。
消费者:
(1)消费者链接到RabbitMQ Broker ,创建一个链接(Connection),开启一个信道(Channel) 。
(2)消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数,
(3)等待RabbitMQ Broker 回应并投递相应队列中的消息,消费者接收消息。
(4)消费者确认(ack) 接收到的消息。
(5)RabbitMQ 从队列中删除相应己经被确认的消息。
(6)关闭信道。
(7)关闭链接。
2.2.1 信道
这里咱们主要讨论两个问题:
为什么要有信道?
主要缘由仍是在于TCP链接的"昂贵"性。不管是生产者仍是消费者,都须要和RabbitMQ Broker 创建链接,这个链接就是一条TCP 链接。而操做系统对于TCP链接的建立于销毁是很是昂贵的开销。假设消费者要消费消息,并根据服务需求合理调度线程,若只进行TCP链接,那么当高并发的时候,每秒可能都有成千上万的TCP链接,不只仅是对TCP链接的浪费,也很快会超过操做系统每秒所能创建链接的数量。若是能在一条TCP链接上操做,又能保证各个线程之间的私密性就完美了,因而信道的概念出现了。
信道为什么?
信道是创建在Connection 之上的虚拟链接。当应用程序与Rabbit Broker创建TCP链接的时候,客户端紧接着能够建立一个AMQP 信道(Channel) ,每一个信道都会被指派一个惟一的D。RabbitMQ 处理的每条AMQP 指令都是经过信道完成的。信道就像电缆里的光纤束。一条电缆内含有许多光纤束,容许全部的链接经过多条光线束进行传输和接收。
2.2.2 生产者消费者
关于生产者消费者咱们须要了解几个概念:
2.2.3 队列、交换器、路由key、绑定
从RabbitMQ的运转流程咱们能够知道生产者的消息是发布到交换器上的。而消费者则是从队列上获取消息的。那么消息究竟是如何从交换器到队列的呢?咱们先具体了解一下这几个概念。
Queue:队列,是RabbitMQ的内部对象,用于存储消息。RabbitMQ中消息只能存储在队列中。生产者投递消息到队列,消费者从队列中获取消息并消费。多个消费者能够订阅同一个队列,这时队列中的消息会被平均分摊(轮询)给多个消费者进行消费,而不是每一个消费者都收到全部的消息进行消费。(注意:RabbitMQ不支持队列层面的广播消费,若是须要广播消费,能够采用一个交换器经过路由Key绑定多个队列,由多个消费者来订阅这些队列的方式。)
Exchange:交换器。在RabbitMQ中,生产者并不是直接将消息投递到队列中。真实状况是,生产者将消息发送到Exchange(交换器),由交换器将消息路由到一个或多个队列中。若是路由不到,或返回给生产者,或直接丢弃,或作其它处理。
RoutingKey:路由Key。生产者将消息发送给交换器的时候,通常会指定一个RoutingKey,用来指定这个消息的路由规则。这个路由Key须要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。在交换器类型和绑定键固定的状况下,生产者能够在发送消息给交换器时经过指定RoutingKey来决定消息流向哪里。
Binding:RabbitMQ经过绑定将交换器和队列关联起来,在绑定的时候通常会指定一个绑定键,这样RabbitMQ就能够指定如何正确的路由到队列了。
从这里咱们能够看到在RabbitMQ中交换器和队列实际上能够是一对多,也能够是多对多关系。交换器和队列就像咱们关系数据库中的两张表。他们同归BindingKey作关联(多对多关系表)。在咱们投递消息时,能够经过Exchange和RoutingKey(对应BindingKey)就能够找到相对应的队列。
RabbitMQ主要有四种类型的交换器:
1:RoutingKey 为一个点号"."分隔的字符串(被点号"."分隔开的每一段独立的字符串称为一个单词)λ,如"hs.rabbitmq.client","com.rabbit.client"等。
2:BindingKey 和RoutingKey 同样也是点号"."分隔的字符串;
3:BindingKey 中能够存在两种特殊字符串""和"#",用于作模糊匹配,其中""用于匹配一个单词,"#"用于匹配多规格单词(能够是零个)。
如图:
· 路由键为" apple.rabbit.client" 的消息会同时路由到Queuel 和Queue2;
· 路由键为" orange.mq.client" 的消息只会路由到Queue2 中:
· 路由键为" apple.mq.demo" 的消息只会路由到Queue2 中:
· 路由键为" banana.rabbit.demo" 的消息只会路由到Queuel 中:
· 路由键为" orange.apple.banana" 的消息将会被丢弃或者返回给生产者由于它没有匹配任何路由键。
了解了上面的概念,咱们再来思考消息是如何从交换器到队列的。首先Rabbit在接收到消息时,会解析消息的标签从而获得消息的交换器与路由key信息。而后根据交换器的类型、路由key以及该交换器和队列的绑定关系来决定消息最终投递到哪一个队列里面。
3.1 RabbitMQ安装
这里咱们基于docker来安装。
3.1.1 拉取镜像
docker pull rabbitmq:management
3.1.2 启动容器
docker run -d --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
3.2 RabbitMQ 客户端开发使用
这里咱们以dotnet平台下RabbitMQ.Client3.6.9(能够从nuget中下载)为示例,简单介绍dotnet平台下对RabbitMQ的简单操做。更详细的内容能够从nuget中下载源码和文档进行查看。
3.2.1 链接Rabbit
ConnectionFactory factory = new ConnectionFactory(); factory.UserName = "admin";//用户名 factory.Password = "admin";//密码 factory.HostName = "192.168.17.205";//主机名 factory.VirtualHost = "";//虚拟主机(这个暂时不须要,稍后的文章里会介绍虚拟主机 的概念) factory.Port = 15672;//端口 IConnection conn = factory.CreateConnection();//建立链接
3.2.2 建立信道
IModel channel = conn.CreateModel();
说明:Connection 能够用来建立多个Channel 实例,可是Channel 实例不能在线程问共享,应用程序应该为每个线程开辟一个Channel 。某些状况下Channel 的操做能够并发运行,可是在其余状况下会致使在网络上出现错误的通讯帧交错,同时也会影响友送方确认( publisherconfrrm)机制的运行,因此多线程问共享Channel实例是非线程安全的。
3.2.3 交换器、队列和绑定
channel.ExchangeDeclare("exchangeName", "direct", true); String queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queueName, "exchangeName", "routingKey");
如上建立了一个持久化的、非自动删除的、绑定类型为direct 的交换器,同时也建立了一个非持久化的、排他的、自动删除的队列(此队列的名称由RabbitMQ 自动生成)。这里的交换器和队列也都没有设置特殊的参数。
上面的代码也展现了如何使用路由键将队列和交换器绑定起来。上面声明的队列具有以下特性: 只对当前应用中同一个Connection 层面可用,同一个Connection 的不一样Channel可共用,而且也会在应用链接断开时自动删除。
上述方法根据参数不一样,能够有不一样的重载形式,根据自身的须要进行调用。
ExchangeDeclare方法详解:
ExchangeDeclare有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成的。
void ExchangeDeclare(string exchange, string type, bool durable, bool autoDelete, IDictionary<string, object> arguments);
QueueDeclare方法详解:
QueueDeclare只有两个重载。
QueueDeclareOk QueueDeclare();
QueueDeclareOk QueueDeclare(string queue, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object> arguments);
不带任何参数的queueDeclare 方法默认建立一个由RabbitMQ 命名的(相似这种amq.gen-LhQzlgv3GhDOv8PIDabOXA 名称,这种队列也称之为匿名队列〉、排他的、自动删除的、非持久化的队列。
注意:生产者和消费者都可以使用queueDeclare来声明一个队列,可是若是消费者在同一个信道上订阅了另外一个队列,就没法再声明队列了。必须先取消订阅,而后将信道直为"传输"模式,以后才能声明队列。
在此我向你们推荐一个架构学习交流群。交流学习群号:478030634 里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多
QueueBind 方法详解:
将队列和交换器绑定的方法以下:
void QueueBind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
将队列与交换器解绑的方法以下:
QueueUnbind(string queue, string exchange, string routingKey, IDictionary<string, object> arguments);
其参数与绑定意义相同。
注:除队列能够绑定交换器外,交换器一样能够绑定队列。即:ExchangeBind方法,其使用方式与队列绑定类似。
3.2.4 发送消息
发送消息可使用BasicPublish方法。
void BasicPublish(string exchange, string routingKey, bool mandatory,IBasicProperties basicProperties, byte[] body);
3.2.5 消费消息
RabbitMQ 的消费模式分两种: 推(Push)模式和拉(Pull)模式。推模式采用BasicConsume
进行消费,而拉模式则是调用BasicGet进行消费。
推模式:
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);//定义消费者 对象 consumer.Received += (model, ea) => { //do someting; channel.BasicAck(ea.DeliveryTag, multiple: false);//确认 }; channel.BasicConsume(queue: "queueName", noAck: false, consumer: consumer);//订阅消息
string BasicConsume(string queue, bool noAck, string consumerTag, bool noLocal, bool exclusive, IDictionary<string, object> arguments, IBasicConsumer consumer);
拉模式
BasicGetResult result = channel.BasicGet("queueName", noAck: false);//获取消息 channel.BasicAck(result.DeliveryTag, multiple: false);//确认
3.2.6 关闭链接
在应用程序使用完以后,须要关闭链接,释放资源:
channel.close(); conn.close() ;
显式地关闭Channel 是个好习惯,但这不是必须的,在Connection 关闭的时候,Channel 也会自动关闭。
以上简单介绍了分布式系统中消息中间件的概念与做用,以及RabbitMQ的一些基本概念与简单使用。下一篇文章将继续针对RabbitMQ进行总结。主要内容包括什么时候建立队列、RabbitMQ的确认机制、过时时间的使用、死信队列、以及利用RabbitMQ实现延迟队列......
你们以为文章对你仍是有一点点帮助的,你们能够点击下方二维码进行关注。 《Java烂猪皮》 公众号聊的不只仅是Java技术知识,还有面试等干货,后期还有大量架构干货。你们一块儿关注吧!关注烂猪皮,你会了解的更多..............