Apache Kafka框架学习

背景介绍java

消息队列的比较算法

kafka框架介绍数据库

  术语解释缓存

  文件存储服务器

  可靠性保证session

  高吞吐量实现负载均衡

  负载均衡框架

应用场景dom

 

背景介绍:异步

  kafka是由Apache软件基金会维护的一个开源流处理平台,由scala和java编写。最先开发自LinkedIn,用作LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。如今它已被多家不一样类型的公司做为多种类型的数据管道和消息系统使用。

  kafka是一种分布式的,基于发布/订阅的消息系统。

  kafka特色:

    快速持久化,能够在o(1)的系统开销下进行消息持久化;

    高吞吐,在一台普通的服务器上既能够达到10W/s的吞吐速率;

    彻底的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;

    kafka经过Hadoop的并行加载机制统一了在线和离线的消息处理。

    Apache Kafka相对于ActiveMQ是一个很是轻量级的消息系统,除了性能很是好以外,仍是一个工做良好的分布式系统。

消息队列:

使用消息队列的好处:

   解耦、扩展性、灵活性&峰值处理能力、可恢复性、顺序保证、缓冲......

RabbitMQ

  RabbitMQ是使用Erlang编写的一个开源的消息队列,自己支持不少的协议:AMQP,XMPP,SMTP,STOMP,也正因如此,它很是重量级。

Redis

  Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它自己支持MQ功能。对于RabbitMQ和Redis的入队和出队操做,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不一样大小的数据。实验代表:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而若是数据大小超过了10K,Redis则慢的没法忍受;出队时,不管数据大小,Redis都表现出很是好的性能,而RabbitMQ的出队性能则远低于Redis。

ZeroMQ

  ZeroMQ号称最快的消息队列系统,尤为针对大吞吐量的需求场景。ZMQ可以实现RabbitMQ不擅长的高级/复杂的队列,可是开发人员须要本身组合多种技术框架,技术上的复杂度是对这MQ可以应用成功的挑战。ZeroMQ具备一个独特的非中间件的模式,你只须要简单的引用ZeroMQ程序库,就能够愉快的在应用程序之间发送消息了。可是ZeroMQ仅提供非持久性的队列,也就是说若是宕机,数据将会丢失。

ActiveMQ

  ActiveMQ是Apache下的一个子项目。相似于ZeroMQ,它可以以代理人和点对点的技术实现队列。同时相似于RabbitMQ,它以少许代码就能够高效的实现高级应用场景。

kafka框架:

  Broker:kafka集群包含一个或多个服务器,每一个服务器被称为一个broker。

  Producer:负责发布消息到Kafka broker。

  Consumer:消息消费者,从Kafka broker读取消息的客户端。

  Consumer Group:每一个Consumer属于一个特定的Consumer Group(可为每一个Consumer指定group name,若不指定group name则属于默认的group)。当多个Consumer属于同一个Group时,它们所订阅的消息只会发布到该组的producer;当须要每一个Consumer都接受到消息时,能够赋予不一样的id。

  Topic:每条发布到Kafka集群的消息都有一个类别。这个类别被称为Topic。(物理上不一样Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic便可生产或消费数据而没必要关心数据存于何处)。

  Partition:Partition是物理上的概念,每一个Topic包含一个或多个Partition;建议每一个Topic的Partition数量不超过集群中的broker数量。

  Replica:kafka从0.8版本开始引入了副本机制,目的是为了增长Kafka的高可用性。每一个Partition会有多个副本,而且从副本集合中(Assigned Replic,AR)中选取一个副本做为leader副本,全部的读写请求都由leader副本处理。剩余的副本做为Follower副本,Follower副本从leader副本获取消息并更新至本身的Log中。若是leader副本所在的Broker出现故障,会从Follower副本选择一个做为Leader提供服务,保证Kafka的高可用性。

  Topic&Partition&Replica示意图

 

  Topic&Partition&Replica分配算法:

  1.将全部存活的N个Brokers和待分配的Partition排序;

  2.将第i个Partition分配到第(i mod n)个Broker上;而且会做为Partition的优先副本(这里就基本说明了一个topic的leader partition在集群上的大体分布状况);

  3.将第i个Partition的第j个Replica分配到第((i + j)mod n)个Broker上。

  假设集群一共有4个brokers,一个topic有4个partition,每一个Partition有3个副本,下图是每一个Broker上的副本分配状况。

  

