1、消息队列java
什么是消息队列?python
“消息队列”是在消息的传输过程当中保存消息的容器。
“消息”是在两台计算机间传送的数据单位。消息能够很是简单,例如只包含文本字符串;也能够更复杂,可能包含嵌入对象。
消息被发送到队列中。“消息队列”是在消息的传输过程当中保存消息的容器。消息队列管理器是消息从它的源传输到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;若是发送消息时接收者不可用,消息队列会保留消息,直到能够成功地传递它。redis
为何使用消息队列?数据库
主要缘由是因为在高并发环境下,因为来不及同步处理,请求每每会发生堵塞,好比说,大量的insert,update之类的请求同时到达MySQL,直接致使无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。经过使用消息队列,咱们能够异步处理请求,从而缓解系统的压力。后端
总结:消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺乏的中间件。目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等缓存
消息队列特色:服务器
采用异步处理模式:消息发送者能够发送一个消息而无须等待响应。消息发送者将消息发送到一条虚拟的通道(队列)上,消息接收者则订阅或监听该通道。一条信息可能最终转发给一个或多个消息接收者,这些接收者都无需对消息发送者作出同步回应。整个过程都是异步的。网络
应用系统之间解耦合:数据结构
好比在线交易系统为了保证数据的最终一致,在支付系统处理完成后会把支付结果放到消息中间件里,通知订单系统修改订单支付状态。两个系统是经过消息中间件解耦的。架构
应用场景:
订单系统:用户下单后,在订单系统中将调用库存系统接口的操做放入到消息队列,订单系统中再也不阻塞等待库存系统的返回结果。并将订单下单成功返回给用户。
库存系统:在消息队列中订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操做。
假如:在下单时库存系统不能正常使用。也不影响正常下单,由于下单后,订单系统写入消息队列就再也不关心其余的后续操做了。实现订单系统与库存系统的应用解耦。
秒杀业务根据消息队列中的请求信息,再作后续处理。
消息队列的缺点:
系统可用性下降:系统引入的外部依赖越多,越容易挂掉,假如BCD系统如今都要调用系统A,为了使应用之间解耦合,使用了消息队列MQ,可是有一个问题是若是MQ挂掉,整个系统就都不能使用了。
系统复杂性提升:硬生生加个MQ进来,没法保证消息没有被重复消费,也没法自动解决消息丢失的状况,消息传递的顺序性也没办法保证。
一致性问题:因为后台任务是按期对消息队列中的消息进行处理,于是触发的时机是不可预测的。生产者将任务发布于消息队列中,而后再由消费者订阅任务进行处理,但生产者将任务存放后再也不关系其执行结果是否成功,而是直接返回成功,若是执消费者处理任务失败了,就形成了数据的不一致。
消息队列的传输模型:
Queue
),在消息传递给消费者以前它被存储在这个队列中。队列消息能够放在内存中也能够持久化,以保证在消息服务出现故障时仍然可以传递消息。
传统的点对点消息中间件一般由 消息队列服务、消息传递服务、消息队列 和 消息应用程序接口 API
组成,其典型的结构以下图所示。
0
或 多个订阅者 可能对接收来自 特定消息主题 的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方,就比如是匿名公告板。这种模式被概况为:多个消费者能够得到消息,在 发布者 和 订阅者 之间存在 时间依赖性。发布者须要创建一个 订阅(subscription
),以便可以消费者订阅。订阅者 必须保持 持续的活动状态 并 接收消息。在这种状况下,在订阅者 未链接时,发布的消息将在订阅者 从新链接 时 从新发布,以下图所示:
注意:
- 发布者和订阅者有时间依赖:接受者和发布者只有创建订阅关系才能收到消息;
- 持久订阅:订阅关系创建后,消息就不会消失,无论订阅者是否都在线;
- 非持久订阅:订阅者为了接受消息,必须一直在线。 当只有一个订阅者时约等于点对点模式
消息丢失的解决:
生产者弄丢了数据(producer):生产者将数据发送到 RabbitMQ 的时候,可能数据在半路就丢失了,由于大并发写入队列致使消息丢失,网络问题啥的等等缘由。
channel.txSelect
,而后再发送消息,若是消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就能够回滚事务channel.txRollback
,再重试发送消息;若是收到了消息,就能够提交事务channel.txCommit
。但开启事务太耗费系统性能不推荐。
# 开启事务 channel.txSelect() try: # 发送消息到消息队列中 except Exception as e: # 事务回滚 channel.txRollback() else: # 提交事务 channel.txCommit()
confirm
模式,生产者设置开启 confirm
模式以后,当每次写入消息时都会分配一个惟一的 id,若是消息成功写入了 RabbitMQ 中,RabbitMQ 会回传一个 ack
消息,提示这个消息写入成功。若是 RabbitMQ 没能处理这个消息,会回调生产者的一个 nack
接口,提示这个消息写入失败,能够再次重试。并且能够结合这个机制本身在内存里维护每一个消息 id 的状态,若是超过必定时间还没接收到这个消息的回调,那么你能够重发。 注:事务机制和 cnofirm
机制的区别:事务机制是同步的,提交一个事务以后会一直阻塞,可是 confirm
机制是异步的,发送完这个消息以后能够继续发送下一个消息,第一个写入的那个消息 RabbitMQ 接收以后,会异步回调你的一个接口通知你这个消息接收到了。因此通常在生产者这块避免数据丢失,都是用 confirm
机制的。
注:Redis相较于rabbitMQ没有ack机制,也不能保证消息的顺序性,不适应用做于消息队列来使用。能够考虑消息中间件:Redis做者开源的Disque、阿里开源RocketMQ,以及基于Golang的nsq等,Redis更适用于存储数据。
消息队列(Queue)弄丢了数据: 消息队列本身弄丢了数据,这此时必须开启持久化功能,就是消息写入以后会持久化到磁盘,哪怕是消息队列本身宕机了,恢复以后会自动读取以前存储的数据,通常数据不会丢。
RabbitMQ 设置持久化有两个步骤:
channel.queue_declare(queue='shuaigaogao', durable=True) # durable=True 持久化
deliveryMode
设置为 2,就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。channel.basic_publish(exchange="", routing_key="shuaigaogao", # queue的名字 body="hello world", # body是要发送的内容 properties=pika.BasicProperties(delivery_mode=2,) # make message persistent=>使消息持久化的特性 )
注意:必需要同时设置这两个持久化,这样 RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。另外一种状况,已经给 RabbitMQ 开启了持久化机制,生产者发送了消息到消息队列中,而且也将其写入了 RabbitMQ 中,可是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会致使内存里的一点点数据丢失。因此,持久化能够跟生产者那边的 confirm
机制配合起来,只有当消息被持久化到磁盘以后,才会通知生产者 ack
了,因此哪怕是在持久化到磁盘以前,RabbitMQ 挂了,数据丢了,生产者收不到 ack
,你也是能够本身重发的。
消费端弄丢了数据:消费者从消息队列中订阅消息时,可能刚消费到还没处理完,进程就挂了,好比宕机、死锁、网络抖动等等,可是消息队列认为你都消费了那么这数据就丢了。
注 :消息队列经过判断consumer链接状况来判断消息是否被从新放入队列
2、celery
a、概念 : celery是基于python实现的一个分布式任务队列框架,主要用于管理分布式任务队列、处理耗时的任务,支持使用任务队列的方式在分布的 机器/进程/线程上 执行任务调度。可让任务的执行彻底脱离主程序,甚至能够被分配到其余主机上运行,一般使用它实现异步任务&定时任务
b、组件:
c、具体功能 :
任务模块 Task:包含异步任务和定时任务。其中,异步任务一般在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列。
消息中间件 Broker:Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 自己不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。
任务执行单元 Worker:Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它。
任务结果存储 Backend:Backend 用于存储任务的执行结果,以供查询。同消息中间件同样,存储也可以使用 RabbitMQ, Redis 和 MongoDB 等。
d、底层原理:
celery架构由三个模块组成:消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。
消息中间件(Broker): 消息中间人,是任务调度队列,是一个独立的服务,是一个生产者消费之模式,生产者把任务放入队列中,消费者(worker)从任务队列中取出任务执行,任务的执行能够按照顺序依次执行也能够按照计划时间进行。可是Broker自己不提供队列服务,因此要集成第三方队列,推荐使用RatbbitMQ或Redis.
任务执行单元(worker):即执行任务的程序,能够有多个并发。它实时监控消息队列,获取队列中调度的任务,并执行它。
任务执行结果存储(task result store):因为任务的执行同主程序分开,若是主程序想获取任务执行的结果,就必须经过中间件存储。同消息中间人同样,存储也可使用RabbitMQ、Redis;另外,假如不须要保存执行的结果也能够不配置这个模块。
e、实现步骤 :
3、RabbitMQ
a、概念:
RabbitMQ 是一个由 Erlang 语言开发的,而且基于 AMQP (Advanced Message Queuing Protocol)高级消息队列协议的消息队列服务。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。默认端口5672。
AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。
b、特色:
可靠性(Reliability):RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
灵活的路由(Flexible Routing):在消息进入队列以前,经过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,能够将多个 Exchange 绑定在一块儿,也经过插件机制实现本身的 Exchange 。
消息集群(Clustering):多个 RabbitMQ 服务器能够组成一个集群,造成一个逻辑 Broker 。
高可用(Highly Available Queues):队列能够在集群中的机器上进行镜像,使得在部分节点出问题的状况下队列仍然可用。
多种协议(Multi-protocol):RabbitMQ 支持多种消息队列协议,好比 STOMP、MQTT 等等。
多语言客户端(Many Clients):RabbitMQ 几乎支持全部经常使用语言,好比 Java、.NET、Ruby 等等。
管理界面(Management UI):RabbitMQ 提供了一个易用的用户界面,使得用户能够监控和管理消息 Broker 的许多方面。
跟踪机制(Tracing):若是消息异常,RabbitMQ 提供了消息跟踪机制,使用者能够找出发生了什么。
插件机制(Plugin System):RabbitMQ 提供了许多插件,来从多方面进行扩展,也能够编写本身的插件。
RabbitMQ是一个消息代理:它接受和转发消息。能够将其视为邮局:服务器将(生产者)将要发布的邮件放在邮箱中,RabbitMQ最终会将邮件发送给对应API(消费者)。 在这个类比中,RabbitMQ是一个邮箱,邮局和邮递员。RabbitMQ和邮局之间的主要区别在于它不处理纸张,而是接受,存储和转发二进制blob数据 - 消息。在RabbitMQ中邮箱就是一个队列,消息存储在队列当中,队列只受主机的内存和磁盘限制的约束,它本质上是一个大的消息缓冲区。许多生产者能够发送到同一个消息队列,反之也容许不少消费者从同一个队列中接收数据。
生产者是发送消息的程序,消费者是等待接收消息的程序。生产者,消费者和代理(消息队列)没必要驻留在同一主机上;
Broker:也称Broker/RabbitMQ Server,一种传输服务,维护一条从Producer到Consumer的路线,保证数据可以按照指定的方式进行传输。
Producer:消息生产者,数据的发送方。一个Message有两个部分:payload(有效载荷)和label(标签)。payload:传输的数据。label是exchange的名字或者说是一个tag,它描述了payload,并且RabbitMQ也是经过这个label来决定把这个Message发给哪一个Consumer。AMQP仅仅描述了label,而RabbitMQ决定了如何使用这个label的规则。
Exchange:消息交换器,用于接受、分配消息;它指定消息按什么规则、路由到哪一个队列,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
Queue:队列,用于存储生产者的消息;保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者链接到这个队列将其取走。
Binding:绑定,决定交换器的消息应该发送到那个队列。它的做用就是把交换器(exchange)和消息队列(queue)按照路由规则绑定起来,是基于路由键将交换器和消息队列链接起来的路由规则成为一个绑定,因此能够将交换器理解成一个由绑定构成的路由表。
Routing Key:路由键,用于把生成者的数据分配到交换器上;exchange根据这个关键字进行消息投递
VHost:虚拟主机,一个broker里能够开设多个vhost,用做不一样用户的权限分离。表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每一个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有本身的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在链接时指定,RabbitMQ 默认的 vhost 是 / 。
Channel(信道):消息通道,消息推送使用的通道;在客户端的每一个链接里,可创建多个channel,每一个channel表明一个会话任务;多路复用链接中的一条独立的双向数据流通道。信道是创建在真实的TCP链接内地虚拟链接,AMQP 命令都是经过信道发出去的,无论是发布消息、订阅队列仍是接收消息,这些动做都是经过信道完成。由于对于操做系统来讲创建和销毁 TCP 都是很是昂贵的开销,因此引入了信道的概念,以复用一条 TCP 链接。
由Exchange、Queue、RoutingKey三个才能决定一个从Exchange到Queue的惟一的线路。
为何使用Channel,而不是直接使用TCP链接?
对于OS来讲,创建和关闭TCP链接是有代价的,频繁的创建关闭TCP链接对于系统的性能有很大的影响,并且TCP的链接数也有限制,这也限制了系统处理高并发的能力。可是,在TCP链接中创建Channel是没有上述代价的。对于Producer或者Consumer来讲,能够并发的使用多个Channel进行Publish或者Receive。有实验代表,1s的数据能够Publish10K的数据包。固然对于不一样的硬件环境,不一样的数据包大小这个数据确定不同,可是我只想说明,对于普通的Consumer或者Producer来讲,这已经足够了。若是不够用,你考虑的应该是如何细化SPLIT你的设计。
Exchange的分发策略:
Exchange分发消息时根据类型的不一样分发策略有区别,目前共四种类型:direct、fanout、topic、headers。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器彻底一致,但性能差不少,目前几乎用不到。
a、概念
zeromq
是一个为可伸缩的分布式或并发应用程序设计的高性能异步消息库,提供一个消息队列, 可是与面向消息的中间件不一样,ZeroMQ的运行不须要专门的消息代理(message broker)该库设计成常见的套接字风格的API。zeromq
并非相似rabbitmq
消息列队,它实际上只一个消息列队组件,一个库。请求响应模式(Request-Reply): 用于未来自ZMQ_REQ客户端的请求发送到一个或多个ZMQ_REP服务,并接收对发送的每一个请求的后续回复,是一种远程过程调用和任务分发模式。客户端在请求后,服务端必须回响应
发布/订阅模式(Publish-Subscribe): 从单个发布者到多个订阅者的一对多数据分发。广播全部client,没有队列缓存,断开链接数据将永远丢失。client能够进行数据过滤。
管道模式(Parallel Pipeline)以 push/pull 模式链接节点,能够有多个步骤、循环。这是一种并行的任务分发和收集模式。由三部分组成,push进行数据推送,work进行数据缓存,pull进行数据竞争获取处理。区别于Publish-Subscribe存在一个数据缓存和处理负载。当链接被断开,数据不会丢失,重连后数据继续发送到对端。
b、代码实现
# server端: import zmq context = zmq.Context() socket = context.socket(zmq.REP) socket.bind("tcp://*:5555") while True: message = socket.recv() print("Received: %s" % message) socket.send("I am OK!") # client端: import zmq import sys context = zmq.Context() socket = context.socket(zmq.REQ) socket.connect("tcp://localhost:5555") socket.send('Are you OK?') response = socket.recv(); print("response: %s" % response) # 输出: $ python app/server.py Received: Are you OK? $ python app/client1.py response: I am OK!
# server端: import zmq import time context = zmq.Context() socket = context.socket(zmq.PUB) socket.bind("tcp://*:5555") while True: print('发送消息') socket.send("消息群发") time.sleep(1) # client端1: import zmq import sys context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect("tcp://localhost:5555") socket.setsockopt(zmq.SUBSCRIBE,'') # 消息过滤 while True: response = socket.recv(); print("response: %s" % response) #client端2: import zmq import sys context = zmq.Context() socket = context.socket(zmq.SUB) socket.connect("tcp://localhost:5555") socket.setsockopt(zmq.SUBSCRIBE,'') while True: response = socket.recv(); print("response: %s" % response) # 输出: $ python app/server.py 发送消息 发送消息 发送消息 $ python app/client2.py response: 消息群发 response: 消息群发 response: 消息群发 $ python app/client1.py response: 消息群发 response: 消息群发 response: 消息群发
# server端: import zmq import time context = zmq.Context() socket = context.socket(zmq.PUSH) socket.bind("tcp://*:5557") while True: socket.send("测试消息") print "已发送" time.sleep(1) # work端: import zmq context = zmq.Context() recive = context.socket(zmq.PULL) recive.connect('tcp://127.0.0.1:5557') sender = context.socket(zmq.PUSH) sender.connect('tcp://127.0.0.1:5558') while True: data = recive.recv() print "正在转发..." sender.send(data) # client端: import zmq import sys context = zmq.Context() socket = context.socket(zmq.PULL) socket.bind("tcp://*:5558") while True: response = socket.recv(); print("response: %s" % response) # 输出结果: $ python app/server.py 已发送 已发送 已发送 $ python app/work.py 正在转发... 正在转发... 正在转发... $ python app/client1.py response: 测试消息 response: 测试消息 response: 测试消息
特性 |
ActiveMQ |
RabbitMQ |
RocketMQ |
Kafka |
单机吞吐量 |
万级,吞吐量比RocketMQ和Kafka要低了一个数量级 |
万级,吞吐量比RocketMQ和Kafka要低了一个数量级 |
10万级,RocketMQ也是能够支撑高吞吐的一种MQ |
10万级别,这是kafka最大的优势,就是吞吐量高。
通常配合大数据类的系统来进行实时数据计算、日志采集等场景 |
topic数量对吞吐量的影响 |
|
|
topic能够达到几百,几千个的级别,吞吐量会有较小幅度的降低
这是RocketMQ的一大优点,在同等机器下,能够支撑大量的topic |
topic从几十个到几百个的时候,吞吐量会大幅度降低
因此在同等机器下,kafka尽可能保证topic数量不要过多。若是要支撑大规模topic,须要增长更多的机器资源 |
时效性 |
ms级 |
微秒级,这是rabbitmq的一大特色,延迟是最低的 |
ms级 |
延迟在ms级之内 |
可用性 |
高,基于主从架构实现高可用性 |
高,基于主从架构实现高可用性 |
很是高,分布式架构 |
很是高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会致使不可用 |
消息可靠性 |
有较低的几率丢失数据 |
|
通过参数优化配置,能够作到0丢失 |
通过参数优化配置,消息能够作到0丢失 |
功能支持 |
MQ领域的功能极其完备 |
基于erlang开发,因此并发能力很强,性能极其好,延时很低 |
MQ功能较为完善,仍是分布式的,扩展性好 |
功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准 |
优劣势总结 |
很是成熟,功能强大,在业内大量的公司以及项目中都有应用
偶尔会有较低几率丢失消息
并且如今社区以及国内应用都愈来愈少,官方社区如今对ActiveMQ 5.x维护愈来愈少,几个月才发布一个版本
并且确实主要是基于解耦和异步来用的,较少在大规模吞吐的场景中使用
|
erlang语言开发,性能极其好,延时很低;
吞吐量到万级,MQ功能比较完备
并且开源提供的管理界面很是棒,用起来很好用
社区相对比较活跃,几乎每月都发布几个版本分
在国内一些互联网公司近几年用rabbitmq也比较多一些
可是问题也是显而易见的,RabbitMQ确实吞吐量会低一些,这是由于他作的实现机制比较重。
并且erlang开发,国内有几个公司有实力作erlang源码级别的研究和定制?若是说你没这个实力的话,确实偶尔会有一些问题,你很难去看懂源码,你公司对这个东西的掌控很弱,基本职能依赖于开源社区的快速维护和修复bug。
并且rabbitmq集群动态扩展会很麻烦,不过这个我以为还好。其实主要是erlang语言自己带来的问题。很难读源码,很难定制和掌控。 |
接口简单易用,并且毕竟在阿里大规模应用过,有阿里品牌保障
日处理消息上百亿之多,能够作到大规模吞吐,性能也很是好,分布式扩展也很方便,社区维护还能够,可靠性和可用性都是ok的,还能够支撑大规模的topic数量,支持复杂MQ业务场景
并且一个很大的优点在于,阿里出品都是java系的,咱们能够本身阅读源码,定制本身公司的MQ,能够掌控
社区活跃度相对较为通常,不过也还能够,文档相对来讲简单一些,而后接口这块不是按照标准JMS规范走的有些系统要迁移须要修改大量代码
还有就是阿里出台的技术,你得作好这个技术万一被抛弃,社区黄掉的风险,那若是大家公司有技术实力我以为用RocketMQ挺好的 |
kafka的特色其实很明显,就是仅仅提供较少的核心功能,可是提供超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,并且分布式能够任意扩展
同时kafka最好是支撑较少的topic数量便可,保证其超高吞吐量
并且kafka惟一的一点劣势是有可能消息重复消费,那么对数据准确性会形成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响能够忽略
这个特性自然适合大数据实时计算以及日志收集 |
RocketMQ,适用于可靠性要求高的场景,尤为是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能没法及时处理的状况。
RabbitMQ,erlang语言自己的并发优点,性能较好,社区活跃度也比较高,可是不利于作二次开发和维护,适用于数据量没有那么大的场景。
Kafka,是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务,有日志采集功能。
----------------------------------------------------
举个栗子:“P”是咱们的生产者,“C”是咱们的消费者。中间的框是一个队列 - RabbitMQ表明消费者保留的消息缓冲区。
生产者将消息发送到“hello”队列。使用者从该队列接收消息。
-------------------------------------------------------------------------------