I. 消息中间件特色:
1,异步处理模式
消息发送者能够发送一个消息而无需等待响应,消息发送者将消息发送到一条虚拟的通道或队列上,消息接收者则订阅或监听该通道,一条消息可能最终转发给一个或多个消息接受者,这些接受者都无需对消息发送者作出同步回应,整个过程是异步的,如:用户注册
2,应用程序和应用程序调用关系为松耦合关系,发送者和接受者没必要了解对方,只须要确认消息,发送者和接受者没必要同时在线
好比在线交易系统为了保证数据一致性,在支付系统处理完成后会把支付结果放到消息中间件里通知订单系统修改订单支付状态,两个系统经过消息中间件解耦
消息传递服务模型,以下图:
html
II. 消息中间件传递模型
1,点对点模型PTP
点对点模型用于消息生产者和消息消费者之间点对点的通讯,消息生产者将消息发送到由某个名字标示的特定消费者,此名字实际上对应消息服务中的一个队列queue,在消息传递给消费者以前他被存储在这个队列中,队列消息能够放在内存中也能够是持久的,以保障消息服务出现故障时仍然可以传递消息java
1.1 模型特性:
每一个消息只有一个消费者,发送者和消费者没有时间依赖,接受者确认消息接受和处理成功,以下图:
linux
2,发布订阅模型PUB/SUB
发布者/订阅者模型支持向一个特定的消息主题生产消息,0或多个订阅者可能对接收来自特定消息主题的消息敢兴趣,在这种模型下,发布者和订阅者彼此不知道对方,这种模式比如匿名公告,这种模式被归纳为:多个消费者能够得到消息,在发布者和订阅者之间存在时间依赖性。发布者须要创建一个订阅subscription,以便可以消费者订阅。订阅者必须保持持续的活动状态用来接收消息,除非订阅者创建了持久订阅,在这种状况下,在订阅者未链接时发布的消息在订阅者从新链接时从新发布c++
2.1 发布订阅模型特性
1,每一个消息有多个订阅者
2,客户端只有订阅后才能接受到消息
以下图,当有消息更新后会同时更新订阅者,而且一个消息会有多个订阅者,以下图:

3,持久订阅和非持久订阅
以下图,若是此时持久订阅出现故障,在故障后重启则会从新发送一次消息,若是非持久订阅则不会,消息则错过,以下图:

消费者订阅后,中间件会投递给消费者,消费者确认后在返回给中间件,以下图:

1,发布者和订阅者是存在时间以来的,当接受者和发布者只有创建订阅关系才能收到消息。当订阅关系创建后,消息就不会丢失,无论订阅者是否都在线。
当订阅者接受了消息,必须一直在线,当只有一个订阅者时约等于点对点模式,如一个消息中间件分为多个块,以下图:
git
III. 场景
异步处理web
1,用户注册
用户注册,成功会发生邮件或短信
当正常状况下回发生成功,若是在期间发生网络抖动或者服务意外中止,则可能须要消息超时机制,在或者重试机制,以下图:
后端
2,日志分析
使用案例,计算pv和用户行为分析,订阅模式,以下图:
缓存
3,数据复制
数据从源复制到多个目的地,通常要求保持数据一致性,顺序或者保证因果序列。用于跨机房数据传输,搜索,离线数据计算等,以下图:
安全
4,延迟消息发送和暂存
把消息中间件当成可靠的消息暂存地,定时进行消息投递,好比模拟用户秒杀等作习题压测,以下图:
bash
5,消息广播
缓存数据同步更新,应用推送数据,好比更新本地缓存,以下图:

IV. 消息中间件分类
消息中间件主要分为push和 pull
push推消息模型,消息生产者将消息发送给消息传递服务,消息传递服务又将消息推送给消息消费者
pull拉取消息模型,消费者请求消息服务接收消息,消息生产者从消息中间件拉取消息
在通常场景中,消费者拉取更容易控制,若是是生产者推送给消费者的话,须要提供更多的处理,如负载均衡,后端订阅者信息收集的集群等,显而易见,拉取则更容易控制范围,可是,拉取一般可能一次拉取成千上百万的数据,这时,数据存储设备和带宽也是必备考量的环节,#原文连接www#linu#xea#com/1506.html#,相似注册用户发送短信这种实时的信息则不适用于拉取,更偏向于推送,对好比下图:

######################################################################################
metaq使用java语言编写,能够在多种平台部署,客户端支持c++和java,单台服务可支持1w以上各消息队列,事实上测试可能在4-6千之间,可横向扩展,队列持久化,队列长度无限(取决于磁盘大小),可从队列任何位置开始消费
一般在磁盘中的二进制文件id为8位定长,一般状况下还须要有int,4个字节,数据字节,此时若是有个数据为11个字节,以下
在id,int date就看作一个消息体,以下图:

生产者和服务器和消费者均可分布式,消息存储一般安装顺序写,他的性能也很是高,吞吐量达,而且支持消息顺序,客户pull,随机读,批量拉数据,数据迁移,以及扩展而且对用户同名,最后,消费状态保持在客户端
整体架构,以下图:

