《Apache Kafka 实战》读书笔记-认识Apache Kafka

                《Apache Kafka 实战》读书笔记-认识Apache Kafkahtml

                                           做者:尹正杰正则表达式

版权声明:原创做品,谢绝转载!不然将追究法律责任。windows

 

 

 

 

一.kafka概要设计
  kafka在设计初衷就是为了解决互联网公司的超级大量级数据的实时传输。为了实现这个目标,kafka在设计之初就须要考虑如下四个方面:
  第一:吞吐量/延迟
  第二:消息持久化 
  第三:负载均衡和故障转移
  第四:伸缩性
1>.吞吐量/延时介绍
  咱们先打个比方:若kafka处理一条消息须要花费2ms,那么计算获得的吞吐量不会超过500条消息每秒(1000ms/2ms=500条/s)。可是若咱们采用批处理(batching)的思想,假设在发送前咱们首先会等待一段时间(假设是8ms),那么此时消息发送的延迟变成了10ms(2ms+8ms),即延迟增长了4倍,但假设在这8ms中咱们总共积累了1000条消息,那么整个系统的吞吐量就变成了100000 条/s。此时你会发现吞吐量提高了200倍!看到micor-batch的威力了吧?这就是目前诸如Storm Trident 和 Spark Streaming等消息处理平台所采用的批处理思想。 
2>.Kafka如何作到高吞吐量,低延迟的呢? 
  首先,kafka的写入操做是很快的,这主要得益于它对磁盘的使用方法的不一样。虽然kafka会持久化全部数据到磁盘,但本质上每次写入操做其实都只是把数据写入到操做系统的页缓存(page cache)中,而后由操做系统自行决定何时把页缓存中的数据写入磁盘上。这样的设计由三个主要的优点:
  第一:操做系统页缓存是内存中分配的,因此消息写入的速度很是快;
  第二:kafka没必要直接与底层的文件系统打交道。因此烦琐的I/O操做都交由操做系统来处理;
  第三:kafka写入操做采用追加写入(append)方式,避免了磁盘随机写操做(据资料统计,顺序磁盘I/O速度是绝不逊色于随机读写内存I/O速度。感兴趣的小伙伴可使用相关工具测试一下。); 
3>.Kafka的高吞吐量,低延迟的设计目标
  第一:大量使用操做系统页缓存,内存操做速度快且命中率高; 
  第二:Kafka不直接参与物理I/O操做,而是交由最擅长此时的操做系统来完成; 
  第三:采用追加写入方式,摒弃了缓慢的磁盘随机读/写操做;
  第四:使用sendfile为表明的零拷贝技术增强网络间的数据传输效率; 
