解决的问题:java
总结:经过一个 MQ 的发布订阅消息模型(Pub/Sub), 系统 A 跟其余系统就完全解耦了。mysql
通常互联网类的企业,对用户的直接操做,通常要求每一个请求都必须在 200ms之内,对用户几乎是无感知的。redis
提升高延时接口sql
高峰期每秒 5000 个请求,每秒对 MySQL 执行 5000 条 SQL(通常MySQL每秒 2000 个请求差很少了),若是MySQL被打死,而后整个系统就崩溃,用户就没办法使用系统了。可是高峰期过了以后,每秒钟可能就 50 个请求,对整个系统没有任何压力。数据库
5000 个请求写入到 MQ 里面,系统 A 每秒钟最多只能处理 2000 个请求(MySQL 每秒钟最多处理 2000 个请求),系统 A 从 MQ 里慢慢拉取请求,每秒钟拉取 2000 个请求。MQ,每秒钟 5000 个请求进来,结果只有 2000 个请求出去,结果致使在高峰期(21小时),可能有几十万甚至几百万的请求积压在 MQ 中,这个是正常的,由于过了高峰期以后,每秒钟就 50 个请求,可是系统 A 仍是会按照每秒 2000 个该请求的速度去处理。只要高峰期一过,系统 A 就会快速的将积压的消息给解决掉。(算一笔帐,每秒积压在 MQ 里消息有 3000 条,一分钟就会积压 18W 条消息,一个小时就会积压 1000 万条消息。等高峰期一过,差很少须要 1 个多小时就能够把 1000W 条积压的消息给处理掉)segmentfault
MQ 可能挂掉,致使整个系统崩溃缓存
可能发重复消息,致使插入重复数据;消息丢了;消息顺序乱了; 系统 B,C,D 挂了,致使 MQ 消息积累,磁盘满了;性能优化
原本应该A,B,C,D 都执行成功了再返回,结果A,B,C 执行成功 D 失败网络
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
单机吞吐量 | 万级(一秒1W~2W 左右请求) | 万级 | 十万级 | 十万级 |
时效性 | ms级 | 微秒级,这个是rabbitmq一大特色,延迟最低的 | ms级 | ms级之内 |
可用性 | 高,基于主从架构高可用性 | 高,基于主从架构高可用性 | 很是高,分布式架构 | 很是高,Kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会致使不可用 |
消息可靠性 | 有较低几率丢失数据 | 通过参数优化配置,能够作到 0 丢失 | 通过参数优化配置,消息能够作到 0 丢失 | |
优劣势总结 | 优势:很是成熟,功能强大,在业内大量的公司和项目都有应用。缺点:偶尔会有较低几率丢失消息,而如今社区以及国内应用愈来愈少,官方社区对ActiveMQ 5.x维护愈来愈少,并且确实主要是基于解耦和异步来用,较少在大规模吞吐的场景中使用 | 优势:erlang语言开发,性能极其好,延时很低,管理界面很是棒,社区活跃 缺点:RabbitMQ确实吞吐量会低一些(单机几万),这个是由于他的实现机制比较重。并且 erlang 开发,国内有几个实力作 erlang源码级别的研究和定制?缺少掌控,依赖开源社区的维护和修复bug。并且 RabbitMQ集群动态扩展会很麻烦,其实主要是 erlang语言自己带来的问题,很难读源码,很难定制和掌控 | 优势:接口简单易用,阿里保障,日处理消息上百亿之多,能够作到大规模吞吐,性能也很是好,分布式扩展也很方便,社区维护还能够,可靠性和可用性都 OK,还能够支撑大规模的topic数量,支持复杂 MQ 业务场景,源码是 java,方便公司定制和掌控 缺点:社区活跃通常,接口不是按照标准的 JMS 规范走的,有些系统迁移须要修改大量代码,阿里出台的技术,有可能这个技术被抛弃。 | 优势:提供较少的核心功能,可是提升超高的吞吐量,ms级的延迟,极高的可用性以及可靠性,并且分布式能够任意扩展,Kafka最好是支撑较少的topic数量来保证极高的吞吐量。缺点:有可能消息重复消费,会对数据准确性形成影响,大数据领域中以及日志采集,这点影响能够忽略,自然适合大数据实时计算以及日志收集 |
建议:中小型公司 RabbitMQ 大公司:RocketMQ 大数据实时计算:Kafka多线程
RabbitMQ有三种模式:单机模式 、普通集群模式、镜像集群模式
demo级
队列的元数据存在于多个实例中,可是消息不存在多个实例中,每次
多台机器上启动多个 rabbitmq 实例,每一个机器启动一个。
优势:能够多个机器消费消息,可 以提升消费的吞吐量
缺点:可能会在 rabbitmq 内部产生大量的数据传输 ;可用性基本没保障,queue 所在机器宕机,就没办法消费了
没有高可用性可言
队列的元数据和消息都会存在于多个实例中,每次写消息到 queue的时候,都会自动把消息到多个实例的 queue 里进行消息同步。也就 是每一个节点上都有这个 queue 的一个完整镜像(这个 queue的所有数据)。任何一个节点宕机了,其余节点还包含这个 queue的完整数据,其余 consumer 均可以到其余活着的节点上去消费数据都是 OK 的。缺点:不是分布式的,若是这个 queue的数据量很大,大到这个机器上的容量没法容纳 。
开启镜像集群模式方法: 管理控制台,Admin页面下,新增一个镜像集群模式的策略,指定的时候能够要求数据同步到全部节点,也能够要求同步到指定数量的节点,而后你再次建立 queue 的时候 ,应用这个策略,就 会自动将数据同步到其余的节点上去。
broker进程就是kafka在每台机器上启动的本身的一个进程。每台机器+机器上的broker进程,就能够认为是 kafka集群中的一个节点。
你建立一个 topic,这个topic能够划分为多个 partition,每一个 partition 能够存在于不一样的 broker 上,每一个 partition就存放一部分数据。
这就是自然的分布式消息队列,也就是说一个 topic的数据,是分散放在 多个机器上的,每一个机器就放一部分数据。
分布式的真正含义是每一个节点只放一部分数据,而不是完整数据(完整数据就是HA、集群机制)
Kafka 0.8版本以前是没有 HA 机制的,任何一个 broker 宕机了,那么就缺失一部分数据。
Kafka 0.8之后,提供了 HA 机制,就是 replica 副本机制。每一个 partition的数据都会同步到其余机器上,造成本身的多个 replica 副本。而后全部 replica 会选举一个 leader。那么生产者、消费者都会和这个 leader 打交道,而后其余 replica 就是 follow。写的时候,leader 负责把数据同步到全部 follower上去,读的时候就直接读 leader 上的数据便可。若是某个 broker宕机了,恰好也是 partition的leader,那么此时会选举一个新的 leader出来,你们继续读写那个新的 leader便可,这个就 是所谓的高可用性。
leader和follower的同步机制:
写数据的时候,生产者就写 leader,而后 leader将数据落地写本地磁盘,接着其余 follower 本身主动从 leader来pull数据。一旦全部 follower同步好数据了,就会发送 ack给 leader,leader收到全部 follower的 ack以后,就会返回写成功的消息给生产者。
消费的时候,只会从 leader去读,可是只有一个消息已经被全部 follower都同步成功返回 ack的时候,这个消息才会被消费者读到。
MQ 只能保证消息不丢,不能保证重复发送
每条消息都有一个 offset 表明 了这个消息的顺序的序号,按照数据进入 kafka的顺序,kafka会给每条数据分配一个 offset,表明了这个是数据的序号,消费者从 kafka去消费的时候,按照这个顺序去消费,消费者会去提交 offset,就是告诉 kafka已经消费到 offset=153这条数据了 ;zk里面就记录了消费者当前消费到了 offset =几的那条消息;假如此时消费者系统被重启,重启以后,消费者会找kafka,让kafka把上次我消费到的那个地方后面的数据继续给我传递过来。
重复消息缘由:(主要发生在消费者重启后)
消费者不是说消费完一条数据就立马提交 offset的,而是定时按期提交一次 offset。消费者若是再准备提交 offset,可是还没提交 offset的时候,消费者进程重启了,那么此时已经消费过的消息的 offset并无提交,kafka也就不知道你已经消费了 offset= 153那条数据,这个时候kafka会给你发offset=152,153,154的数据,此时 offset = 152,153的消息重复消费了
幂等:一个数据或者一个请求,给你重复来屡次,你得确保对应的数据是不会改变的,不能出错。
思路:
MQ 传递很是核心的消息,好比:广告计费系统,用户点击一次广告,扣费一块钱,若是扣费的时候消息丢了,则会不断少钱,聚沙成塔,对公司是一个很大的损失。
问题 1解决方案:
事务机制:(通常不采用,同步的,生产者发送消息会同步阻塞卡住等待你是成功仍是失败。会致使生产者发送消息的吞吐量降下来)
channel.txSelect try { //发送消息 } catch(Exception e){ channel.txRollback; //再次重试发送这条消息 } channel.txCommit;
confirm机制:(通常采用这种机制,异步的模式,不会阻塞,吞吐量会比较高)
public void ack(String messageId){ } public void nack(String messageId){ //再次重发一次这个消息 }
问题 2 解决方案:
持久化到磁盘
缺点:可能会有一点点丢失数据的可能,消息恰好写到了 rabbitmq中,可是还没来得及持久化到磁盘上,结果不巧, rabbitmq挂了,会致使内存里的一点点数据会丢失。
问题 3 解决方案:
缘由:消费者打开了 autoAck机制(消费到一条消息,还在处理中,还没处理完,此时消费者自动 autoAck了,通知 rabbitmq说这条消息已经消费了,此时不巧,消费者系统宕机了,那条消息丢失了,还没处理完,并且 rabbitmq还觉得这个消息已经处理掉了)
解决方案:关闭 autoAck,本身处理完了一条消息后,再发送 ack给 rabbitmq,若是此时还没处理完就宕机了,此时rabbitmq没收到你发的ack消息,而后 rabbitmq 就会将这条消息从新分配给其余的消费者去处理。
缘由:消费者消费到那条消息后,自动提交了 offset,kafka觉得你已经消费好了这条消息,结果消费者挂了,这条消息就丢了。
例子:消费者消费到数据后写到一个内存 queue里缓存下,消息自动提交 offset,重启了系统,结果会致使内存 queue 里还没来得及处理的数据丢失。
解决方法:kafka会自动提交 offset,那么只要关闭自动提交 offset,在处理完以后本身手动提交,能够保证数据不会丢。可是此时确实仍是会重复消费,好比恰好处理完,还没提交 offset,结果本身挂了,此时确定会重复消费一次 ,作好幂等便可。
缘由:kafka 某个 broker 宕机,而后从新选举 partition 的 leader时,此时其余的 follower 恰好还有一些数据没有同步,结果此时 leader挂了,而后选举某个 follower成 leader以后,就丢掉了以前leader里未同步的数据。
例子:kafka的leader机器宕机,将 follower 切换为 leader以后,发现数据丢了
解决方案:(保证 kafka broker端在 leader发生故障,或者leader切换时,数据不会丢)
按 2 的方案设置了 ack =all,必定不会丢。它会要求 leader 接收到消息,全部的 follower 都同步 到了消息以后,才认为本次写成功。若是没知足这个条件,生产者会无限次重试 。
背景:mysql binlog 同步的系统,在mysql里增删改一条数据,对应出来了增删改 3 条binlog,接着这 3 条binlog发送到 MQ 里面,到消费出来依次执行,起码是要保证顺序的吧,否则顺序变成了 删除、修改、增长。日同步数据达到上亿,mysql->mysql,好比大数据 team,须要同步一个mysql库,来对公司的业务系统的数据作各类复杂的操做。
场景:
须要保证顺序的数据放到同一个queue里
写入一个 partition中的数据必定是有顺序的。
生产者在写的时候,能够指定一个 key,好比订单id做为key,那么订单相关的数据,必定会被分发到一个 partition中区,此时这个 partition中的数据必定是有顺序的。Kafka 中一个 partition 只能被一个消费者消费。消费者从partition中取出数据的时候 ,必定是有顺序的。
若是消费者单线程消费+处理,若是处理比较耗时,处理一条消息是几十ms,一秒钟只能处理几十条数据,这个吞吐量过低了。确定要用多线程去并发处理,压测消费者4 核 8G 单机,32 条线程,最高每秒能够处理上千条消息
消费端出了问题,不消费了或者消费极其慢。接着坑爹了,你的消息队列集群的磁盘都快写满了 ,都没人消费,怎么办?积压了几个小时,rabbitmq设置了消息过时时间后就没了,怎么办?
例如:每次消费以后都要写 mysql,结果mysql挂了,消费端 hang 不动了。
消费者本地依赖的一个东西挂了,致使消费者挂了。
长时间没处理消费,致使 mq 写满了。
场景:几千万条数据再 MQ 里积压了七八个小时
一个消费者一秒是 1000 条,一秒 3 个消费者是 3000 条,一分钟是 18W 条,1000 多 W 条须要一个小时恢复。
步骤:
原来 3 个消费者须要 1 个小时能够搞定,如今 30 个临时消费者须要 10 分钟就能够搞定。
若是用的 rabbitmq,而且设置了过时时间,若是此消费在 queue里积压超过必定的时间会被 rabbitmq清理掉,数据直接搞丢。
这个时候开始写程序,将丢失的那批 数据查出来,而后从新灌入mq里面,把白天丢的数据补回来。
若是消息积压mq,长时间没被处理掉,致使mq快写完满了,你临时写一个程序,接入数据来消费,写到一个临时的mq里,再让其余消费者慢慢消费 或者消费一个丢弃一个,都不要了,快速消费掉全部的消息,而后晚上补数据。
本文由博客一文多发平台 OpenWrite 发布!