消息队列常见问题分析

1、简介

好久之前也写过一篇关于消息队列的文章,这里的文章,这篇文章是对消息队列使用场景,以及一些模型作过一点介绍。html

这篇文章将分析消息队列常见问题。前端

消息队列:利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通讯来进行分布式系统集成。java

从定义看:它是一种数据交流平台,也是数据通讯平台。
然而,数据通讯咱们能够用http,RPC来进行通讯,这些与消息队列有什么区别呢?
最大的区别就是同步和异步。http和RPC通常都是同步,而消息队列是异步。git

2、为何要用消息队列

1.解耦
双方不在基于对方直接通讯了,而是基于消息队列来通讯,经过MQ解耦了客户端和服务端通讯。处理数据的双方关注的点不一样了,好比说一个事务,咱们只关心核心流程,而须要依赖其余系统但不是那么重要的事情,有通知便可,不须要等待结果。这种消息模型,关心的是通知,而不在乎处理过程。也能够用消息队列。
上下游开发人员也能够基于消息队列发送消息,而不须要同步的处理消息了。github

2.异步处理
传统的业务逻辑都是基于同步的方式进行处理的。而有了消息队列,就能够把消息存放在MQ里,消息队列的消费者就能够从消息队列中获取数据并进行处理。它不必定要实时处理,能够隔几分钟处理消息队列里的数据。web

3.削峰和流控
这里有点像计算机中的硬件,好比CPU和内存,CPU运算速度比内存高N个数量级,那怎么才能缓解二者之间的差别?中间加一个缓存来缓解二者速度的差别。
同理,MQ也能够起到这种做用。对于上下游软件不一样的处理速度的差别进行调节。数据库

好比,咱们常见的秒杀应用,前端瞬间涌入成千上万的请求,前端能够承受这么大的请求压力,可是复杂的后端系统,确定会被压垮,从而致使秒杀服务不能够用的状况。为了解决这种先后端处理速度不平衡的差别,致使的服务问题,能够引入消息队列来调节,用消息队列来缓存用户的请求,等待后端系统来消费。apache

上面就是消息队列的主要功能,固然还有其余一些功能,好比消息广播,最终一致性等。后端

使用MQ后的问题

固然使用了消息队列,会增长系统的复杂性,一致性延迟,可用性下降等问题。
可用性下降是指系统可用性下降,若是MQ挂了,那么确定会影响到整个系统了。
由于上下游系统可能都会与MQ交互。缓存

3、何时引入MQ?

这个要看业务系统功能需求,一个是系统处理是否到达了瓶颈,须要消息队列来缓解;
还有,业务系统一致性要求是否是特别高。一般业务系统不会要求那么高的一致性要求。固然一些高频交易系统,一致性要求特别高,就不适合用了。

引入任何一个新的软件必然会增长原有系统的复杂性,仍是要根据业务特性进行合理的选择。

4、消息队列常见问题

1.如何保证消息不被重复消费(怎么保证幂等)

为何会重复消费

  • 生产者:也就是客户端,可能会重复推送一条数据到MQ中。有多是客户端超时重复推送,也有多是网络比较慢客户端重复推送了数据到MQ中。
  • MQ:消费者消费完了一条数据,发送ACK信息表示消费成功时,这时候,MQ忽然挂了,致使MQ觉得消费者还未消费该条消息,MQ恢复后再次推送了该条消息,致使重复消费。
  • 消费者:与上面MQ挂掉状况相似,消费者已经消费完了一条消息,正准备给MQ发送ACK消息但还未发送时,这时候消费者挂了,服务重启后MQ觉得消费者尚未消费该条消息,再次推送该条消息。

怎么处理重复消费

每一个消息都带一个惟一的消息id。消费端保证不重复消费就能够了,即便生产端产生了重复的数据,固然生产端也最好控制下重复数据。

消费端保证不重复消费:
一般方法都是存储消费了的消息,而后判断消息是否存在。

1.先保存在查询
每次保存数据前,先查询下,不存在就插入。这种是并发不高的状况下可使用。

2.数据库添加惟一约束条件
好比惟一索引

3.增长一个消息表
已经消费的消息,把消息id插入到消息表里面。
为了保证高并发,消息表能够用Redis来存。

2.如何处理消息丢失的问题