4>.消息持久化的优势
  第一:解耦消息发送和消息消费
    本质上来讲,kakfa最核心的功能就是提供了生产者-消费者模式的完整解决方案。经过将消息持久化使得生产者方再也不须要直接和消费者方耦合,它只是简单的把消息生产出来并交由kafka服务器保存便可,所以提高了总体的吞吐量。  
  第二:实现灵活的消息处理
    不少kafka的下游子系统(接受kafka消息的系统)都有这样的需求:对于已经处理过的消息可能在将来的某个时间点从新处理一次,即所谓的消息消息重演(message replay)。消息持久化即可以很方便地实现这样的需求。 
  第三:负载均衡和故障转移 
    做为一个功能完备的分布式系统,kafka若是只提供了最基本的消息引擎功能确定不足以帮助它脱颖而出。一套完整的消息引擎解决方案中韩必然要提供负载均衡(load balancing)和故障转移(fail-over)功能。
    何为负载均衡?顾名思义就是让系统的负载根据必定的规则均衡地分配在全部参数工做的服务器上,从而最大限度的提高总体的运行效率。kafka实现负载均衡其实是经过智能化的分区领导者选举(partition leader election)来实现的。 
    除了负载均衡,完备的分布式系统还支持故障转移,所谓故障转移,是指当服务器意外终止时,整个集群能够快速的检测到该失效(failure),并当即将该服务器上应用或服务自动转移到其余服务器上。故障转移一般是“心跳”和“会话“的机制来实现的。kafka服务器支持故障转移的方式就是使用绘画机制。每台kafka服务器启动后会以会话的形式把本身注册到zookeeper服务器上。一旦该服务运转出现问题,与zookeeper的会话变不能维持从而超时失效,此时kafka集群会选举出另一台服务器来彻底代替这台服务器继续提供服务。
  第四:伸缩性
    所谓伸缩性,英文名是scalability。伸缩性表示想分布式系统中增长额外的计算资源(好比CPU,内存,存储或带宽)时吞吐量提高的能力。阻碍线性扩容的一个很常见的因素就是状态的保存。咱们知道,不管是哪类分布式系统,集群的每台服务器必定会维护不少内部状态。若是由服务器本身来保存这些状态信息,则必须处理一致性的问题。相反,若是服务器是无状态的,状态的保存和管理交与专门的协调服务来作(好比zookeeper)。那么整个集群的服务武器之间就无需繁重的状态共享,者极大的下降了维护复杂度。假若要扩容集群节点,只须要简单的启动新的节点集群和进行自动负载均衡就能够了。 
  Kafka正式采用了这样的思想:每台kafka服务器上的状态统一交友zookeeper保管。扩展kafka集群也只须要一步:启动新的kafka服务器便可。固然这里须要言明的是,在kafka服务器上并非全部的状态信息都不保存,它只保存了很轻量级的内部状态(好比从kakka 0.10.x版本以后,它将每一个topic的消费者的偏移量本身维护了,把这些偏移量存放到了一个叫作“__consumer_offsets”的的topic进行维护)。
 
 
二.Kafka基本概念与术语
1>.Kafka的消息格式
  既然kafka的核心功能就是消息引擎,那么对于消息的设计天然是首当其冲的时期。kafka没有使人失望,其对消息格式的设计与保存的确有不少创新之处。首先,kakfa中的消息有不少字段组成,其中有的不少字段都是用于管理消息的原数据字段,对用户来讲是彻底透明的。kakfa的消息格式经历过3次变迁(咱们这次暂不考虑新出的kafka 2.0.1版本,由于我并无对这个版本作深刻的调研。)他们分别称为V0,V1和V2版本。目前大部分用户使用的应该仍是V1版本的消息格式。V1版本的消息格式以下图所示:
  如上图所示(上图摘自互联网),消息由消息头部,key和value组成。消息头部包括消息的CRC码,消息版本号,属性,时间戳,键长度,和消息长度等信息。其实对于普通用户来讲,掌握一下3个字段的含义通常就够用了:
    key :
      消息键,对消息作partition时使用,即决定消息被保存在某topic下的哪一个partition。
    value:
      消息体,保存实际的消息数据。
    timestamp:
      消息发送时间戳, 用于流式处理及其余依赖时间的处理语义。如宝不指定,则取当前时间。
  另外这里单独提一下消息的属性字段,kafka为该字段分配了一个字节,目前只使用了最低的3我为用于保存消息的压缩类型,其他5为还没有使用。当前支持4中压缩类型:0(无压缩),1(GZIP),2(Snappy)和3(LZ4)。关于kafka消息格式V0和V2版本,你们自行百度,推荐一片不错的文章:更多资料请参考:https://www.cnblogs.com/qwangxiao/p/9043491.html。
  其次,kafka使用紧凑的二进制字节数组来保存消息格式的字段,也就是说没有任何多余的比特位浪费。kafka在消息设计时特地避开了繁重的Java堆内存分配,直接使用紧凑二进制字节数组ByteBuffer而不是独立的对象,所以咱们至少可以访问多一倍的可用内存。按照Kafka官网的说法,在一台32GB内存的机器上,Kafka几乎可以用到28~30GB的物理内存,同时还没必要担忧GC的糟糕性能。若是使用ByteBuffer来保存一样的消息,只须要24字节,比起纯Java堆的实现减小了40%的空间占用,好处不言而喻。这种设计的好处还包括加入了扩展的可能性。
  同时,大量使用也缓存而非对内存还有一个好处:当出现Kafka broker进程崩溃时,堆内存的数据也一并小时,但页缓存的数据易燃存在。下载Kafka broker重启后能够继续提供服务,不须要再单独“热”缓存了。
