由流处理的转发讲到消息队列,总体见:https://segmentfault.com/a/11...。本篇总结下之前的两个消息队列协议:JMS,AMQP以及两个典型的实现Activemq和Rabbitmq.两者对分布式的支持都在主从,对分片的支持很差,扩展性差。主要关注框架组件,功能,简单说下分布式。更经常使用的消息队列见:https://segmentfault.com/a/11...。html
JMS模型
是一种Java消息服务协议,支持两种模型:点对点或队列模型,发布/订阅模型java
点对点
在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。这种模式被归纳为:
1)只有一个消费者将得到消息
2)生产者不须要在接收者消费该消息期间处于运行状态,接收者也一样不须要在消息发送时处于运行状态。
3)每个成功处理的消息都由接收者签收数据库
发布者/订阅者
发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式比如是匿名公告板。这种模式被归纳为:
1)多个消费者能够得到消息
2)在发布者和订阅者之间存在时间依赖性。发布者须要建立一个订阅(subscription),以便客户可以购订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种状况下,在订阅者未链接时发布的消息将在订阅者从新链接时从新发布。apache
ActiveMQ
官方:http://activemq.apache.orgsegmentfault
框架组件
- 逻辑框架:
- 基本组件:
Broker,消息代理,表示消息队列服务器实体,接受客户端链接,提供消息通讯的核心服务。
Producer,消息生产者,业务的发起方,负责生产消息并传输给 Broker 。
Consumer,消息消费者,业务的处理方,负责从 Broker 获取消息并进行业务逻辑处理。
Topic,主题,发布订阅模式下的消息统一聚集地,不一样生产者向 Topic 发送消息,由 Broker 分发到不一样的订阅者,实现消息的广播。
Queue,队列,点对点模式下特定生产者向特定队列发送消息,消费者订阅特定队列接收消息并进行业务逻辑处理。
Message,消息体,根据不一样通讯协议定义的固定格式进行编码的数据包,来封装业务 数据,实现消息的传输。
connector(broker与client,broker与broker)的协议:vm,tcp,xxxx
持久消息的存储:KahaDB(基于文件),journal+JDBC,内存,levelDb(leveldb+zk做为M-S复制方案)
消费/持久化/删除
- 消息存入
内存+磁盘光标,若内存不够,维护一份磁盘索引位置(对于消费速度慢的,后续还会访问的消息),性能会受影响,主要是由于分片很差,单机内存不够。
- 删除
对于点对点的消息一旦消费者完成消费这条消息将从broker上删除;
对于发布订阅类型的消息,即便全部的订阅者都完成了消费,Broker也不必定会立刻删除无用消息,而是保留推送历史,以后会异步清除无用消息。而每一个订阅者消费到了哪条消息的offset会记录在Broker,以避免下次重复消费。由于消息是顺序消费,先进先出,因此只须要记录上次消息消费到哪里就能够了。
-
持久化
持久化三个表:
activemq_acks:用于存储订阅关系。若是是持久化Topic,订阅者和服务器的订阅关系在这个表保存,主要数据库字段以下:服务器
container:消息的destination
sub_dest:若是是使用static集群,这个字段会有集群其余系统的信息
client_id:每一个订阅者都必须有一个惟一的客户端id用以区分
sub_name:订阅者名称
selector:选择器,能够选择只消费知足条件的消息。条件能够用自定义属性实现,可支持多属性and和or操做
last_acked_id:记录消费过的消息的id
activemq_lock:在集群环境,只有一个Master Broker能够得到消息,用于记录哪一个Broker是当前的Master Broker。网络
activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中。主要的数据库字段以下:session
id:自增的数据库主键
container:消息的destination
msgid_prod:消息发送者客户端的主键
msg_seq:是发送消息的顺序,msgid_prod+msg_seq能够组成jms的messageid
expiration:消息的过时时间,存储的是从1970-01-01到如今的毫秒数
msg:消息本体的java序列化对象的二进制数据
priority:优先级,从0-9,数值越大优先级越高
activemq_acks用于存储订阅关系。若是是持久化topic,订阅者和服务器的订阅关系在这个表保存。
集群
AMQP
不限于java的通用消息系统协议,官方:
http://docs.oasis-open.org/am...
定义了如下几层内容:
TYPE - type system and encoding
Transport - AMQP transport layer(全双工可靠递交对等,connection=>多channel的session=>links)
Messaging - AMQP Messaging Layer (对消息确认,拒绝,持久化等规定)
Transactions - AMQP Transactions Layer(事务提交等)
Security - AMQP Security Layers架构
rabbitmq
架构组件

