My way to Python - Day012 - 消息中间件

消息中间件介绍

消息中间件的概念

消息中间件是在消息传输过程当中保存消息的容器。消息中间件在将消息从它的源中继到它的目标时充当中间人的做用。队列的主要做用是提供路由并保证消息的传递;若是发生消息接收者不可用,消息队列会保留消息,直到能够成功传递它为止,固然,消息队列保存消息也是有期限的。

类比理解记忆

  使用生产者、消费者的关系进行类比
  消息生产者 → 消息中间件 → 消息消费者
消息是有时效性的,存在过时→消息租约的概念

消息中间件特色

1,采用异步处理模式
      消息发送者能够发送一个消息而无须等待响应。
  消息发送者将消息发送到一条虚拟的通道(主题或队列)上,消息接收者则订阅或是监听该通道。
  一条消息可能最终转发给一个或多个消息接收者,这些接收者都无须对消息发送者做出同步回应。整个过程是异步的。
 
好比用户信息注册,注册完毕后过段时间发送邮件或短信。
 
2,应用程序和应用程序调用关系为松耦合关系
     发送者和接受者没必要了解对方、只须要确认消息
     发送者和接受者没必要同时在线
 
好比在线交易系统为了保证数据的最终一致,在支付系统处理完成后会把支付结果放到消息中间件里通知订单系统修改订单支付状态。两个系统经过消息中间件解耦。
 
消息传递服务模型

消息中间件的传递模型

1,点对点模型(PTP)

     点对点模型用于消息生产者和消费者之间点到点的通讯。消息生产者将消息发送到由某个名字标识的特定消费者。这个名字实际上对应于消息服务中的一个队列(Queue),在消息传递给消费者以前他被存储在这个队列中。队列消息能够放在内存中也能够是持久的,以保证在消息服务出现故障时仍然可以传递消息。
 
特性:
  1,每一个消息只有一个消费者
  2,发送者和接收者没有时间依赖
  3,接受者确认消息接收和处理成功
 
模型简图:

2,发布-订阅模型(Pub/Sub)

     发布者/订阅者模型支持向一个特定的消息主题生产消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。 这种模式比如是匿名公告板。这种模式被归纳为:多个消费者能够得到消息.在发布者和订阅者之间存在时间依赖性。发布者须要创建一个订阅(subscription),以便可以消费者订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者创建了持久的订阅。在这种状况下(持久订阅),在订阅者未链接时发布的消息将在订阅者从新链接时从新发布。
 
特性:          
  1,每一个消息能够有多个订阅者
  2,发布者和订阅者有时间依赖,客户端只有在订阅以后才能收到消息
  3,有持久订阅和非持久订阅之分
    ① 持久订阅
      订阅关系创建后,消息就不会消失,无论订阅者是否在线
    ② 非持久订阅
      订阅者为了接受消息,必须一直在线
      当只有一个订阅者的时候约等于点对点模型
 
模型简图:

互联网消息中间件的应用场景案例

场景1:用户注册异步处理案例

网站用户注册,注册成功后会过一下子发送邮件确认或者短信。
 
以下图:

场景2:日志分析使用案例

把日志进行集中收集,用于计算pv、用户行为分析。
 
以下图:

场景3:数据复制案例

  1,将数据从源头复制到多个目的地,通常是要求顺序或者保证因果序列。
  2,用户跨机房数据传输、搜索、离线数据计算等。
 
以下图:

场景4:延迟消息发送和暂存

  1,把消息中间件当作可靠的消息暂存地
  2,定时进行消息投递,好比模拟用户秒杀访问,进行系统性能压测。
       eg: tcpcopy,accesslog等
 
以下图:

场景5:消息广播

  1,缓存数据同步更新
  2,往应用推送数据
     eg:好比更新本地缓存
 
以下图:

消息中间件分类

一、(push)推消息类型:

  消息生产者将消息发送给消息传递服务,消息传递服务又将消息推给消费者。

二、(pull)拉消息类型:

  消费者请求消息服务接收消息,消息生产者从消息中间件拉取该消息。
 

两种类型的区别

 
推跟拉两种类型的本质区别
  谁保存消息的偏移量
    push:生产者保存推送轨迹
    pull:消费者保存pull状态、拉取位置的偏移量

消息中间件实战之MetaQ

介绍

METAQ是一款彻底的队列模型消息中间件,服务器使用Java语言编写,可在多种软硬件平台上部署。
客户端支持Java、C++编程语言。
单台服务器可支持1万以上个消息队列,经过扩容服务器,队列数几乎可任意横向扩展。
每一个队列都是持久化、长度无限(取决于磁盘空间大小)、而且可从队列任意位置开始消费。
 

metaQ架构图:

MetaQ特色

  1,生产者、服务器和消费者均可以分布式
  2,消息存储顺序写
  3,性能极高、吞吐量大
  4,支持消息顺序
  5, 客户端pull,随机读,批量拉取数据
  6,数据迁移、扩容对用户透明
  7,消费状态保存在客户端