2>.topic和partition
  在概念上来讲,topic只是一个逻辑概念,表明了一类消息,也能够认为是消息被发送到的地方。一般咱们可使用topic来区分实际业务,好比业务A使用一个topic,业务B使用另外一个topic。从本质上说,每一个Kafka topic都由若干个partition组成,而Kafka的partition是不可修改的有序消息序列,也就是说是有序的消息日志。每一个partition有本身专属的partition号,一般是从0开始的。用户堆partition我惟一能作的操做就是在消息序列的尾部追加写入消息。
  partition上的每条消息都会被分配一个惟一的序列号,按照Kafka的术语来说,该序列号被称为位移(offset)。该位移值是从0开始顺序递增的证书。位移信息能够惟必定义到某partition下的一条消息。值得一提的是,Kafka的partition实际上并无太多的业务含义,它的引入就是单纯的为了提高系统的吞吐量,所以在建立Kafka topic的时候能够根据集群实际配置设置具体的partition数,实现总体性能的最大化。
3>.offset
  上面说过,topic partition下的每条消息都被分配了一个位移值。实际上,Kafka消费者端也有位移(offset)的概念,但必定要注意这两个offset属于不一样的概念。
  显然,每条消息在某个partition的位移是固定的,但消费该partition的消费者的位移是会随着消费进度不断迁移,但终究不可能超过该分区最新一条消息的位移。综合以前说的topic,partition和offset,咱们能够断言Kafka中的一条消息其实就是一个<topic,partition,offset>三元组(tuple),经过该元组值咱们能够在Kafka集群中找到位移对应的那条消息。
4>.replia
  既然咱们已知partition是有序的消息日志,那么必定不能只保存者一份日志,不然一旦保存在partition的Kafka服务器挂掉了,其上保存的消息也就都丢失了。分布式系统必然要实现高可靠性,而目前实现的主要途径仍是依靠冗余机制。换句话说,就是备份多份日志。这些分贝日志在Kafka中被称为副本(replica),它们存在的惟一目的就是防止数据丢失,这一点必定要记住!
5>.leader和follower
  副本(replia)分为两类:领导者副本(leader replia)和追随者副本(follower replia)。follower replica是不能提供服务给客户端的,也就是说不负责响应客户端发来的消息写入和消息消费请求。它只是被动地向领导者副本(leader replia)获取数据,而一旦leader replica 所在的broker宕机,Kafka会从剩余的replica中选举出新的leader继续提供服务。
  Kafka保证同一个partition的多个replica必定不会分配在同一台broker上。毕竟若是同一个broker上有同一个partition的多个replica,那么将没法实现备份冗余的效果。
6>.ISR
  ISR的全称是in-sync replica,翻译过来就是与leader replica保持同步的replica集合。这是一个特别重要的概念。前面讲了不少关于Kafka的副本机制,好比一个partition能够配置N个replica,那么这是否就意味着该partition能够容忍N-1个replica实现而不丢失数据呢?答案是:“否”!

  副本数对Kafka的吞吐率是有必定的影响,但极大的加强了可用性。默认状况下Kafka的replica数量为1,即每一个partition都有一个惟一的leader,为了确保消息的可靠性,一般应用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1,好比3。 全部的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度, 当前最新的版本0.10.x中只支持replica.lag.time.max.ms这个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。相反的,当这些replicas从新“追上”了leader的进度时,那么Kafka会将他们加回到ISR中。这一切都是自动维护的,不须要用户进行人为干预,于是在保证了消息交付语义的同时,还简化了用户的操做成本。后端

  更多学习笔记请参考:https://www.cnblogs.com/yinzhengjie/p/9652392.html。数组

 