topic:消息的主题,由用户定义并在服务器端配置,producer发送消息到某个topic,consumer从某个topic下的消费消息
offset:消息在broker上的每一个分区都是组织成一个文件列表,消费者拉取数据须要指定数据在文件中的偏移量,这个偏移就是所谓的offset,offset是绝对偏移量,服务器会将offset转化为具体文件的相对偏移量
broker:meta的服务器或者书服务器,在消息中间件中一般称为broker
partition(分区):同一个topic下面还分为多个分区,如meta-test这个topic咱们能够分为10个分区,分别有两台服务器提供,那么可能每台服务器提供5个分区,假设服务器id分别是0和1,则全部分区为0-0,0-1,0-2,0-3,0-4,1-0,1-1,1-2,1-3,1-4
zk.zkEnable=true 是否注册到zk,默认为true
zk.zkConnect=localhost:2181 zk的服务器列表
zk.zkSessionTimeoutMs=30000 zk心跳超市,单位毫秒,默认30秒
zk.zkConnection TImeoutMs=30000 zk连接超时时间,单位毫秒,默认30秒
brokerld 服务器id,必须惟一,0-1024之间
serverprot 服务端口
hostName 默认将取本机IP,若是多机网卡须要指明
dataLogPath 日志存放路径,默认和datapath同样
datapath 指定默认数据存储路径
daletepolicy=delete,168 数据删除策略,默认超过7天则删除,这里是168是小时,10s表示秒,10m表示分钟,10h表示小时,默认为小时
deleteWhen 什么时候执行删除策略的cron表达式,默认是0 0 6,18 * * ?,也就是天天的迟早执行处理策略
deletewhen 删除策略的执行时间,cron表达式
flushTxLogAtCommit=1 事务日志的同步设置,0表示让操做系统决定,1表示每次commit都同步,2表示每隔1秒同步一次,不过会影响事务性能,可根据须要的性能和可靠性直接权衡作出一个合理的选择。一般设置为2,表示每隔1秒刷一次,也就最多丢失一秒内的运行时事务,最安全的设置为1,可是严重影响性能,而0的安全级别最低,安全级别上1>=2>0 而性能则是0>=2>1
unflushthreshold 每隔多少条消息作一次磁盘的sync,强制将更改的数据刷入磁盘,默认1000,也就是说在掉电的状况下,最多容许丢失1000条消息,可设置为0,强制每次写如都sync,在设置0的状况下,服务器会自动启用group commit技术,将多个消息合并成一次sync来提升io性能,而group commit状况下消息发送者的tps没有收到太大影响,可是服务端的负责会上升不少
unflushinterval 间隔多少毫秒按期作一个磁盘的sync,默认是10秒,也就是说服务器掉电状况下,最多丢失10秒内发送来的消息,不可设置为小于或者等于0
metaq集群:全部broker注册到zookeeper,客户端连接zookeeper并返回可用的broker列表,选择一个broker并发送消息
RabbitMQ
在AMQP协议标准基础上完整的,可复用的企业消息系统,遵循Mozilla Public License开源协议,采用Erlang实现的工业级的消息队列MQ服务器
AMQP高级消息队列协议,是一个一步消息传递所使用的应用层协议规范,做为线路层协议,而不是API,例如JMS,AMQP客户端可以无视消息的来源任意发送和接受信息。AMQP的原始用途只是为金融界提供一个能够彼此协做的消息协议,而如今的目标则是为通用消息队列架构提供通用构建工具。所以,面向消息的中间件MOM系统,例如发布/订阅队列,没有做为基本元素实现,反而经过发送简化的AMQ实体,用户被赋予 了构建这些实体的能力,这些实体也是规范的一部分,造成了在线路层协议顶端的一个层级,AMQP模型,这个模型赞成了消息模式,例如以前提到的发布订阅,队列,事务以及流数据,而且添加了额外的特性,例如更易于扩展,基于内容的路由
RabbitMQ总体架构,以下图:

I. 运行原理图
RabbitMQ核心组件分别是exchange和queue,以下图:

RabbitMQ术语
Server(broker) 接收客户端连接,实现AMQP消息队列和路由功能的进程
Virtual Host 一个虚拟概念,相似权限控制组,一个virtual host里面可有多个exchange和queue,可是权限控制 的最小粒度是virtual host
exchange 接收生产者发送的消息,并根据binding规则将消息路由给服务器中的队列,exchangeType决定了exchnage路由消息的行为,如,在RabbitMQ中,exchangeType有Direct,fanout和topic三种,不一样类型的exchange路由的行为是不同的
message queue 消息队列,用于存储还未被消费者消费的消息
message 由header和body组成,header是由生产者添加的各类属性的集合,包括message是否被持久化,由那个message queue接受,优先级是多少等,而body是真正须要传输的app数据
bindingkey 所谓绑定就是将一个特定的exchange和一个特定的queue绑定起来,绑定关键字成为bindingkey
Exchange分类
直接式交换器类型
Direct Exchange 直接交互式处理路由键,须要将一个队列绑定到交换机上,须要该消息与一个特定的路由键彻底匹配,这是一个完整的匹配。若是一个队列绑定到该交换机上要求路由键“dog",则只有被标记为“dog"的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog,以下图:

