MQ的本质 大概地讲就是「一发一存一消费」,在直白点就是一个「转发器」html
生产者先将消息投递到一个叫作「队列」的容器中,而后再从这个容器中取出消息,最后再转发给消费者,仅此而已java
关键字:消息和队列c++
1.消息:就是要传输的数据,能够是最简单的文本字符串,也能够是自定义的复杂格式(只要能按预约格式解析出来便可)git
2.队列:是一种先进先出数据结构。它是存放消息的容器,消息从队尾入队,从对头出队,入队即发消息的过程,出队即收消息的过程。github
现在咱们最经常使用的消息队列产品(RocketMQ,Kafka等等),你会发现:它们都在最原始的消息模型上作了扩展,同时提出了一些新名词,好比:主题(topic),分区(partition),队列(queue)等等web
最初的消息队列就是上一节讲的原始模型,它是一个严格意义上的队列(Queue)。消息按照什么顺序写进去,就按照什么顺序读出来。不过队列没有"读"这个操做,读就是出队,从队头中"删除"这个消息。sql
这即是队列模型:它容许多个生产者往同一消息队列发送消息。可是,若是有多个消费者,其实是竞争的关系,依旧是一条消息只能被其中一个消费者接收到,读完即被删除shell
若是须要将一份消息数据分发给多个消费者,而且每一个消费者都要求收到全量的数据。队列模型则没法知足这个需求了编程
一个可行的方案是:为每一个消费者建立 一个单独的队列,让生产者发送多份。至关于须要让你通知一个部门的人,而你一个一个去通知,每份通知都须要复制 ,发送 。 这种作法比较笨,并且同一份数据会被复制多份,也很浪费空间vim
为了解决这种问题,就演化了另一种消息模型:发布-订阅模型
在发布-订阅模型中,存放消息的容器变成了"主题",订阅者在接收消息以前须要先"订阅主题"。最终,每一个订阅者均可以收到同一个主题的全量信息
仔细对比下它和"队列模型"的异同:生产者就是发布者,队列就是主题,消费者就是订阅者,无本质区别。惟一的不一样点在于:一份消息数据是否能够被屡次消费
目前MQ的应用场景很是多,例如:系统解耦,异步通讯和流量削峰。除此以外,还有延迟通知,最终一致性保证,顺序消息,流式处理等等。
经过对比,能很明显地看出两点差别:
1.引入MQ后,由以前的一次RPC变成了如今的两次RPC,并且生产者只跟队列耦合,它根本无需知道消费者的存在。
2.多了一个中间节点「队列」进行消息转储,至关于将同步变成了异步。
举一个实际例子,好比说电商业务中最多见的「订单支付」场景:在订单支付成功后,须要更新订单状态、更新用户积分、通知商家有新订单、更新推荐系统中的用户画像等等。