三.Kafka的使用场景
  Kafka以消息引擎闻名,所以它特别适合处理生产环境中的那些流式数据。如下就是Kafka在实际应用中一些典型的使用场景。
1>.消息传输
  Kafka很是适合替代传统的消息总线(message bus)或消息代理(message broker)。传统的这类系统擅长于解耦生产者和消费者以及批量处理消息,而这些特色Kafka都具有。除此以外,Kafka还具备更好的吞吐量特性,其内置的分区机制和副本机制既实现了高性能的消息传输,同时还达到了高性能的高容错性。一次Kafka特别适合用于实现一个超大量级消息处理应用。
2>.网站行为日志追踪
   Kafka最先就是用于重建用户行为数据追踪系统的。不少网站上的用户操做都会以消息的形式发送到Kafka的某个对应的topic上。这些点击流蕴含了巨大的商业价值,事实上,目前就有不少创业公司使用机器学习或其余实时处理框架来帮助收集并分析用户的点击流数据。鉴于这种点击流数据量是很大的,Kafka超强的吞吐量特性此时就有了用武之地。
3>.审计数据收集
   不少企业和组织都须要对关键的操做和运维进行监控和审计。这就须要从各个方面运维应用程序处实时汇总操做步骤信息进行集中式管理。在这种使用场景下,你会发现Kafka是很是适合的解决方案,它能够便捷的对多路消息进行实时收集,同时因为其持久化的特性,是的后续离线审计称为可能。
4>.日志收集
   这多是Kafka最多见的使用方式了(日志收集汇总解决方案),每一个企业都会产生大量的服务日志,这些日志分散在不一样的机器上。咱们可使用Kafka对他们进行全量收集,并集中往下游的分布式存储中(好比HDFS等)。比起其余主流的日志抽取框架(好比Apache Flume),Kafka有更好的性能,并且提供了完备的可靠性解决方案,同时还保持 了低延迟的特色。
5>.Event Sourcing
   Event Sourcing其实是领域驱动设计(Domain-Driven Design,简称DDD)的名次,它使用事件序列来表示状态变动,这种思想和Kafka的设计特性不谋而合。还记得吧,Kafka也是用不可变动的消息序列来抽象化表示业务信息的,所以Kafka特别适合做为这种应用的后端存储。
6>.流式处理
   不少用户接触到Kafka都是由于它的消息存储引擎。自0.10.0.0版本开始,Kafka社区推出了一个全新的流式组件 Kafka Streams。这标志着Kafka正式进入流式处理框架俱乐部。相比老牌流式处理框架Apache Storm,Apache Samza,或是最近风头正劲的Spark Streaming,抑或是Apache Flink,Kafka Streams的竞争力如何?让咱们拭目以待吧!
 

 

四.集群环境规划 
1>.操做系统的选型
  咱们知道Kafka依赖于Java环境,所以咱们只要能在操做系统上安装jdk理论上就能够部署kafka环境了。没错,事实上kafka的确能够运行在主流的操做系统上,好比windows,Linux,mac OS等等。可是这么多操做系统咱们究竟应该选择哪一个操做系统去安装呢?为何你们部署kafka集群都选择的是Linux环境呢?其实我们是能够分析缘由的:
    第一:Kafka新版本clients在设计底层网络库时采用了Java的Selecor机制(NIO),然后者在Linux实现机制就是epoll;可是在window平台上,Java NIO的Selector底层是使用select模型而非IOCP实现的,只有Java NIO2擦拭使用IOCP实现的。所以这一点上,在Linux部署Kafka要在比Windows上部署可以获得高效的I/O处理能力;
    第二:对于数据网络传输效率而言,Linux也更具备优点。具体来讲,Kafka这种应用必然须要大量的经过网络于磁盘进行数据传输,而大部分这样的操做都是经过Java的FileChannel.transferTo方法实现的,在Linux平台上该方法底层会调用sendfile系统调用,即采用了Linux提供的零拷贝(Zero Copy)技术。
