消息队列&Celery&RabbitMQ&zeromq

1、消息队列java

什么是消息队列?python

“消息队列”是在消息的传输过程当中保存消息的容器。
“消息”是在两台计算机间传送的数据单位。消息能够很是简单,例如只包含文本字符串;也能够更复杂,可能包含嵌入对象。
消息被发送到队列中。“消息队列”是在消息的传输过程当中保存消息的容器。消息队列管理器是消息从它的源传输到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;若是发送消息时接收者不可用,消息队列会保留消息,直到能够成功地传递它。redis

 

为何使用消息队列?数据库

主要缘由是因为在高并发环境下,因为来不及同步处理,请求每每会发生堵塞,好比说,大量的insert,update之类的请求同时到达MySQL,直接致使无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。经过使用消息队列,咱们能够异步处理请求,从而缓解系统的压力。后端

总结:消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺乏的中间件。目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等缓存

  

消息队列特色:服务器

  • 采用异步处理模式:消息发送者能够发送一个消息而无须等待响应。消息发送者将消息发送到一条虚拟的通道队列)上,消息接收者订阅监听该通道。一条信息可能最终转发给一个或多个消息接收者,这些接收者都无需对消息发送者作出同步回应。整个过程都是异步的网络

  • 应用系统之间解耦合:数据结构

    • 发送者和接受者没必要了解对方、只须要 确认消息
    • 发送者和接受者 没必要同时在线

  好比在线交易系统为了保证数据的最终一致,在支付系统处理完成后会把支付结果放到消息中间件里,通知订单系统修改订单支付状态。两个系统是经过消息中间件解耦的。架构

 

应用场景: 

  • 异步处理,举个栗子:现有用户注册模块,须要同时完成写入注册数据至数据库、发送激活邮件、发送短信验证码。实现包括:串行方式、并行方式
    • 串行方式:先将注册信息写入数据库成功后,再发送激活邮件,最后发送短信验证码。以上三个任务依次所有完成后,返回给客户。
    • 并行方式:先将注册信息写入数据库成功后,发送注册邮件的同时,一块儿发送注册短信。以上三个任务完成后,返回给客户端。与串行的差异是,并行的方式能够提升处理的时间。
  • 应用解耦,举个栗子:现有用户下单模块,当用户下单后,订单系统须要通知库存系统。传统的作法是,在订单系统调用库存系统的接口。假如库存系统没法访问,则订单系统减库存将失败,从而致使订单失败,订单系统与库存系统耦合度太高。
    • 订单系统:用户下单后,在订单系统中将调用库存系统接口的操做放入到消息队列,订单系统中再也不阻塞等待库存系统的返回结果。并将订单下单成功返回给用户。

    • 库存系统:在消息队列中订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操做。

    • 假如:在下单时库存系统不能正常使用。也不影响正常下单,由于下单后,订单系统写入消息队列就再也不关心其余的后续操做了。实现订单系统与库存系统的应用解耦。

  • 流量削锋,通常在秒杀或团抢活动中使用普遍。举个栗子:现有一个秒杀活动,通常会由于流量过大、暴增而致使应用挂掉。为解决这个问题,通常须要将用户请求加入消息队列,达到控制活动的人数,能够缓解短期内高流量压垮应用。消息通信
    • 服务器接收用户请求后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
    • 秒杀业务根据消息队列中的请求信息,再作后续处理。

  • 消息通信
    • 点对点通信:客户端A和客户端B使用同一队列,进行消息通信;

 

 

  • 聊天室通信:客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现相似聊天室效果。

 

 

消息队列的缺点:

系统可用性下降:系统引入的外部依赖越多,越容易挂掉,假如BCD系统如今都要调用系统A,为了使应用之间解耦合,使用了消息队列MQ,可是有一个问题是若是MQ挂掉,整个系统就都不能使用了。

系统复杂性提升:硬生生加个MQ进来,没法保证消息没有被重复消费,也没法自动解决消息丢失的状况,消息传递的顺序性也没办法保证。

一致性问题:因为后台任务是按期对消息队列中的消息进行处理,于是触发的时机是不可预测的。生产者将任务发布于消息队列中,而后再由消费者订阅任务进行处理,但生产者将任务存放后再也不关系其执行结果是否成功,而是直接返回成功,若是执消费者处理任务失败了,就形成了数据的不一致。

 