- Message
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其余消息的优先权)、delivery-mode(指出该消息可能须要持久性存储)等。
- Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
- Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。Exchange 类型
Exchange分发消息时根据类型的不一样分发策略有区别,目前共四种类型:direct、fanout、topic、headers(基本不用)
1.直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应绑定键的队列。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也能够处理多播路由)。下边介绍它是如何工做的:
1)将一个队列(一个消费者一个队列)绑定到某个交换机上时,赋予该绑定一个绑定键(Binding Key),假设为R;
2)当一个携带着路由键(Routing Key)为R的消息被发送给直连交换机时,交换机会把它路由给绑定键为R的队列(彻底匹配,有几个R就发几个)
直连交换机的队列一般是循环分发任务给多个消费者(咱们称之为轮询)
2.生产者(P)生产消息推送到 Exchange,遵循 fanout 的规则将消息推送到全部与它绑定 Queue
3.topic
同直连型,只不过是模糊匹配
- Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列链接起来的路由规则,因此能够将交换器理解成一个由绑定构成的路由表。
- Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取走。(多个消费者能够订阅同一个Queue,这时Queue中的消息会被平均分摊给多个消费者进行处理,而不是每一个消费者都收到全部的消息并处理,因此topic的每一个消费者的topic的queue应该同样或前匹配https://juejin.im/entry/599e5...
- Connection
网络链接,好比一个TCP链接。
- Channel
信道,多路复用链接中的一条独立的双向数据流通道。信道是创建在真实的TCP链接内地虚拟链接,AMQP 命令都是经过信道发出去的,不论是发布消息、订阅队列仍是接收消息,这些动做都是经过信道完成。由于对于操做系统来讲创建和销毁 TCP 都是很是昂贵的开销,因此引入了信道的概念,以复用一条 TCP 链接。
- Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
- Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每一个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有本身的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在链接时指定,RabbitMQ 默认的 vhost 是 / 。
- Broker
表示消息队列服务器实体。
持久化、删除、确认机制、事务
- 持久化
要求消息持久/内存满了(自行开发,由于要释放RAM,队列有很是活跃和静止之分不适合都快照增量的方式)
buffer->文件->刷盘,消息+索引,当删除Segment到阈值文件合并。须要持久化消息,exchange,队列。
Segment的删除rabbit_msg_index模块为每个Segment维护一个unacked计数,每publish一个消息加1,每ack一个消息减1,当unacked=0时,文件删除。
- 消息删除
能够指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,若是是持久化消息的话)中移去消息。不然,RabbitMQ会在队列中消息被消费后当即删除它)每一个队列单独处理
- ACK与事务
默认状况下发布操做是不会返回任何信息给生产者的,
1.使用事务:
client发送Tx.Select
broker发送Tx.Select-Ok(以后publish)
client发送Tx.Commit
broker发送Tx.Commit-Ok
client未收到确认会回滚并重发
2.channel的confirm模式
生产者将信道设置成confirm模式,一旦信道进入confirm模式,全部在该信道上面发布的消息都会被指派一个惟一的ID(从1开始),一旦消息被投递到全部匹配的队列以后,broker就会发送一个确认给生产者(包含消息的惟一ID),这就使得生产者知道消息已经正确到达目的队列了,若是消息和队列是可持久化的,那么确认消息会将消息写入磁盘以后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也能够设置basic.ack的multiple域,表示到这个序列号以前的全部消息都已经获得了处理。
集群
rabbitmq不支持动态扩展,erlang性能高
能够共享 user、vhost、exchange等,全部的数据和状态都是必须在全部节点上复制的,队列例外,只在单个节点而不是全部节点上建立完整的队列信息(元数据、状态、内容,RabbitMQ 2.6.0以后提供了镜像队列以免集群节点故障致使的队列内容不可用)(猜想:消息的持久应该包含交换机上的和队列中的,发给全部队列后消息应该可删除了,消费后队列中的可删除了)
磁盘节点+内存节点,要求集群中至少有一个磁盘节点,全部其余节点能够是内存节点,当节点加入火离开集群时,它们必需要将该变动通知到至少一个磁盘节点,若是只有一个磁盘节点,恰好又是该节点崩溃了,那么集群能够继续路由消息,但不能建立队列、建立交换器、建立绑定、添加用户、更改权限、添加或删除集群节点。框架