本套技术专栏是做者(秦凯新)平时工做的总结和升华,经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,若有任何学术交流,可随时联系。mysql
比较核心的有3个:解耦、异步、削峰。redis
rabbitmq有三种模式:单机模式,普通集群模式,镜像集群模式。sql
普通集群模式模式是在多台机器上启动多个rabbitmq实例,每一个机器启动一个。可是你建立的queue,只会放在一个rabbtimq实例上,可是每一个实例都同步queue的元数据。完了你消费的时候,实际上若是链接到了另一个实例,那么那个实例会从queue所在实例上拉取数据过来。数据库
镜像集群模式是,才是所谓的rabbitmq的高可用模式,跟普通集群模式不同的是,你建立的queue,不管元数据仍是queue里的消息都会存在于多个实例上,而后每次你写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。api
rabbitmq并非分布式消息队列,他就是传统的消息队列,只不过提供了一些集群、HA的机制而已。rabbitmq一个queue的数据都是放在一个节点里的,镜像集群下,也是每一个节点都放这个queue的完整数据。没法真正实现集群的扩容。多线程
kafka由多个broker组成,每一个broker是一个节点;你建立一个topic,这个topic能够划分为多个partition,每一个partition能够存在于不一样的broker上,每一个partition就放一部分数据。一个topic的数据,是分散放在多个机器上的,每一个机器就放一部分数据。架构
kafka 0.8之前,是没有HA机制的,就是任何一个broker宕机了,那个broker上的partition就废了,无法写也无法读,没有什么高可用性可言。并发
kafka 0.8之后,提供了HA机制,就是replica副本机制。每一个partition的数据都会同步到其余机器上,造成本身的多个replica副本。而后全部replica会选举一个leader出来,那么生产和消费都跟这个leader打交道,而后其余replica就是follower。异步
写的时候,leader会负责把数据同步到全部follower上去,读的时候就直接读leader上数据便可。只能读写leader?很简单,要是你能够随意读写每一个follower,那么就要care数据一致性的问题,系统复杂度过高,很容易出问题。kafka会均匀的将一个partition的全部replica分布在不一样的机器上,这样才能够提升容错性。分布式
消费的时候,只会从leader去读,可是只有一个消息已经被全部follower都同步成功返回ack的时候,这个消息才会被消费者读到。
既然是消费消息,那确定要考虑考虑会不会重复消费?能不能避免重复消费?或者重复消费了也别形成系统异常能够吗?这个是MQ领域的基本问题,其实本质上如何使用消息队列保证幂等性。
kafka实际上有个offset的概念,就是每一个消息写进去,都有一个offset,表明他的序号,而后consumer消费了数据以后,每隔一段时间,会把本身消费过的消息的offset提交一下,表明我已经消费过了,下次我要是重启啥的,你就让我继续从上次消费到的offset来继续消费吧。
可是凡事总有意外,好比咱们以前生产常常遇到的,就是你有时候重启系统,看你怎么重启了,若是碰到点着急的,直接kill进程了,再重启。这会致使consumer有些消息处理了,可是没来得及提交offset,尴尬了。重启以后,少数消息会再次消费一次。
好比你拿个数据要写库,你先根据主键查一下,若是这数据都有了,你就别插入了,update一下好吧
好比你是写redis,那没问题了,反正每次都是set,自然幂等性。
好比你不是上面两个场景,那作的稍微复杂一点,你须要让生产者发送每条数据的时候,里面加一个全局惟一的id,相似订单id之类的东西,而后你这里消费到了以后,先根据这个id去好比redis里查一下,以前消费过吗?若是没有消费过,你就处理,而后这个id写redis。若是消费过了,那你就别处理了,保证别重复处理相同的消息便可。
基于数据库的惟一键来保证重复数据不会重复插入多条,咱们以前线上系统就有这个问题,就是拿到数据的时候,每次重启可能会有重复,由于kafka消费者还没来得及提交offset,重复数据拿到了之后咱们插入的时候,由于有惟一键约束了,因此重复数据只会插入报错,不会致使数据库中出现脏数据。
基于rabbitmq提供的事务功能,
生产者发送数据以前开启rabbitmq事务(channel.txSelect),而后发送消息,若是消息没有成功被rabbitmq接收到,那么生产者会收到异常报错,此时就能够回滚事务(channel.txRollback),而后重试发送消息;若是收到了消息,那么能够提交事务(channel.txCommit)。可是问题是,rabbitmq事务机制一搞,基本上吞吐量会下来,由于太耗性能。
基于rabbitmq开启confirm模式
在生产者那里设置开启confirm模式以后,你每次写的消息都会分配一个惟一的id,而后若是写入了rabbitmq中,rabbitmq会给你回传一个ack消息,告诉你说这个消息ok了。若是rabbitmq没能处理这个消息,会回调你一个nack接口,告诉你这个消息接收失败,你能够重试。并且你能够结合这个机制本身在内存里维护每一个消息id的状态,若是超过必定时间还没接收到这个消息的回调,那么你能够重发。
事务机制和cnofirm机制不一样之处
事务机制和cnofirm机制最大的不一样在于,事务机制是同步的,你提交一个事务以后会阻塞在那儿,可是confirm机制是异步的,你发送个消息以后就能够发送下一个消息,而后那个消息rabbitmq接收了以后会异步回调你一个接口通知你这个消息接收到了。
消费端手动ACK机制实现
rabbitmq若是丢失了数据,主要是由于你消费的时候,刚消费到,还没处理,结果进程挂了,好比重启了,那么就尴尬了,rabbitmq认为你都消费了,这数据就丢了。
这个时候得用rabbitmq提供的ack机制,简单来讲,就是你关闭rabbitmq自动ack,能够经过一个api来调用就行,而后每次你本身代码里确保处理完的时候,再程序里ack一把。这样的话,若是你还没处理完,不就没有ack?那rabbitmq就认为你还没处理完,这个时候rabbitmq会把这个消费分配给别的consumer去处理,消息是不会丢的。
消费端手动ACK机制实现
惟一可能致使消费者弄丢数据的状况,就是说,你那个消费到了这个消息,而后消费者那边自动提交了offset,让kafka觉得你已经消费好了这个消息,其实你刚准备处理这个消息,你还没处理,你本身就挂了,此时这条消息就丢咯。
这不是同样么,你们都知道kafka会自动提交offset,那么只要关闭自动提交offset,在处理完以后本身手动提交offset,就能够保证数据不会丢。可是此时确实仍是会重复消费,好比你刚处理完,还没提交offset,结果本身挂了,此时确定会重复消费一次,本身保证幂等性就行了。
生产环境碰到的一个问题,就是说咱们的kafka消费者消费到了数据以后是写到一个内存的queue里先缓冲一下,结果有的时候,你刚把消息写入内存queue,而后消费者会自动提交offset。
而后此时咱们重启了系统,就会致使内存queue里还没来得及处理的数据就丢失了。
kafka broker 数据零丢失保证
好比kafka某个broker宕机,而后从新选举partiton的leader时。你们想一想,要是此时其余的follower恰好还有些数据没有同步,结果此时leader挂了,而后选举某个follower成leader以后,他不就少了一些数据?这就丢了一些数据啊。
生产环境也遇到过,咱们也是,以前kafka的leader机器宕机了,将follower切换为leader以后,就会发现说这个数据就丢了
因此此时通常是要求起码设置以下4个参数:
(1)topic设置replication.factor参数:这个值必须大于1,要求每一个partition必须有至少2个副本
(2)kafka服务端设置min.insync.replicas参数:这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟本身保持联系,没掉队,这样才能确保leader挂了还有一个follower吧
(3)producer端设置acks=all:这个是要求每条数据,必须是写入全部replica以后,才能认为是写成功了
(4)producer端设置retries=MAX(很大很大很大的一个值,无限次重试的意思):这个是要求一旦写入失败,就无限重试,卡在这里了
复制代码
咱们生产环境就是按照上述要求配置的,这样配置以后,至少在kafka broker端就能够保证在leader所在broker发生故障,进行leader切换时,数据不会丢失。
kafka分区partition挂掉以后如何恢复?
在kafka中有一个partition recovery机制用于恢复挂掉的partition。 每一个Partition会在磁盘记录一个RecoveryPoint(恢复点), 记录已经flush到磁盘的最大offset。当broker fail 重启时,会进行loadLogs。 首先会读取该Partition的RecoveryPoint,找到包含RecoveryPoint点上的segment及之后的segment, 这些segment就是可能没有彻底flush到磁盘segments。而后调用segment的recover,从新读取各个segment的msg,并重建索引。
优势:
以segment为单位管理Partition数据,方便数据生命周期的管理,删除过时数据简单
在程序崩溃重启时,加快recovery速度,只需恢复未彻底flush到磁盘的segment便可
什么缘由致使副本与leader不一样步的呢?
慢副本:在必定周期时间内follower不能追遇上leader。最多见的缘由之一是IO瓶颈致使follower追加复制消息速度慢于从leader拉取速度。
卡住副本:在必定周期时间内follower中止从leader拉取请求。follower replica卡住了是因为GC暂停或follower失效或死亡。
新启动副本:当用户给主题增长副本因子时,新的follower不在同步副本列表中,直到他们彻底遇上了leader日志。
一个partition的follower落后于leader足够多时,被认为不在同步副本列表或处于滞后状态。正如上述所说,如今kafka断定落后有两种,副本滞后判断依据是副本落后于leader最大消息数量(replica.lag.max.messages)或rep licas响应partition leader的最长等待时间(replica.lag.time.max.ms)。前者是用来检测缓慢的副本,然后者是用来检测失效或死亡的副本。
注意:新版本中,replica.lag.max.messages已经废弃。
例如:在mysql里增删改一条数据,对应出来了增删改3条binlog,接着这三条binlog发送到MQ里面,到消费出来依次执行,起码得保证人家是按照顺序来的吧?否则原本是:增长、修改、删除;你楞是换了顺序给执行成删除、修改、增长,不全错了么。
原本这个数据同步过来,应该最后这个数据被删除了;结果你搞错了这个顺序,最后这个数据保留下来了,数据同步就出错了。
(1)rabbitmq:拆分多个queue,每一个queue一个consumer,就是多一些queue而已,确实是麻烦点;或者就一个queue可是对应一个consumer,而后这个consumer内部用内存队列作排队,而后分发给底层不一样的worker来处理
(2)kafka:一个topic,一个partition,一个consumer,内部单线程消费,写N个内存queue,而后N个线程分别消费一个内存queue便可
可能你的消费端出了问题,不消费了,或者消费的极其极其慢。致使消息队列集群的磁盘都快写满了,都没人消费,这个时候怎么办?或者是整个这就积压了几个小时,你这个时候怎么办?或者是你积压的时间太长了,致使好比rabbitmq设置了消息过时时间后就没了怎么办?
因此若是你积压了几百万到上千万的数据,即便消费者恢复了,也须要大概1小时的时间才能恢复过来 通常这个时候,只能操做临时紧急扩容了,具体操做步骤和思路以下:
1)先修复consumer的问题,确保其恢复消费速度,而后将现有cnosumer都停掉
2)新建一个topic,partition是原来的10倍,临时创建好原先10倍或者20倍的queue数量
3)而后写一个临时的分发数据的consumer程序,这个程序部署上去消费积压的数据,消费以后不作耗时的处理,直接均匀轮询写入临时创建好的10倍数量的queue
4)接着临时征用10倍的机器来部署consumer,每一批consumer消费一个临时queue的数据
5)这种作法至关因而临时将queue资源和consumer资源扩大10倍,以正常的10倍速度来消费数据
6)等快速消费完积压数据以后,得恢复原先部署架构,从新用原先的consumer机器来消费消息
复制代码
过时问题解决思路
假设你用的是rabbitmq,rabbitmq是能够设置过时时间的,就是TTL,若是消息在queue中积压超过必定的时间就会被rabbitmq给清理掉,这个数据就没了。那这就是第二个坑了。这就不是说数据会大量积压在mq里,而是大量的数据会直接搞丢。
这个状况下,就不是说要增长consumer消费积压的消息,由于实际上没啥积压,而是丢了大量的消息。咱们能够采起一个方案,就是批量重导,这个咱们以前线上也有相似的场景干过。就是大量积压的时候,咱们当时就直接丢弃数据了,而后等过了高峰期之后,好比你们一块儿喝咖啡熬夜到晚上12点之后,用户都睡觉了。
这个时候咱们就开始写程序,将丢失的那批数据,写个临时程序,一点一点的查出来,而后从新灌入mq里面去,把白天丢的数据给他补回来。也只能是这样了。
假设1万个订单积压在mq里面,没有处理,其中1000个订单都丢了,你只能手动写程序把那1000个订单给查出来,手动发到mq里去再补一次
MQ磁盘爆满解决思路
若是走的方式是消息积压在mq里,那么若是你很长时间都没处理掉,此时致使mq都快写满了,咋办?这个还有别的办法吗?没有,谁让你第一个方案执行的太慢了,你临时写程序,接入数据来消费,消费一个丢弃一个,都不要了,快速消费掉全部的消息。而后走第二个方案,到了晚上再补数据。
mq支持可伸缩性。就是须要的时候快速扩容,就能够增长吞吐量和容量,那怎么搞?设计个分布式的系统呗,参照一下kafka的设计理念,broker -> topic -> partition,每一个partition放一个机器,就存一部分数据。若是如今资源不够了,简单啊,给topic增长partition,而后作数据迁移,增长机器,不就能够存放更多数据,提供更高的吞吐量了?
mq支持数据落地磁盘。落磁盘,才能保证别进程挂了数据就丢了。那落磁盘的时候怎么落啊?顺序写,这样就没有磁盘随机读写的寻址开销,磁盘顺序读写的性能是很高的,这就是kafka的思路。
mq支持可用性。采用多副本机制 -> leader & follower -> broker挂了,从新选举leader便可对外服务。
mq支持数据0丢失,参考以前说的那个kafka数据零丢失方案
结合大数据在咱们工业大数据平台的实践,总结成一篇实践指南,方便之后查阅反思,后续我会根据本篇博客进行代码技术实践实现。
秦凯新 于郑州 201903022307