消息队列的传输模型

  • 点对点模型:用于消息生产者消息消费者之间点到点的通讯。消息生产者将消息发送到由某个名字标识的特定消费者。这个名字实际上对于消费服务中的一个队列Queue),在消息传递给消费者以前它被存储在这个队列中。队列消息能够放在内存中也能够持久化,以保证在消息服务出现故障时仍然可以传递消息。

    传统的点对点消息中间件一般由 消息队列服务消息传递服务消息队列消息应用程序接口 API 组成,其典型的结构以下图所示。



    • 特色:每一个消息只用一个消费者;发送者和接受者没有时间依赖;接受者确认消息接受和处理成功。

 

  • 发布/订阅模型:支持向一个特定的消息主题生产消息。0多个订阅者 可能对接收来自 特定消息主题 的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方,就比如是匿名公告板。这种模式被概况为:多个消费者能够得到消息,发布者订阅者 之间存在 时间依赖性。发布者须要创建一个 订阅subscription),以便可以消费者订阅。订阅者 必须保持 持续的活动状态接收消息

  在这种状况下,在订阅者 未链接时,发布的消息将在订阅者 从新链接从新发布,以下图所示:


    • 特色:每一个消息能够有多个订阅者;客户端只有订阅后才能接收到消息;持久订阅和非持久订阅。

注意:

  1. 发布者和订阅者有时间依赖:接受者和发布者只有创建订阅关系才能收到消息;
  2. 持久订阅:订阅关系创建后,消息就不会消失,无论订阅者是否都在线;
  3. 非持久订阅:订阅者为了接受消息,必须一直在线。 当只有一个订阅者时约等于点对点模式

 

 

消息丢失的解决:

生产者弄丢了数据(producer)生产者将数据发送到 RabbitMQ 的时候,可能数据在半路就丢失了,由于大并发写入队列致使消息丢失,网络问题啥的等等缘由。 

  • RabbitMQ:1. 开启RabbitMQ提供的事务功能,在生产者发送数据以前开启 RabbitMQ 事务channel.txSelect,而后再发送消息,若是消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就能够回滚事务channel.txRollback,再重试发送消息;若是收到了消息,就能够提交事务channel.txCommit但开启事务太耗费系统性能不推荐。
    # 开启事务
    channel.txSelect()
    try# 发送消息到消息队列中
    except Exception as e:
        # 事务回滚
        channel.txRollback()
    else:
        # 提交事务
        channel.txCommit()

     

  • RabbitMQ:2. 开启 confirm 模式,生产者设置开启 confirm 模式以后,当每次写入消息时都会分配一个惟一的 id,若是消息成功写入了 RabbitMQ 中,RabbitMQ 会回传一个 ack 消息,提示这个消息写入成功。若是 RabbitMQ 没能处理这个消息,会回调生产者的一个 nack 接口,提示这个消息写入失败,能够再次重试。并且能够结合这个机制本身在内存里维护每一个消息 id 的状态,若是超过必定时间还没接收到这个消息的回调,那么你能够重发。

  :事务机制和 cnofirm 机制的区别:事务机制是同步的,提交一个事务以后会一直阻塞,可是 confirm 机制是异步的,发送完这个消息以后能够继续发送下一个消息,第一个写入的那个消息 RabbitMQ 接收以后,会异步回调你的一个接口通知你这个消息接收到了。因此通常在生产者这块避免数据丢失,都是用 confirm 机制的。

 

  • Redis:RedisPUSH/POP机制,利用redis的列表数据结构。比较好的使用模式是,生产者lpush消息,消费者brpop消息,并设定超时时间,能够减小redis的压力。这种方案相对于第一种方案数据可靠性是提升了,只有在redis宕机且数据没有持久化的状况下丢失数据,能够根据业务经过AOF和缩短持久化时间间隔来保证很高的可靠性,并且也能够经过多个client来提升消费速度。但相对于专业的消息队列来讲,这种方案消息的状态过于简单(没有状态),没有ack机制,消息取出后消费失败依赖于client记录日志或者从新push到队列里面。

    注:Redis相较于rabbitMQ没有ack机制,也不能保证消息的顺序性,不适应用做于消息队列来使用。能够考虑消息中间件:Redis做者开源的Disque、阿里开源RocketMQ,以及基于Golang的nsq等,Redis更适用于存储数据。

    • 每一个消费者(单线程)都有本身的单独的队列,这个队列是用rpoplpush命令从公共队列生成。而后:若是消费成功了,用rpop命令把本身的队列销毁,进行下一次循环;若是消费失败,由于是消费者私有的队列,能够自由选择如何处理,不用担忧冲突;若是消费者崩溃了,在消费者启动时会检查本身的队列,把本身队列里的元素放回公共队列。在不考虑redis挂掉的状况下,这种机制保证了任务至少被消费一次。