2>.磁盘规划
  事实上,根据公开的资料显示,LinkedIn公司的Kafka集群就是使用RAID 10做为底层存储的。除了默认提供的数据冗余以外,RAID 10 还能够将数据自动的负载分布到多个磁盘上。因而可知,RAID做为Kafka的底层存储其实主要的优点有两个:
    第一:提供冗余的数据存储空间;
    第二:自然提供负载均衡;
  以上两个优点对于任何系统而言都是很好的特性。不过对于Kafka而言,Kafka在框架层面其实已经提供了这两个特性:经过副本机制提供冗余和高可靠性,以及经过分散到各个节点的领导者选举机制来实现负载均衡,因此从这方面来看,RAID的优点就显得不是那么明显了。事实上,LinkedIn公司目前正在计划将整个Kafka集群从RAID 10 迁移到JBOD上,只不过在整个过程当中JBOD方案须要解决当前的Kafka一些固有缺陷,好比:
    第一:任意磁盘损坏都会致使broker宕机,普通磁盘损坏的几率是很大的,所以这个缺陷从某种程度上来讲是致命的。不过社区正在改进这个问题,将来版本中只要为broker配置的多块磁盘中还有良好的磁盘,broker就不会挂掉。
    第二:JBOD的管理须要更加细粒度化,目前Kafka没有提供脚本或其余工具用于在不一样磁盘间进行手动分配,但这是使用JBOD方案中必要的功能。
    第三:JBOD也应该提供相似于负载均衡的功能,目前只是间的依赖轮训的方式为副本数据选择磁盘,后续须要提供更加丰富的策略。
   结合JBOD和RAID之间的优劣对比以及LinkIn公司的实际案例,我们能够给硬盘规划的结论性总结以下:
    第一:追求性价比的公司能够考虑使用JBOD;
    第二:使用机械硬盘彻底能够知足Kafka集群的使用,SSD更好(尽可能不要使用NAS(Network Attached Storage)这样的网络存储设备。);
3>. 磁盘容量规划
   对于磁盘容量的规划和如下结果因素有关:
    第一:新增消息数;
    第二:消息留存时间;
    第四:平均消息大小;
    第五:副本数;
    第六:是否启用压缩;
4>.内存规划
  Kafka对于内存对使用可称做其设计亮点之一。虽然在前面咱们强调了Kafka大量依靠和磁盘来保存消息,但其实它还会对消息进行缓存,而这个消息换粗你得地方就是内存,具体来讲是操做系统对页缓存(page cache)。Kafka虽然会持久化每条消息,但其实这个工做都是底层对文件系统来完成。Kafka仅仅将消息写入page cache而已,以后将消息“flush”到磁盘对任务彻底交由操做系统来完成。
  通常状况下,broker所需的堆内存都不会超过6GB。因此对于一台16GB内存的机器而言,文件系统page cache的大小甚至能够达到10~14GB!总之对于内存规划的建议以下:
    第一:尽可能分配更多的内存给操做系统的page cache;
    第二:不要为broker设置过大的堆内存,最好不超过6GB;
    第三:page大小至少要大于一个日志段的大小;
5>.CPU规划
   比起磁盘和内存,CPU于kafka而言并无那么重要,严格来讲,kafka不属于计算密集型(CPU-bound)的系统,所以杜宇CPU须要记住一点就能够了:追求多核而非高时钟频率。我们对CPU资源规划以下:
    第一:使用多核系统,CPU核数最好大于8;
    第二:若是使用Kafka 0.10.0.0以前的版本或clients端消息版本不一致(若无显式配置,这种状况多半由clients和broker版本不一致形成),则考虑多配置一些资源以防止消息解压操做消耗过多CPU)。
6>.带宽规划
  第一:尽可能使用高速网络;
  第二:根据自身网络条件和带宽来评估Kafka集群机器数量;
  第三:避免使用跨机房网络;
7>.典型线上环境配置
  下面给出一份典型的线上环境配置,用户能够参考这份配置以及结合本身的是实际状况进行二次调整:
    CPU 24核心;
    内存 32GB;
    磁盘 1TB 7200转SAS盘2快;
    带宽:1Gb/s;
    ulimit -n 1000000;
    Socket Buffer 至少64KB,适合于跨机房网络传输;
 