消息丢失的缘由

  • 生产者:生产者推送消息到MQ中,可是网络出现了故障,好比网络超时,网络抖动,致使消息没有推送到MQ中,在网络中丢失了。又或者推送到MQ中了,可是这时候MQ内部出错致使消息丢失。

  • MQ:MQ本身内部发生了错误,致使消息丢失。

  • 消费者:有时处理消息的消费者处理不当,还没等消息处理完,就给MQ发送确认信息,可是这时候消费者自身出问题,挂了,确认消息已经发送给MQ告诉MQ本身已经消费完了,致使消息丢失。

如何保证消息不丢失呢? 下面谈谈这方面的作法。

3.如何保证消息可靠性传输

整个消息从生产到消费通常分为三个阶段:生产者-生产阶段,MQ-存储阶段,消费者-消费阶段

3.1 生产者-生产阶段
在这个阶段,通常经过请求确认机制,来保证消息可靠性传输。 与TCP/IP协议里ACK机制有点像。
客户端发送消息到消息队列,消息队列给客户端一个确认响应,表示消息已经收到,客户端收到响应,表示一次正常消息发送完毕。

3.2 MQ-存储阶段
消息队列给客户端发送确认消息。存储完成后,才发送确认消息。

3.3 消费者-消费阶段
跟生产阶段相同,消费完了,给消息队列发送确认消息。

4.如何保证消息的顺序性

咱们平常说的顺序性是什么呢?

好比说小孩早上上学过程,他先起床,而后洗漱,吃早餐,最后上学。咱们认为他作的事情是有前后顺序的,及是时间的前后顺序,咱们用时间来标记他的顺序。
更抽象的理解,这些发生的事件有一个相同的参考系,即他们的时间是对应同一个物理时钟的时间。

若是没有绝对的时间做为参考系,那他们之间还能肯定顺序吗?
若是事件之间有因果关系,好比A、B两个事件是因果关系,那么A必定发生在B以前(前应后果)。相反,在没有一个绝对的时间的参考的状况下,若A、B之间没有因果关系,那么A、B之间就没有顺序关系。跟java里的happen before很像。

总结一下,咱们说顺序时,其实说的是

  • 在有绝对时间做为参考系的状况下,事件发生的时间前后关系;
  • 在没有绝对时间做为参考系的状况下,一种由因果关系推断出来的happening before的关系;

在分布式系统领域,有一篇关于时间,时钟和事件的顺序的颇有名的一篇论文
Time, Clocks, and the Ordering of Events in a Distributed System
,能够看一看,上面举例状况都是参考这篇论文。

参考上面的结论,在消息队列中,咱们也是以时间做为参考系,让消息有序。

可是,在消息队列中,消息有序会遇到一些问题,下面让咱们来讨论这些问题。

消息的顺序性的一些问题

在计算机系统中,有一个比较棘手的问题是,它能够是多线程执行的,并且哪一个线程先运行,哪一个线程后运行,彻底是由操做系统决定的,彻底没有规律,是乱序执行。显然与消息队列中的消息有序相悖。

还有,在消息队列中,涉及到生产者,MQ,消费者,还有网络,这4者之间的关系。而后他们又涉及到消息的顺序性,就有不少种状况须要考虑。能够参考这篇文章
分布式开放消息系统(RocketMQ)的原理与实践
(做者:CHUAN.CHEN),各类状况讨论的很全面。

最后的结论就是:消息的顺序性,不只仅是MQ自己存储消息要保证顺序性,还须要生产者和消费者一同来保证顺序性。

顺序性保证

在消息队列中,消息的顺序性须要3方面来保证:
一、生产者发送消息时要保证顺序
二、消息被消息队列存储时要保持和发送的顺序一致
三、消息被消费时保持和存储的顺序一致

生产者:发送时要求用户在同一个线程中采用同步的方式发送。
消息队列:存储保持和发送的顺序一致。通常是在一个分区中保持顺序性。
消费者:一个分区的消息由一个线程来处理消费消息。

https://www.hicsc.com/post/2020041566 这个连接中,做者分析了RocketMQ顺序消息的代码实现。

5.消息队列中消息延迟问题

你说的 消息的延迟 是延迟消息队列吗? 啊,并非,是彻底2个不一样的概念。延迟消息队列是MQ提供的一个功能。消息的延迟,是指消费端消费的速度跟不上生产端产生消息的速度,可能致使消费端丢失数据,也可能致使消息积压在MQ中。因此这里说的消息的延迟,指的是消费端消费消息的延迟。

消息队列的消费模型pull和push:

一、push模式