消息队列(Queue)弄丢了数据:  消息队列本身弄丢了数据,这此时必须开启持久化功能,就是消息写入以后会持久化到磁盘,哪怕是消息队列本身宕机了,恢复以后会自动读取以前存储的数据,通常数据不会丢。

RabbitMQ 设置持久化有两个步骤

  • 建立 queue 时设置为持久化:这样就能够保证 RabbitMQ 持久化 queue 的元数据,可是它是不会持久化 queue 里的数据的。
channel.queue_declare(queue='shuaigaogao', durable=True) # durable=True 持久化 
  • 发送消息时设置持久化,将消息的 deliveryMode 设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。
channel.basic_publish(exchange="",
                      routing_key="shuaigaogao",   # queue的名字
                      body="hello world",          # body是要发送的内容
                      properties=pika.BasicProperties(delivery_mode=2,) # make message persistent=>使消息持久化的特性
                      )

  注意:必需要同时设置这两个持久化,这样 RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。另外一种状况,已经给 RabbitMQ 开启了持久化机制,生产者发送了消息到消息队列中,而且也将其写入了 RabbitMQ 中,可是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会致使内存里的一点点数据丢失。因此,持久化能够跟生产者那边的 confirm 机制配合起来,只有当消息被持久化到磁盘以后,才会通知生产者 ack 了,因此哪怕是在持久化到磁盘以前,RabbitMQ 挂了,数据丢了,生产者收不到 ack你也是能够本身重发的。


Redis 设置持久化有两种方式:RDBAOF
 

消费端弄丢了数据:消费者从消息队列中订阅消息时,可能刚消费到还没处理完,进程就挂了,好比宕机、死锁、网络抖动等等,可是消息队列认为你都消费了那么这数据就丢了。

  • RabbitMQ启用手动确认模式便可
    • ①自动确认模式,消费者挂掉,待ack的消息回归到队列中。消费者抛出异常,消息会不断的被重发,直处处理成功。不会丢失消息,即使服务挂掉,没有处理完成的消息会重回队列,可是异常会让消息不断重试。
    • ②手动确认模式,若是消费者来不及处理就死掉时,没有响应ack时会重复发送一条信息给其余消费者;若是监听程序处理异常了,且未对异常进行捕获,会一直重复接收消息,而后一直抛异常;若是对异常进行了捕获,可是没有在finallyack,也会一直重复发送消息(重试机制)。
    • ③不确认模式,acknowledge="none" 不使用确认机制,只要消息发送完成会当即在队列移除,不管客户端异常仍是断开,只要发送完就移除,不会重发。
    • 消费者业务逻辑异常,可是未手动执行noack致使:经过noack方式来从新放入队列

     注 :消息队列经过判断consumer链接状况来判断消息是否被从新放入队列

 

2、celery

a、概念 celery是基于python实现的一个分布式任务队列框架,主要用于管理分布式任务队列、处理耗时的任务,支持使用任务队列的方式在分布的 机器/进程/线程上 执行任务调度。可让任务的执行彻底脱离主程序,甚至能够被分配到其余主机上运行,一般使用它实现异步任务&定时任务

 

b、组件 

  • 任务(tasks)-- 用户定义的函数,用于实现应用功能,好比执行一个发送短信的耗时任务。
  • 消息中间件(Broker)-- 用于存放tasks的地方,代指任务队列自己,这个中间人须要解决的一个问题,就是可能须要存放很是很是多的tasks,并且要保证Worker可以从这里拿取,常见的有broker有Redis
  • 任务执行单元(Worker)-- 用于执行tasks,从broker中取出tasks,调用执行任务函数。

 