metaq重要术语

  一、Topic
    消息的主题,由用户定义并在服务端配置。producer发送消息到某个topic下,consumer从某个topic下消费消息
  二、Offset
    消息在broker上的每一个分区都是组织成一个文件列表,消费者拉取数据须要知道数据在文件中的偏移量,这个偏移量就是所谓offset。Offset是 绝对偏移量,服务器会将offset转化为具体文件的相对偏移量。
  三、Broker
    就是meta的服务端或者说服务器,在消息中间件中也一般称为broker。
  四、分区(partition)
    同一个topic下面还分为多个分区,如meta-test这个topic咱们能够分为10个分区,分别有两台服务器提供,那么可能每台服务器提供5个分区,假设服务器id分别为0和1,则全部分区为0-0、0-一、0-二、0-三、0-四、1-0、1-一、1-二、1-三、1-4。

metaq经常使用配置项

zk.zkEnable=true(是否注册到zk,默认为true)
zk.zkConnect=localhost:2181  (zk的服务器列表)
zk.zkSessionTimeoutMs=30000 (zk心跳超时,单位毫秒,默认30秒 ) 
zk.zkConnectionTimeoutMs=30000  (zk链接超时时间,单位毫秒,默认30秒)
brokerId:(服务器ID(必须是集群内惟一,必须为整型0-1024之间)  
serverPort:(服务器端口)
hostName:(默认将取本机IP (多机网卡,须要指明))
dataLogPath:(日志数据文件路径,默认跟dataPath同样)
dataPath:          (于指定默认的数据存储路径 )
deletePolicy=delete,168   (数据删除策略,默认超过7天即删除,这里的168是小时,10s表示10秒,10m表示10分钟,10h表示10小时,默认为小时)
deleteWhen: (什么时候执行删除策略的cron表达式,默认是0 0 6,18 * * ?,也就是天天的迟早6点执行处理策略。deleteWhen: 删除策略的执行时间,cron表达式)
flushTxLogAtCommit=1 (事务日志的同步设置,0表示让操做系统决定,1表示每次commit都同步,2表示每隔1秒同步一次,此参数严重影响事务性能,可根据你须要的性能和可靠性之间权衡作出一个合理的选择。一般建议设置为2,表示每隔1秒刷盘一次,也就是最多丢失一秒内的运行时事务。这样的可靠级别对大多数服务是足够的。最安全的固然是设置为1,可是将严重影响事务性能。而0的安全级别最低。安全级别上 1>=2>0,而性能则是0 >= 2 > 1。)
unflushThreshold:  (每隔多少条消息作一次磁盘sync,强制将更改的数据刷入磁盘。默认为1000。也就是说在掉电状况下,最多容许丢失1000条消息。可设置为0,强制每次写入都sync。在设置为0的状况下,服务器会自动启用 group commit技术,将多个消息合并成一次sync来提高IO性能。通过测试,group commit状况下消息发送者的TPS没有受到太大影响,可是服务端的负载会上升不少。)
unflushInterval: (间隔多少毫秒按期作一次磁盘sync,默认是10秒。也就是说在服务器掉电状况下,最多丢失10秒内发送过来的消息。不可设置为小于或者等于0)

MetaQ安装说明

     1,首先须要安装配置zookeeper
     2,安装java运行环境
     3,安装MetaQ

metaq的集群实现方法

  step1,全部的broker注册到zookeeper
  step2,客户端链接到zookeeper,为客户分配一个broker

消息中间件实战之RabbitMQ

RabbitMQ介绍

    rabbitMQ是一个在AMQP协议标准基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。采用 Erlang 实现的工业级的消息队列(MQ)服务器。
    官方网站: http://www.rabbitmq.com/
 
   AMQP介绍
     AMQP(高级消息队列协议) 是一个异步消息传递所使用的应用层协议规范,做为线路层协议,而不是API(例如JMS),AMQP 客户端可以无视消息的来源任意发送和接受信息。AMQP的原始用途只是为金融界提供一个能够彼此协做的消息协议,而如今的目标则是为通用消息队列架构提供通用构建工具。所以,面向消息的中间件 (MOM)系统,例如发布/订阅队列,没有做为基本元素实现。反而经过发送简化的AMQ实体,用户被赋予了构建例如这些实体的能力。这些实体也是规范的一 部分,造成了在线路层协议顶端的一个层级:AMQP模型。这个模型统一了消息模式,诸如以前提到的发布/订阅,队列,事务以及流数据,而且添加了额外的特性,例如更易于扩展,基于内容的路由。

rabbitMQ总体架构

rabbitMQ运行原理

rabbitMQ重要术语

  1. Server(broker): 接受客户端链接,实现AMQP消息队列和路由功能的进程。
  2. Virtual Host:实际上是一个虚拟概念,相似于权限控制组,一个Virtual Host里面能够有若干个Exchange和Queue,可是权限控制的最小粒度是Virtual Host
  3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有 direct、Fanout和Topic三种,不一样类型的Exchange路由的行为是不同的。
  4.Message Queue:消息队列,用于存储还未被消费者消费的消息。
  5.Message: 由Header和Body组成,Header是由生产者添加的各类属性的集合,包括Message是否被持久化、由哪一个Message Queue接受、优先级是多少等。而Body是真正须要传输的APP数据。
  6.BindingKey:所谓绑定就是将一个特定的Exchange 和一个特定的Queue绑定起来,绑定关键字成为BindingKey。

Exchange分类--直接式交换器类型

      Direct Exchange – 直接交互式处理路由键。
  须要将一个队列绑定到交换机上,要求该消息与一个特定的路由键彻底匹配。这是一个完整的匹配。若是一个队列绑定到该交换机上要求路由键(routing key) “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
   
  图解:
 
Python代码实现(待添加)

Exchange分类--广播式交换器类型

      Fanout Exchange – 广播式路由键。
你只须要简单的将队列绑定到交换机上。
一个发送到交换机的消息都会被转发到与该交换机绑定的全部队列上。
很像子网广播,每台子网内的主机都得到了一份复制的消息。Fanout交换机转发消息是最快的
 
图解:
 
Python代码实现(待添加)

Exchange分类--主题式交换器类型

      Topic Exchange – 主题式交换器,经过消息的路由关键字和绑定关键字的模式匹配,将消息路由到被绑定的队列中。
这种路由器类型能够被用来支持经典的发布/订阅消息传输模型——使用主题名字空间做为消息寻址模式,将消息传递给那些部分或者所有匹配主题模式的多个消费者。
主题交换器类型的工做方式以下: 绑定关键字用零个或多个标记构成,每个标记之间用“.”字符分隔。
绑定关键字必须用这种形式明确说明,并支持通配符: “*”匹配一个词组“#”零个或多个词组
所以绑定关键字“*.stock.#”匹配路由关键字“usd.stock”和“eur.stock.db”,可是不匹配“stock.nasdaq”
 
图解:
 
Python代码实现(待添加)

RabbitMQ安装

  step1,配置epel源java

  step2,yum -y install rabbitmq-server

RabbitMQ经常使用配置

通常状况下,RabbitMQ的默认配置就足够了。若是但愿特殊设置的话,有两个途径:
一个是环境变量的配置文件 rabbitmq-env.conf ;
一个是配置信息的配置文件 rabbitmq.config;
注意,这两个文件默认是没有的,若是须要必须本身建立。
一、rabbitmq-env.conf   这个文件的位置是肯定和不能改变的,位于:/etc/rabbitmq目录下(这个目录须要本身建立)。
  RABBITMQ_NODE_IP_ADDRESS:指定ip地址
  RABBITMQ_NODE_PORT:指定端口号,默认5672
  RABBITMQ_CONFIG_FILE:配置文件的路径,注意配置文件后缀必须是.config
  ABBITMQ_LOG_BASE:日志文件路径
二、rabbitmq.config 这是一个标准的erlang配置文件。它必须符合erlang配置文件的标准。Erlang tuple,结构为{Key,Value}, Key为atom类型, Value为一个term,其中几个关键参数为:
  tcp_listerners设置rabbimq的监听端口,默认为[5672]。
   disk_free_limit 磁盘低水位线,若磁盘容量低于指定值则中止接收数据。
  vm_memory_high_watermark,设置内存低水位线,若低于该水位线,则开启流控机制,默认值是0.4,即内存总量的40%。

rabbitmq.config配置例子

[
  {rabbit, [
    {tcp_listeners,[{"127.0.0.1",5672}]},
    {ssl_listeners, [{"127.0.1.1",5671}]},
    {ssl_options, [{cacertfile,"/usr/local/etc/rabbitmq/ssl/testca/cacert.pem"},
                    {certfile,"/usr/local/etc/rabbitmq/ssl/server/cert.pem"},
                    {keyfile,"/usr/local/etc/rabbitmq/ssl/server/key.pem"},
                    {verify,verify_none},
                    {fail_if_no_peer_cert,false}]}
  ]}
].

RabbitMQ命令介绍

  一、/etc/init.d/rabbitmq-server start | stop | restart | reload
  二、rabbitmqctl add_vhost  vhostname      建立Vhost:
  三、rabbitmqctl delete_vhost vhostname       删除Vhost
  四、rabbitmqctl list_vhosts    遍历全部虚拟主机信息
  五、rabbitmqctl add_user username  password   添加用户及密码
  六、rabbitmqctl change_password username newpassword    修改用户密码
  七、rabbitmqctl set_permissions -p  v_host user  ".*" ".*" ".*"//绑定权限,而且具有读写的权限。
  八、rabbitmqctl  list_queues  //显示全部队列
相关文章
相关标签/搜索