Kafka框架-文件存储机制:

  kafka中的Message是以topic为基本单位组织的,不一样的topic之间是相互独立的。每一个topic又能够分红几个不一样的partition(每一个topic有几个partition是在建立topic时指定的),每一个partition存储一部分Message。

  Topic&partition&Message关系图:

 

  partition是以文件的形式存储在文件系统中,好比,建立了一个名为page_visits的topic,其有5个partition,那么在Kafka的数据目录中(由配置文件中的log.dirs指定的)中就有这样5个目录:page_visits-0, page_visits-1,page_visits-2,page_visits-3,page_visits-4,其命名规则为:<topic_name>-<partition_id>,里面存储的分别就是这5个partition的数据。

Partition的数据文件:

  Partition中的每条Message由offset来表示它在这个partition中的偏移量,不是该Message在partition数据文件中的实际存储位置,而是逻辑上的一个值,它惟一肯定了partition中的一条Message。所以,能够认为offset是partition中Message的id。partition中的每条Message包含了如下三个属性:offset、messageSize、data。其中offset为long型,MessageSize为int32,表示data有多大,data为message的具体内容。

   如上描述,若是每一个Partition对应一个存储文件,当一个Partition上存储大量消息时,追加消息的复杂度为o(1);查找一个消息时,须要遍历整个文件,复杂度o(n)。

Kafka解决方案:

  分段:

    好比有100条Message,它们的offset是从0到99.假设将数据文件分红5段,第一段为0-19,第二段为20-39,以此类推,每段放在一个单独的数据文件里面,数据文件以该段中最小的offset命名。这样在查找指定offset的Message的时候,用二分查找就能够定位到该Message在哪一个段中。

  

  索引:

    Kakfa为每一个分段后的数据文件创建了索引文件,文件名与数据文件的名字是同样的。只是文件扩展名为.index。index文件中并无为数据文件中的每条Message创建索引,而是采用了稀疏存储的方式,每隔必定字节的数据创建一条索引。这样避免了索引文件占用过多的空间,从而能够将索引文件保留在内存中。

  消息查找过程:

    好比:要查找绝对offset为7的Message:

      1.用二分查找肯定它是在那个LogSegment中:在第一个Segment中。

      2.打开这个Segment的index文件,也是用二分查找找到offset小于或者等于指定offset的索引条目中最大的那个offset。天然offset为6的那个索引是咱们要找的,经过索引文件咱们知道offset为6的Message在数据文件中位置为9087.

      3.打开数据文件,从位置为9807的那个地方开始顺序扫描直到找到offset为7的那条Message。

      这套机制是创建在offset是有序的。索引文件居中,因此查找的速度仍是挺快的。一句话,kafka的Message存储采用了分区(partition),分段(LogSegment)映射到内存和稀疏索引这几个手段来达到了高效性。

Kafka框架-数据可靠性保证:

Broker分析:

  对于broker,落盘的数据,除非磁盘坏了,通常不会丢的。

  对于内存脏(没有flush磁盘)数据,broker重启会丢,能够经过log.flush.interval.messages和log.flush.interval.ms来配置flush间隔,interval大丢的数据多些。

  Replica机制:是否使用replica取决于在可靠性和资源代价之间的平衡。