c、具体功能 :

 任务模块 Task:包含异步任务和定时任务。其中,异步任务一般在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。

 消息中间件 Broker:Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 自己不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

 任务执行单元 Worker:Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。

 任务结果存储 Backend:Backend 用于存储任务的执行结果,以供查询。同消息中间件同样,存储也可以使用 RabbitMQ, Redis 和 MongoDB 等。

 

d、底层原理

celery架构由三个模块组成:消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

消息中间件(Broker): 消息中间人,是任务调度队列,是一个独立的服务,是一个生产者消费之模式,生产者把任务放入队列中,消费者(worker)从任务队列中取出任务执行,任务的执行能够按照顺序依次执行也能够按照计划时间进行。可是Broker自己不提供队列服务,因此要集成第三方队列,推荐使用RatbbitMQ或Redis.

任务执行单元(worker):即执行任务的程序,能够有多个并发。它实时监控消息队列,获取队列中调度的任务,并执行它。

任务执行结果存储(task result store):因为任务的执行同主程序分开,若是主程序想获取任务执行的结果,就必须经过中间件存储。同消息中间人同样,存储也可使用RabbitMQ、Redis;另外,假如不须要保存执行的结果也能够不配置这个模块。

 

e、实现步骤 :

  • 建立一个 Celery 实例 
  • 启动 Celery Worker 
  • 应用程序中调用异步任务

 

 

3、RabbitMQ 

a、概念

RabbitMQ 是一个由 Erlang 语言开发的,而且基于 AMQPAdvanced Message Queuing Protocol)高级消息队列协议的消息队列服务。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。默认端口5672。

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

 

b、特色: 

  • 可靠性(Reliability):RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

  • 灵活的路由(Flexible Routing):在消息进入队列以前,经过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,能够将多个 Exchange 绑定在一块儿,也经过插件机制实现本身的 Exchange

  • 消息集群(Clustering):多个 RabbitMQ 服务器能够组成一个集群,造成一个逻辑 Broker

  • 高可用(Highly Available Queues):队列能够在集群中的机器上进行镜像,使得在部分节点出问题的状况下队列仍然可用。

  • 多种协议(Multi-protocol):RabbitMQ 支持多种消息队列协议,好比 STOMP、MQTT 等等。

  • 多语言客户端(Many Clients):RabbitMQ 几乎支持全部经常使用语言,好比 Java、.NET、Ruby 等等。

  • 管理界面(Management UI):RabbitMQ 提供了一个易用的用户界面,使得用户能够监控和管理消息 Broker 的许多方面。

  • 跟踪机制(Tracing):若是消息异常,RabbitMQ 提供了消息跟踪机制,使用者能够找出发生了什么。

  • 插件机制(Plugin System):RabbitMQ 提供了许多插件,来从多方面进行扩展,也能够编写本身的插件。

 
c、组件:

RabbitMQ是一个消息代理:它接受和转发消息。能够将其视为邮局:服务器将(生产者)将要发布的邮件放在邮箱中,RabbitMQ最终会将邮件发送给对应API(消费者)。 在这个类比中,RabbitMQ是一个邮箱,邮局和邮递员。RabbitMQ和邮局之间的主要区别在于它不处理纸张,而是接受,存储和转发二进制blob数据 - 消息。在RabbitMQ中邮箱就是一个队列,消息存储在队列当中,队列只受主机的内存和磁盘限制的约束,它本质上是一个大的消息缓冲区。许多生产者能够发送到同一个消息队列,反之也容许不少消费者从同一个队列中接收数据。