引入 MQ 后,订单支付如今只须要关注它最重要的流程:更新订单状态便可。其余不重要的事情所有交给 MQ 来通知。这即是 MQ 解决的最核心的问题:系统解耦。
改造前订单系统依赖 3 个外部系统,改造后仅仅依赖 MQ,并且后续业务再扩展(好比:营销系统打算针对支付用户奖励优惠券),也不涉及订单系统的修改,从而保证了核心流程的稳定性,下降了维护成本。
这个改造还带来了另一个好处:由于 MQ 的引入,更新用户积分、通知商家、更新用户画像这些步骤所有变成了异步执行,能减小订单支付的总体耗时,提高订单系统的吞吐量。这即是 MQ 的另外一个典型应用场景:异步通讯。
除此之外,因为队列能转储消息,对于超出系统承载能力的场景,能够用 MQ 做为 “漏斗” 进行限流保护,即所谓的流量削峰。
咱们还能够利用队列自己的顺序性,来知足消息必须按顺序投递的场景;利用队列 + 定时任务来实现消息的延时消费 ……
以上笔记感谢做者3y 原网址:https://mp.weixin.qq.com/s/3h-pN8qS1ex36LgXMFVOSw
RabbitMQ是消息代理:它接收并转发消息。您能够将其视为邮局,将您要发布的邮件放在邮箱中,能够确保最终将邮件传递给您的收件人。以此类推,RabbitMQ是一个邮政信箱,一个邮局和一个邮递员
RabbitMQ与邮局之间的主要区别在于,它不处理纸张,而是接收,存储和转发数据消息的二进制斑点
rabbitmq是erlang语言编写的,安装rabbitmq以前,须要先安装erlang,这里用erlang的源码进行安装,erlang安装包官网下载地址:http://erlang.org/download/
wget http://erlang.org/download/otp_src_21.1.tar.gz tar -zxvf otp_src_21.1.tar.gz cd otp_src_21.1 # 这里要新建一个erlang文件夹,由于erlang编译安装默认是装在/usr/local下的bin和lib中,这里咱们将他统一装到/usr/local/erlang中,方便查找和使用。 mkdir -p /usr/local/erlang # 在编译以前,必须安装如下依赖包 yum install -y make gcc gcc-c++ m4 openssl openssl-devel ncurses-devel unixODBC unixODBC-devel java java-devel ./configure --prefix=/usr/local/erlang
erlang语言须要依赖于java环境,若是不安装java环境,会报错:Java compiler disabled by user
直接执行make&&makeinstall进行编译安装
make && make install
安装后,在usr/local/erlang中就会出现以下:
而后将/usr/local/erlang/bin这个文件夹加入到环境变量中,加载如下便可直接使用.
vim /etc/profile # 编辑环境变量 ######### 添加以下内容 ############### PATH=$PATH:/usr/local/erlang/bin ######################################## source /etc/profile # 从新加载环境变量
到此,即按照完成,直接输入erl,获得以下图则按照成功
安装完成以后,就能够安装rabbitmq了,安装以前须要去官网查看一下rabbitmq版本对erlang版本的一个支持状况,官网地址:http://www.rabbitmq.com/which-erlang.html
为了方便安装,最好直接使用编译好的二进制文件包,即开即用,不用进行复杂的yum配置等。具体能够参考官方文档:http://www.rabbitmq.com/install-generic-unix.html
# 下载源码包 wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.8/rabbitmq-server-generic-unix-3.7.8.tar.xz # 解压 tar -xvf rabbitmq-server-generic-unix-3.7.8.tar.xz -C /usr/local/ # 添加环境变量 vim /etc/profile ------ 添加以下内容 ------ PATH=$PATH:/usr/local/rabbitmq_server-3.7.8/sbin # 重载一下环境变量 source /etc/profile # 添加web管理插件 rabbitmq-plugins enable rabbitmq_management
默认rabbitmq是没有配置文件的,须要去官方github上,复制一个配置文件模版过来,最新的3.7.0以上的版本可使用新的key-value形式的配置文件rabbitmq.conf,和原来erlang格式的advanced.config相结合,解决一下key-value形式很差定义的配置。github地址:https://github.com/rabbitmq/rabbitmq-server/tree/v3.7.8/docs 将配置文件复制到/usr/local/rabbitmq_server-3.7.8/etc/rabbitmq/下
而后,就能够启动rabbitmq服务了
# 后台启动rabbitmq服务 rabbitmq-server -detached rabbitmqctl stop # 中止服务 rabbitmq-plugins list #能够列出插件列表
上面启用了rabbitmq的管理插件,会有一个web管理界面,默认监听端口15672,将此端口在防火墙上打开,则能够访问web页面:
使用默认的用户 guest / guest (此也为管理员用户)登录,会发现没法登录,报错:User can only log in via localhost。那是由于默认是限制了guest用户只能在本机登录,也就是只能登录localhost:15672。能够经过修改配置文件rabbitmq.conf,取消这个限制: loopback_users这个项就是控制访问的,若是只是取消guest用户的话,只须要loopback_users.guest = false 便可。
以后,就看能够登陆到rabbitmq的web管理界面
信道是生产消费者与rabbit通讯的渠道,生产者publish或是消费者subscribe一个队列都是经过信道来通讯的。信道是创建在TCP链接上的虚拟链接,rabbitmq在一条TCP上创建成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每一个线程对应一个信道,信道在rabbit都有惟一的ID,保证了信道私有性,对应上惟一的线程使用
疑问:为何不创建多个TCP链接呢?
缘由是rabbit保证性能,系统为每一个线程开辟一个TCP是很是消耗性能, 每秒成百上千的创建销毁TCP会严重消耗系统。因此rabbitmq选择创建多个信道(创建在tcp的虚拟链接) 链接到rabbit上。 相似概念:TCP是电缆,信道就是里面的光纤,每一个光纤都是独立的,互不影响。
exchange的做用就是相似路由器,routing key就是路由键,服务器会根据路由键将消息从交换机路由到队列上去
exchange有多个种类,经常使用的有direct,fanout,topic。前三种相似集合对应关系那样,(direct)1:1,(fanout)1:N,(topic)N:1
direct:相似彻底匹配
fanout:能够把一个消息并行发布到多个队列上去,简单的说就是,当多个队列绑定到fanout的交换机,那么交换器一次性拷贝多个消息分别发送到绑定的队列上,每一个队列有这个消息的副本
ps:这个能够在业务上实现并行处理多个任务,好比,用户上传图片功能,当消息到达交换器上,它能够同时路由到积分 增长队列和其它队列上,达到并行处理的目的,而且易扩展,之后有什么并行任务的时候,直接绑定到fanout交换器 不需求改动以前的代码。
topic:多个交换机能够路由消息到同一个队列。根据模糊匹配,好比一个队列的routing key为*.test,那么凡是到达交换器的消息中的routing key后缀.test都被路由到这个队列上
1.推模式:经过AMQP的basic.consume命令订阅,有消息会自动接收,吞吐量高
2.拉模式:经过AMQP的basic.get命令
注:当队列拥有多个消费者时,队列收到的消息将以循环的方式发送给消费者。每条消息只会发送给一个订阅的消费者
开启持久化功能,需同时知足:消息投递模式选择持久化,交换器开启持久化,队列开启持久化
1.发送方确认模式:消息发送到交换器—发送完毕—>消息投递到队列或持久化到磁盘异步回调通知生产者
2.消费者确认机制:消息投递消费者—ack—删除该条消息—投递下一条
注:收到ACK前,不会把消息再次发送给该消费者,可是会把下一条消息发送给其余消费者
1.生产者产生消息,将消息放入队列
2.消息的消费者(consumer)监听 消息队列,若是队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,形成消息的丢失,这里能够设置成手动的ack,但若是设置成手动ack,处理完后要及时ack消息给队列,不然会形成内存溢出)
1.消息产生者将消息放入队列消费者能够有多个,消费者1,消费者2同时监听一个队列,消息被消费。C1,C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发状况下,默认会产生某一个消息被多个消费者共同使用,能够设置一个开关(syncronize)保证一条消息只能被一个消费者使用)
1.每一个消费者监听本身的队列
2.生产者将消息发给broke,由交换机将消息转发到绑定此交换机的每一个队列,每一个绑定交换机的队列都将接收到消息
1.消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;
2.根据业务功能定义路由字符串
3.从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。
4.业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,能够将程序中的错误封装成消息传入到消息队列中,开发者能够自定义消费者,实时接收错误;
1.星号井号表明通配符
2.星号表明多个单词,井号表明一个单词
3.路由功能添加模糊匹配
4.消息产生者产生消息,把消息交给交换机
5.交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费
(在个人理解看来就是routing查询的一种模糊匹配,就相似sql的模糊查询方式)
RabbitMQ中与事务机制有关的方法有三个:txSelect(),txCommit()以及txRollback(),txSelect()用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务,在经过txSelect开启事务以后,咱们即可以发布消息给broke代理服务器了,若是txCommit提交成功了,则消息必定到达了broke了,若是在txCommit执行以前broke异常崩溃或者因为其余缘由抛出异常,这个时候咱们即可以捕获异常经过txRollback回滚事务
channel.txSelect(); channel.basicPublish(ConfirmConfig.exchangeName,ConfirmConfig.routingKey,MessageProperties.PERSISTENT_TEXT_PLAIN,ConfirmConfig.msg_10B.getBytes()); channel.txCommit();
事务回滚代码以下:
try { channel.txSelect(); channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); int result = 1 / 0; channel.txCommit(); } catch (Exception e) { e.printStackTrace(); channel.txRollback(); }
上面咱们介绍了RabbitMQ可能会遇到的一个问题,即生成者不知道消息是否真正到达broker,随后经过AMQP协议层面为咱们提供了事务机制解决了这个问题,可是采用事务机制实现会下降RabbitMQ的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用Confirm模式。
生产者将信道设置成confirm模式,一旦信道进入confirm模式,全部在该信道上面发布的消息都会被指派一个惟一的ID(从1开始),一旦消息被投递到全部匹配的队列以后,broker就会发送一个确认给生产者(包含消息的惟一ID),这就使得生产者知道消息已经正确到达目的队列了,若是消息和队列是可持久化的,那么确认消息会将消息写入磁盘以后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也能够设置basic.ack的multiple域,表示到这个序列号以前的全部消息都已经获得了处理。
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就能够在等信道返回确认的同时继续发送下一条消息,当消息最终获得确认以后,生产者应用即可以经过回调方法来处理该确认消息,若是RabbitMQ由于自身内部错误致使消息丢失,就会发送一条nack消息,生产者应用程序一样能够在回调方法中处理该nack消息。
在channel 被设置成 confirm 模式以后,全部被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。可是没有对消息被 confirm 的快慢作任何保证,而且同一条消息不会既被 confirm又被nack
生产者经过调用channel的confirmSelect方法将channel设置为confirm模式,若是没有设置no-wait标志的话,broker会返回confirm.select-ok表示赞成发送者将当前channel信道设置为confirm模式(从目前RabbitMQ最新版本3.6来看,若是调用了channel.confirmSelect方法,默认状况下是直接将no-wait设置成false的,也就是默认状况下broker是必须回传confirm.select-ok的)。
已经在transaction事务模式的channel是不能在设置成confirm模式的,即这两种模式不能共存的
对于固定消息体大小和线程数,若是消息持久化,生产者confirm(或者采用事务机制),消费者ack那么对性能有很大的影响.
消息持久化的优化没有太好方法,用更好的物理存储(SAS, SSD, RAID卡)总会带来改善。生产者confirm这一环节的优化则主要在于客户端程序的优化之上。概括起来,客户端实现生产者confirm有三种编程方式:
普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。其实是一种串行confirm了。
批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。
1.消息被否认接收,消费者使用basic.reject或者basic.nack而且requeue重回队列属性设为false
2.消息在队列里得时间超过了该消息设置得过时时间(TTL)
3.消息队列到达了它的最大长度,以后再收到得消息
当一个消息再队列里变为死信时,它会被从新publish到另外一个exchange交换机上,这个exchange就为DLX。所以咱们只须要再声明正常得业务队列时添加一个可选的”x-dead-letter-exchange“参数,值为死信交换机,死信就会被rabbitmq从新publish到配置的这个交换机上,咱们接着监听这个交换机就能够了。
RabbitMQ Delayed Message Plugin是一个rabbitmq的插件,因此使用前须要安装它,能够参考的GitHub地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
1.安装好插件后只须要声明一个类型type为"x-delayed-message"的exchange,而且在其可选参数下配置一个key为"x-delayed-typ",值为交换机类型(topic/direct/fanout)的属性。
2.声明一个队列绑定到该交换机
3.在发送消息的时候消息的header里添加一个key为"x-delay",值为过时时间的属性,单位毫秒。
4.代码就在上面,配置类为DMP开头的,发送消息的方法为send2()。
5.启动后在rabbitmq控制台能够看到一个类型为x-delayed-message的交换机。
6.继续再浏览器中发送两个请求http://localhost:4399/send2?msg=消息A&time=30和http://localhost:4399/send2?msg=消息B&time=10,这样不会出现死信队列出现的问题
Activemq和Rabbitmq的区别?
Activemq它实现的是JMS协议(Java消息协议)
Rabbitmq实现的是AMQP协议(高级消息队列协议)
Activemq是Java写的
Rabbitmq是Erlang写的,吞吐更多,延时更低