Consumer从Broker拉取消息:

  Kafka中有两种consumer接口,分别为Low-level API和High-level API

  (1).Low-level API SimpleConsumer,这套接口比较复杂的,使用者必需要考虑不少事情,优势就是对Kafka能够有彻底的控制。

  (2).High-level API 使用比较简单,已经封装了partition和offset的管理,默认是会按期自动commit offset,这样可能会丢数据,由于consumer可能拿到数据没有处理完crash。High-level API接口的特色,自动管理,使用简单,可是对Kafka的控制不够灵活。

  一种很是经常使用的选举leader的方式是“majority vote”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,若是咱们有2f+1个replica(包含leader和follower),那在commit以前必须保证有f+1个replica复制完消息,为了确保正确选出新的leader,fail的replica不能超过f个。由于在剩下的任意f+1个replica里,至少有一个replica包含有最新的全部消息。这种方式有个很大的优点,系统的latency只取决于最快的几台sever,也就是说,若是replication factor是3,那latency就取决于最快的那个follower而非最慢那个。majority vote也有一些劣势,为了保证leader election的正常进行,它所能容忍的fail的follower个数比较少。若是要容忍1个follower挂掉,必需要有3个以上的replica,若是要容忍2个follower挂掉,必需要有5个以上的replica。也就是说,在生产环境下为了保证较高的容错程度,必需要有大量的replica,而大量的replica又会在大数据量下致使性能急剧降低。这就是这种算法更多用在Zookeeper这种共享集群配置的系统中而不多在须要存储大量数据的系统中使用的缘由。例如HDFS的HA feature是基于majority-vote-based journal,可是它的数据存储并无使用这种expensive的方式。

如何肯定一个Broker是否还活着?

  1.它必须维护与Zookeeper的session(这个经过Zookeeper的Heartbeat机制来实现)。

  2.Follower必须可以及时将Leader的消息复制过来,不能“落后太多”。

  Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)。若是一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的“落后太多“指Follower复制的消息落后于Leader后的条数超过预约值(该值可在$KAFKA_HOME/config/server.properties中经过replica.lag.max.messages配置,其默认值是4000)或者Follower超过必定时间(该值可在$KAFKA_HOME/config/server.properties中经过replica.lag.time.max.ms来配置,其默认值是10000)未向Leader发送fetch请求。

  这里的复制机制既不是同步复制,也不是单纯的异步复制。事实上,同步复制要求”活着的”follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka很是重要的一个特性)。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种状况下若是follower都落后于leader,而leader忽然宕机,则会丢失数据。而Kafka的这种使用“in sync” list的方式则很好的均衡了确保数据不丢失以及吞吐率。follower能够批量的从leader复制数据,这样极大的提升复制性能(批量写磁盘),极大减小了follower与leader的差距(前文又说到,只要follower落后leader不太远,则被认为在“in sync” list里)。

接受数据的可靠性保证:

  当producer向leader发送数据时,request.required.acks参数来设置可靠性的级别:

  1(默认):producer在ISR中的leader已成功收到的数据并获得确认后发送下一条message。若是leader宕机了,则会丢失数据。

  0:producer无需等待来自broker的确认而继续发送下一批消息。这种状况下数据传输效率最高,可是数据可靠性倒是最低的。

  -1:producer须要等待ISR中的全部follower都确认接受到数据后才算一次发送完成,可靠性最高。可是这样也不能保证数据不丢失,好比当ISR中只有leader时(前面ISR那一节讲到,ISR中的成员因为某些状况会增长也会减小,最少就剩一个leader),这样就变成了acks=1的状况。

接受数据可靠性保证:

  若是要提升数据的可靠性,在设置request.required.acks=-1的同时,也要min.insync.replicas这个参数(能够在broker或者topic层面进行设置)的配合,这样才能发挥最大的功效。

  min.insync.replicas这个参数设定ISR中的最小副本数是多少,默认为1,当且仅当request.required.acks参数设置为-1时,此参数才生效。若是ISR中的副本数少于min.insync.replicas配置的数量时,客户端会返回异常。

  request.required.acks=-1,同步(Kafka默认为同步,即producer.type=sync)的发送模式。

  replication.factor>=2且min.insvnc.replicas>=2的状况下,不会丢失数据。注:Kafka只处理fail/recover问题,不处理Byzantine问题。

     

  图4中若是选举后一个为leader,则前一个partition的HW不会更新,新消息继续从offset为5的地方存储;此时,producer没有收到ack消息,会继续发送消息四、5,此时产生重复;kafka不解决,有用户本身结局,好比在消息中添加全局Key。

Kafka框架-高吞吐量:

  顺序读/写文件、批量消息传递、数据压缩、Kakfa的消息存储在OS pagecache(页缓存,pagecache的大小为一页,一般为4K,在Linux读写文件时,它用于缓存文件的逻辑内容,从而加快对磁盘上影像和数据的访问)、Topic分为多个Partition,多个Parttition同时提供服务。