生产者是发送消息的程序,消费者是等待接收消息的程序。生产者,消费者和代理(消息队列)没必要驻留在同一主机上;

 

  • Broker:也称Broker/RabbitMQ Server,一种传输服务,维护一条从ProducerConsumer的路线,保证数据可以按照指定的方式进行传输。

  • Producer:消息生产者,数据的发送方。一个Message有两个部分:payload(有效载荷)和label(标签)。payload:传输的数据。labelexchange的名字或者说是一个tag,它描述了payload,并且RabbitMQ也是经过这个label来决定把这个Message发给哪一个ConsumerAMQP仅仅描述了label,而RabbitMQ决定了如何使用这个label的规则。

  • Consumer:消息消费者,数据的接收方。把queue比做是一个有名字的邮箱。当有Message到达某个邮箱后,RabbitMQ把它发送给它的某个订阅者即Consumer, 也可能会把同一个Message发送给不少的Consumer。在这个Message中,只有payload,label已经被删掉了。对于Consumer来讲,它是不知道谁发送的这个信息的,就是协议自己不支持。若是Producer发送的payload包含了Producer的信息就另当别论。
  • Exchange:消息交换器,用于接受、分配消息;它指定消息按什么规则、路由到哪一个队列,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

  • Queue:队列,用于存储生产者的消息;保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取走。

  • Binding:绑定,决定交换器的消息应该发送到那个队列。它的做用就是把交换器(exchange)和消息队列(queue)按照路由规则绑定起来,是基于路由键将交换器和消息队列链接起来的路由规则成为一个绑定,因此能够将交换器理解成一个由绑定构成的路由表。

  • Routing Key:路由键,用于把生成者的数据分配到交换器上;exchange根据这个关键字进行消息投递

  • BindingKey(绑定键):用于把交换器的消息绑定到队列上;
  • VHost:虚拟主机,一个broker里能够开设多个vhost,用做不一样用户的权限分离。表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每一个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有本身的队列、交换器、绑定和权限机制。vhostAMQP 概念的基础,必须在链接时指定,RabbitMQ 默认的 vhost 是 / 。

  • Connection: 网络链接,好比一个TCP链接。ProducerConsumer都是经过TCP链接到RabbitMQ Server的,程序的起始处就是创建这个TCP链接。
  • Channel(信道):消息通道,消息推送使用的通道;在客户端的每一个链接里,可创建多个channel,每一个channel表明一个会话任务;多路复用链接中的一条独立的双向数据流通道。信道是创建在真实的TCP链接内地虚拟链接,AMQP 命令都是经过信道发出去的,无论是发布消息、订阅队列仍是接收消息,这些动做都是经过信道完成。由于对于操做系统来讲创建和销毁 TCP 都是很是昂贵的开销,因此引入了信道的概念,以复用一条 TCP 链接。

  • ConnectionFactory(链接管理器)应用程序与Rabbit之间创建链接的管理器,程序代码中使用;
  • Message:  消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其余消息的优先权)、delivery-mode(指出该消息可能须要持久性存储)等。

Exchange、Queue、RoutingKey三个才能决定一个从ExchangeQueue的惟一的线路。

 

为何使用Channel,而不是直接使用TCP链接

对于OS来讲,创建和关闭TCP链接是有代价的,频繁的创建关闭TCP链接对于系统的性能有很大的影响,并且TCP的链接数也有限制,这也限制了系统处理高并发的能力。可是,在TCP链接中创建Channel是没有上述代价的。对于Producer或者Consumer来讲,能够并发的使用多个Channel进行Publish或者Receive。有实验代表,1s的数据能够Publish10K的数据包。固然对于不一样的硬件环境,不一样的数据包大小这个数据确定不同,可是我只想说明,对于普通的Consumer或者Producer来讲,这已经足够了。若是不够用,你考虑的应该是如何细化SPLIT你的设计。

 

Exchange的分发策略:

Exchange分发消息时根据类型的不一样分发策略有区别,目前共四种类型:direct、fanout、topic、headers。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器彻底一致,但性能差不少,目前几乎用不到。

  • 消息中的路由键(routing key)若是和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名彻底匹配,若是一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是彻底匹配、单播的模式。
  • fanout每一个发到 fanout 类型交换器的消息都会分到全部绑定的队列上去fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每一个发送到交换器的消息都会被转发到与该交换器绑定的全部队列上。很像子网广播,每台子网内的主机都得到了一份复制的消息。fanout 类型转发消息是最快的。
  • topic:topic 交换器经过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列须要绑定到一个模式上。它将路由键和绑定键的字符串切分红单词,这些单词之间用点隔开。它一样也会识别两个通配符:符号“#”和符号“”。匹配0个或多个单词匹配很少很多一个单词。

 

4、zeromq

a、概念