这种模式是消息队列主动将消息推送给消费者。

  • 优势:尽量实时的将消息发送给消费者进行消费。
  • 缺点:若是消费端消费能力弱,消费端的消费速度赶不上生产端,而MQ又不断的给消费端推送消息,消费端的缓存满了致使缓存溢出,就会产生错误或丢失数据的可能。
二、pull模式

这种模式是由消费端主动向消息队列拉取消息。

  • 优势:能够自主可控的拉取消息。
  • 缺点:拉取消息的频率很差控制。

a、若是每次pull时间间隔比较久,会增长消息延迟,消息到达消费者时间会加长。这样时间一长会致使MQ中消息的堆积,而消息长时间堆积就会致使一系列的问题:

  • 一、若是积压了几个小时的数据,有几千万的数据量,消费端处理的压力会愈来愈大。
  • 二、若是是带有过时时间的消息,可能这些消息已经到了过时时间,由于积压时间太长,但还没被消费端消费掉,消费端来不及消费。
  • 三、若是持续的积压,达到了MQ能存储消息数量的上限,也就是说MQ满了,存不下了,会致使MQ丢掉数据,致使数据丢失。
    想一下,上面的情形是否是跟TCP/IP协议的流量控制和拥塞控制遇到的一些问题很像,也有不少不一样。

b、若是每次pull的时间间隔比较短,在一段时间内MQ中没有可消费的消息,会产生不少无效的pull请求,致使必定的网络开销。

因此解决问题的办法最主要就是优化消费端的消费性能。1.优化消费逻辑 2.水平扩容,增长消费端并发。

延迟问题处理

若是消息堆积已经发生了,致使了上面的3个问题,这时怎么办?
一、积压了几个小时几千万的数据
第一:确定要找到积压数据的缘由,通常都是消费端的问题。
第二:若是能够的,扩大消费端的数量,快速消费掉消息。
第三:扩容,增长多机器消费。新建一个topic,partition是原来10倍,创建原先10倍的queue。而后写一个临时的消费程序,这个消费程序去转移积压的数据,把积压的数据均匀轮询写入创建好的10倍数量的queue。而后在征用10倍机器的消费端来消费这个queue。这种作法至关于临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。消费完了,恢复原来的部署。这是大厂作法。

二、积压时间过长,带有过时时间的消息过时失效了
这个没有好的办法处理,只能经过程序找出丢失的数据,而后也是经过程序把丢失的数据从新导入到MQ里,从新消费。

三、长时间积压却是MQ写满了
这个也没啥好办法处理,只能快速消费掉MQ里的数据,快速消费指消费一个,丢掉一个,不要这些数据了,而后从新导入数据。用户少的时候在补回数据。

6.消息队列高可用

6.1 kafka

kafka基本架构:

  • Broker:一个kafka节点就是一个broker,多个broker组成一个kafka集群。一个broker能够是一个单机器kafka服务器。
  • Topic:存放消息的主题,至关于一个队列。能够理解为存放消息的分类,好比你能够有前端日志的Topic,后端日志的Topic。能够理解为MySQL里的表。
  • Partition:一个topic能够划分为多个partition,每一个partition都是一个有序队列。把topic主题中的消息进行分拆,均摊到kafka集群中不一样机器上。partition是topic的进一步拆分。
  • Replica:副本消息。kafka能够以partition为单位,保存多个副本,分散在不一样的broker上。副本数是能够设置的。
  • Segment: 一个Partition被切分为多个Segment,每一个Segment包含索引文件和数据文件。
  • Message:kafka里最基本消息单元。

一个kafka集群能够由多个broker组成,每一个broker是一个节点,你建立一个topic,这个topic能够划分为多个partition,每一个partition能够存储在不一样的broker上,每一个partition存放一部分数据。

6.2 RocketMQ

在 RocketMQ 4.5 版本以前,RocketMQ 只有 Master/Slave 一种部署方式来实现高可用。
一组 Broker 中有一个 Master,有零到多个 Slave,Slave 经过同步复制或异步复制方式去同步 Master 的数据。Master/Slave 部署模式,提供了必定的高可用性。

上面主从高可用架构有一个缺点:
主节点挂了后须要人为的进行重启或者切换。为了解决这个问题,后续引入了raft,用raft协议来完成自动选主。RocketMQ的DLedger 就是一个基于 raft 协议的 commitlog 存储库,也是 RocketMQ 实现新的高可用多副本架构的关键。

还能够多master多slave部署,防止单点故障。

5、参考

相关文章
相关标签/搜索