五.reblance扫盲

1>.rebalance简介缓存

   consumer group的rebalance本质上是一组协议,它规定了一个consumer group 是如何达成一致来分配订阅topic的全部分区的。假设某个组下有20个consumer实例,该组订阅一个有着100个分区的topic。正常状况下,Kafka会为每一个consumer平均分配5个分区。这个分配过程就被称为rebalance。服务器

  当consumer成功执行rebalance后,组订阅topic的每一个分区只会分配给组内一个consumer实例。换句话说,同一个消费者组的消费者不能同时对同一个topic的同一个分区进行消费。网络

  和旧版本consumer依托于zookeeper进行rebalance不一样,新版本consumer使用了Kafka内置的一个全新的协议(group coordination protocol)。对于每一个组而言,Kafka的某个broker会被选举为组协调者(group coordinator)。coordinator负责对组对状态进行管理,他的主要责任就是当新成员到达时促成组内全部成员达成新对分区分配方案,即coordinator负责对组执行rebalance操做。app

2>.rebalance触发条件负载均衡

  组rebalance触发对条件有如下3个:

    第一:组成员发生变动,好比新consumer加入组,或已有consumer主动离开组,再或是已有consumer崩溃时则触发rebalance;

    第二:组订阅topic数发生变动,好比使用基于正则表达式对订阅,当匹配正则表达式对新topic被建立时则会触发rebalance;

    第三:组订阅topic时分区发生变动,好比使用命令行脚本增长了订阅topic的分区数;

  真实应用场景引起rebalance最多见的缘由就是违背了第一个条件(好比flume的kafka source相对于broker集群来讲就是consumer对象),特别是consumer崩溃的状况。这里的崩溃不必定就是指consumer进程“挂掉”或consumer进程所在的机器宕机。当consumer没法在指定的时间内完成消息处理,那么coordinator就认为该consumer已经崩溃,从而引起新一轮rebalance。

  我在生产环境中也使用flume消费kafka的数据到hdfs集群上,也遇到过这种rebalance的状况,最终分析缘由是:该group下的consumer处理消息的逻辑太重,并且事件处理时间波动很大,很是不稳定,从而致使coordinator会常常行的认为某个consumer已经挂掉,引起rebalance。鉴于目前一次rebalance操做的开销很大,生产环境中用户必定要结合自身业务特色仔细调优consumer参数:“request.timeout.ms”,“max.poll.records”和“max.poll.interval.ms”这几个参数,以免没必要要的rebalance出现。

3>.rebalance协议

  前面咱们提到过rebalance本质上是一组协议。group于coordinator共同使用这组协议完成group的rebalance。最新版本的Kafka中提供了下面5个协议来处理rebalance相关事宜。

    第一:JoinGroup请求:consumer请求加入组;

    第二:SyncGroup请求:group leader 把分配方案同步更新到组内全部成员中;

    第三:Heartbeat请求:consumer按期向coordinator汇报心跳代表本身依然存活;

    第四:LeaveGroup请求:consumer主动通知coordinator该consumer即将离组;

    第五:DescribeGroup请求:查看组的全部信息,包括成员信息,协议信息,分配方案以及订阅信息等。该请求类型主要供管理员使用。coordinator不使用该请求执行rebalance。

  在rebalance过程当中,coordinator主要处理consumer发过来的joinGroup和SyncGroup请求。当consumer主动离组时会发送LeaveGroup请求给coordinator。

  在成功rebalance过程当中,组内全部consumer都须要按期向coordinator发送Hearbeat请求。而每一个consumer也是根据Heartbeat请求的响应中是否包含REBALANCE_IN_PROGRESS来判断当前group是否开启来新一轮rebalance。

  好啦~关于rebalance我们了解到这里基本上就够用来,感兴趣的小伙伴能够查看rebalance genneration,rebalance流程,rebalance监听器等技术,咱们这里就不用深刻探讨啦~

相关文章
相关标签/搜索