zeromq 是一个为可伸缩的分布式或并发应用程序设计的高性能异步消息库,提供一个消息队列, 可是与面向消息的中间件不一样,ZeroMQ的运行不须要专门的消息代理(message broker)该库设计成常见的套接字风格的API。zeromq 并非相似rabbitmq消息列队,它实际上只一个消息列队组件,一个库。

请求响应模式(Request-Reply用于未来自ZMQ_REQ客户端的请求发送到一个或多个ZMQ_REP服务,并接收对发送的每一个请求的后续回复,是一种远程过程调用和任务分发模式。客户端在请求后,服务端必须回响应

发布/订阅模式Publish-Subscribe): 从单个发布者到多个订阅者的一对多数据分发。广播全部client,没有队列缓存,断开链接数据将永远丢失。client能够进行数据过滤。

管道模式Parallel Pipeline)以 push/pull 模式链接节点,能够有多个步骤、循环。这是一种并行的任务分发和收集模式。由三部分组成,push进行数据推送,work进行数据缓存,pull进行数据竞争获取处理。区别于Publish-Subscribe存在一个数据缓存和处理负载。当链接被断开,数据不会丢失,重连后数据继续发送到对端。

 

b、代码实现

  • 请求响应模式
    # server端:
    import zmq
    
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind("tcp://*:5555")
    
    while True:
        message = socket.recv()
        print("Received: %s" % message)
        socket.send("I am OK!")
    
    
    # client端:
    import zmq
    import sys
    
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5555")
    
    socket.send('Are you OK?')
    response = socket.recv();
    print("response: %s" % response)
    
    
    # 输出:
    $ python app/server.py 
    Received: Are you OK?
    
    $ python app/client1.py 
    response: I am OK!

     

 

  • 发布订阅模式
    # server端:
    import zmq
    import time
    
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:5555")
    
    while True:
        print('发送消息')
        socket.send("消息群发")
        time.sleep(1)    
    
    
    # client端1:
    import zmq
    import sys
    
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5555")
    socket.setsockopt(zmq.SUBSCRIBE,'')  # 消息过滤
    while True:
        response = socket.recv();
        print("response: %s" % response)
    
    
    #client端2:
    import zmq
    import sys
    
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:5555")
    socket.setsockopt(zmq.SUBSCRIBE,'') 
    while True:
        response = socket.recv();
        print("response: %s" % response)
    
    
    # 输出:
    $ python app/server.py 
    发送消息
    发送消息
    发送消息
    
    $ python app/client2.py 
    response: 消息群发
    response: 消息群发
    response: 消息群发
    
    $ python app/client1.py 
    response: 消息群发
    response: 消息群发
    response: 消息群发

     

  • 管道模式
    # server端:
    import zmq
    import time
    
    context = zmq.Context()
    socket = context.socket(zmq.PUSH)
    socket.bind("tcp://*:5557")
    
    while True:
        socket.send("测试消息")
        print "已发送"    
        time.sleep(1)    
    
    
    # work端:
    import zmq
    
    context = zmq.Context()
    
    recive = context.socket(zmq.PULL)
    recive.connect('tcp://127.0.0.1:5557')
    
    sender = context.socket(zmq.PUSH)
    sender.connect('tcp://127.0.0.1:5558')
    
    while True:
        data = recive.recv()
        print "正在转发..."
        sender.send(data)
    
    
    # client端:
    import zmq
    import sys
    
    context = zmq.Context()
    socket = context.socket(zmq.PULL)
    socket.bind("tcp://*:5558")
    
    while True:
        response = socket.recv();
        print("response: %s" % response)
    
    
    # 输出结果:
    $ python app/server.py 
    已发送
    已发送
    已发送
    
    $ python app/work.py 
    正在转发...
    正在转发...
    正在转发...
    
    $ python app/client1.py
    response: 测试消息
    response: 测试消息
    response: 测试消息

     

 

 

特性

ActiveMQ

RabbitMQ

RocketMQ

Kafka

单机吞吐量

万级,吞吐量比RocketMQ和Kafka要低了一个数量级

万级,吞吐量比RocketMQ和Kafka要低了一个数量级

10万级,RocketMQ也是能够支撑高吞吐的一种MQ

10万级别,这是kafka最大的优势,就是吞吐量高。

 