广播式交换器类型
Fanout Exchange 广播式路由键,只须要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的全部队列上,很像子网广播,每台子网内的主机都得到了一份复制的消息,fanout交换机转发消息是最快,以下图:

主题式交换器类型
Topic exchange,主题式交换器,经过消息的路由关键字和绑定关键字的模式匹配,将消息路由到被绑定的队列中,这种路由器类型能够被用来支持经典的发布,订阅消息传输模型,使用主题名字空间做为消息寻址模式,将消息传递给那些部分或者所有匹配主题模式的多个消费者,主题交换机类型的工资方式以下:绑定关键字用零个或多个标记构成,每个标记之间用"."字符分隔,绑定关键字必须用这种形式明确说明,并支持通配符“”匹配一个词组,“#”零个或多个词组,所以绑定关键字“ *。stock.#" 匹配路由关键字”usd.stock“和”eur.stock.db",可是不匹配“stock.nasdaq”,以下图:

配置
配置文件存放:/etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_NODE_IP_ADDRESS:IP地址
RABBITMQ_NODE_PORT:端口号
RABBITMQ_CONFIG_FILE:默认路径
ABBITMQ_LOG_BASE:日志路径
rabbitmq.config是标准的erlang配置文件,必须符合erlang配置标准,erlangtuple,结构为{key,value},key为atom类型,value为一个term,其中关键参数为
tcp_listerners设置rabimq监听端口,默认5672
disk_free_limit磁盘低水位线,若磁盘容量低于指定值则中止接收数据
vm_memory_high_watermark,设置内存低水位线,则开启流控机制,默认值是0.4,即内存总量的40%
安装
[root@LinuxEA ~]# yum install epel-release
[root@LinuxEA ~]# yum install rabbitmq-server -y
[root@LinuxEA ~]# /usr/lib/rabbitmq/bin/rabbitmq-plugins list
[ ] amqp_client 3.3.5
[ ] cowboy 0.5.0-rmq3.3.5-git4b93c2d
[ ] eldap 3.3.5-gite309de4
[ ] mochiweb 2.7.0-rmq3.3.5-git680dba8
[ ] rabbitmq_amqp1_0 3.3.5
[ ] rabbitmq_auth_backend_ldap 3.3.5
[ ] rabbitmq_auth_mechanism_ssl 3.3.5
[ ] rabbitmq_consistent_hash_exchange 3.3.5
[ ] rabbitmq_federation 3.3.5
[ ] rabbitmq_federation_management 3.3.5
[ ] rabbitmq_management 3.3.5
[ ] rabbitmq_management_agent 3.3.5
[ ] rabbitmq_management_visualiser 3.3.5
[ ] rabbitmq_mqtt 3.3.5
[ ] rabbitmq_shovel 3.3.5
[ ] rabbitmq_shovel_management 3.3.5
[ ] rabbitmq_stomp 3.3.5
[ ] rabbitmq_test 3.3.5
[ ] rabbitmq_tracing 3.3.5
[ ] rabbitmq_web_dispatch 3.3.5
[ ] rabbitmq_web_stomp 3.3.5
[ ] rabbitmq_web_stomp_examples 3.3.5
[ ] sockjs 0.3.4-rmq3.3.5-git3132eb9
[ ] webmachine 1.10.3-rmq3.3.5-gite9359c7
[root@LinuxEA ~]#
[root@LinuxEA ~]# /usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management
Plugin configuration has changed. Restart RabbitMQ for changes to take effect.
[root@LinuxEA ~]#
[root@LinuxEA ~]# systemctl restart rabbitmq-server
建立vhost
[root@LinuxEA ~]# rabbitmqctl add_vhost linuxea
Creating vhost "linuxea" ...
...done.
[root@LinuxEA ~]#
[root@LinuxEA ~]# cat LinuxEA
#!/bin/bash
###www.#linu#xea#.com
#The source address of this article:http://www.lin#uxea.com/1514.html
查看vhosts
[root@LinuxEA ~]# rabbitmqctl list_vhosts
Listing vhosts ...
/
linuxea
...done.
[root@LinuxEA ~]#
删除vhosts:rabbitmqctl delete_vhosts vhostname
添加用户名和密码:
[root@LinuxEA ~]# rabbitmqctl add_user linuxea linuxea
Creating user "linuxea" ...
...done.
[root@LinuxEA ~]#
修改用户名密码:rabbitmqctl change_password username newpassword
受权读写权限:
[root@LinuxEA ~]# rabbitmqctl set_permissions -p linuxea linuxea ".*" ".*" ".*"
Setting permissions for user "linuxea" in vhost "linuxea" ...
...done.
[root@LinuxEA ~]#
查看:-p 便可
[root@LinuxEA ~]# rabbitmqctl list_queues -p linuxea
Listing queues ...
...done.
[root@LinuxEA ~]#