Kafka框架-负载均衡:

  producer根据用户指定的算法,将消息发送到指定的partition;

private[kafka] class DefaultPartitioner[T] extends Partitioner[T] {
  private val random = new java.util.Random
  def partition(key: T, numPartitions: Int): Int = {
    if(key == null)
    {
        println("key is null")
        random.nextInt(numPartitions)
    }
    else
    {
        println("key is "+ key + " hashcode is "+key.hashCode)
        math.abs(key.hashCode) % numPartitions
    }
  }
}

  Partition&replica均衡:存在多个partition,每一个partition有本身的replica,每一个replica分布在不一样的Broker节点上;

 

leader均衡:

  每当Leader Partition中止或崩溃领导,由其余副本取代Leader地位。这意味着默认状况下,当该Partition更新启动时,它将只是做为跟随着,不会用于客户端读取和写入,出现于其它Leader在同一个Broker的状况。

  为了不这种不平衡,Kafka有一个优先副本的概念。若是分区的副本的列表为1,5,9,则节点1优选为节点5或9的引导者,由于它在副本列表中较早。

  设置auto.leader.rebalance.enable = true便可实现上述操做;

  等待ISR中的任一个replica“活”过来,而且选它为leader。选择第一个“活”过来的replica(不必定是ISR中的)做为leader。

  这就须要在可用性和一致性当中作出一个简单的平衡。若是必定要等待ISR中的replica“活”过来,那不可用的时间就可能会行对较长。并且若是ISR中的全部replica都没法“活”过来了,或者数据都丢失了,这个partition将永远不可用。选择第一个“活”过来的replcia做为leader,而这个replica不是ISR中的replica,那即便它并不保证已经包含了全部已commit的消息,他也会成为leader而做为comsumer的数据源(前文有说明,全部读写都由leader完成)。Kafka0.8.×使用了第二种方式。根据Kafka的文档,在之后的版本中,Kafka支持用户经过配置选择这两种方式中的一种,从而根据不一样的使用场景选择高可用性仍是强一致性。

Consumer 均衡:

  Kafka保证的是稳定状态下每个Consumer实例只会消费某一个或多个特定Partition的数据。而某个Partition的数据只会被某一个特定的Consumer实例所消费。也就是说Kafka对消息的分配是以Partition为单位分配的,而非以每一条消息做为分配单元。这样设计的劣势是没法保证同一个Consumer Group里的Consumer均匀消费数据,优点是每一个Consumer不用都跟大量的Broker通讯,减小通讯开销,同时也下降了分配难度,保证每一个Partition里的数据能够被有序消费。

 

Consumer均衡方法:

  若是某Consumer Group中Consumer(每一个Consumer只建立1个MessageStream)数量少于Partition数量,则至少有一个Consumer会消费多个Partition的数据;若是Consumer的数量和Partition数量相同,则正好一个Consumer消费一个Partition的数据。而若是Consumer的数量多于Partition的数量时,会有部分Consumer没法消费该Topic下任何一条消息。当添加、删除Consumer时,会触发Consumer的Rebalance算法,从新分配每一个Consumer消费的Partition。

  Consumer Rebalance的算法以下:

 Kafka框架-应用场景:

  消息队列:比起大多数的消息系统来讲,Kafka有更好的吞吐量,内置的分区,冗余及容错性,这让Kafka成为了一个很好的大规模消息处理应用的解决方案。

  网站活动跟踪:跟踪用户浏览页面、搜索以及其余行为,以发布-订阅的模式实时记录到对应的topic里。再作进一步的实时处理,或实时监控,或放到hadoop/离线数据仓库里处理。

  日志收集:服务器上收集日志文件,抽象成一个个日志或事件的信息流,Kafka处理过程延迟低,更容易支持多数据源和分布式数据处理。

  流处理:保存收集流数据,以提供以后对接的Storm或其余流式计算框架进行处理。不少用户将那些从原始topic来的数据进行阶段性处理、汇总、扩充或者以其余的方式转换到新的topic下再继续后面的处理。

相关文章
相关标签/搜索