通常配合大数据类的系统来进行实时数据计算、日志采集等场景

topic数量对吞吐量的影响

 

 

topic能够达到几百,几千个的级别,吞吐量会有较小幅度的降低

 

这是RocketMQ的一大优点,在同等机器下,能够支撑大量的topic

topic从几十个到几百个的时候,吞吐量会大幅度降低

 

因此在同等机器下,kafka尽可能保证topic数量不要过多。若是要支撑大规模topic,须要增长更多的机器资源

时效性

ms级

微秒级,这是rabbitmq的一大特色,延迟是最低的

ms级

延迟在ms级之内

可用性

高,基于主从架构实现高可用性

高,基于主从架构实现高可用性

很是高,分布式架构

很是高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会致使不可用

消息可靠性

有较低的几率丢失数据

 

通过参数优化配置,能够作到0丢失

通过参数优化配置,消息能够作到0丢失

功能支持

MQ领域的功能极其完备

基于erlang开发,因此并发能力很强,性能极其好,延时很低

MQ功能较为完善,仍是分布式的,扩展性好

功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准

优劣势总结

很是成熟,功能强大,在业内大量的公司以及项目中都有应用

 

偶尔会有较低几率丢失消息

 

并且如今社区以及国内应用都愈来愈少,官方社区如今对ActiveMQ 5.x维护愈来愈少,几个月才发布一个版本

 

并且确实主要是基于解耦和异步来用的,较少在大规模吞吐的场景中使用

 

erlang语言开发,性能极其好,延时很低;

 

吞吐量到万级,MQ功能比较完备

 

并且开源提供的管理界面很是棒,用起来很好用

 

社区相对比较活跃,几乎每月都发布几个版本分

 

在国内一些互联网公司近几年用rabbitmq也比较多一些

 

可是问题也是显而易见的,RabbitMQ确实吞吐量会低一些,这是由于他作的实现机制比较重。

 

并且erlang开发,国内有几个公司有实力作erlang源码级别的研究和定制?若是说你没这个实力的话,确实偶尔会有一些问题,你很难去看懂源码,你公司对这个东西的掌控很弱,基本职能依赖于开源社区的快速维护和修复bug。

 

并且rabbitmq集群动态扩展会很麻烦,不过这个我以为还好。其实主要是erlang语言自己带来的问题。很难读源码,很难定制和掌控。

接口简单易用,并且毕竟在阿里大规模应用过,有阿里品牌保障

 

日处理消息上百亿之多,能够作到大规模吞吐,性能也很是好,分布式扩展也很方便,社区维护还能够,可靠性和可用性都是ok的,还能够支撑大规模的topic数量,支持复杂MQ业务场景

 

并且一个很大的优点在于,阿里出品都是java系的,咱们能够本身阅读源码,定制本身公司的MQ,能够掌控

 

社区活跃度相对较为通常,不过也还能够,文档相对来讲简单一些,而后接口这块不是按照标准JMS规范走的有些系统要迁移须要修改大量代码

 

还有就是阿里出台的技术,你得作好这个技术万一被抛弃,社区黄掉的风险,那若是大家公司有技术实力我以为用RocketMQ挺好的

kafka的特色其实很明显,就是仅仅提供较少的核心功能,可是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,并且分布式能够任意扩展

 

同时kafka最好是支撑较少的topic数量便可,保证其超高吞吐量

 

并且kafka惟一的一点劣势是有可能消息重复消费,那么对数据准确性会形成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响能够忽略

 

这个特性自然适合大数据实时计算以及日志收集

 

RocketMQ,适用于可靠性要求高的场景,尤为是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能没法及时处理的状况。

RabbitMQ,erlang语言自己的并发优点,性能较好,社区活跃度也比较高,可是不利于作二次开发和维护,适用于数据量没有那么大的场景。

Kafka,是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务,有日志采集功能。

 

 

----------------------------------------------------

举个栗子:“P”是咱们的生产者,“C”是咱们的消费者。中间的框是一个队列 -  RabbitMQ表明消费者保留的消息缓冲区。

 

 

生产者将消息发送到“hello”队列。使用者从该队列接收消息。

-------------------------------------------------------------------------------

相关文章
相关标签/搜索