kafka partition(分区)与 group

1、java

一、原理图shell

二、原理描述markdown

一个topic 能够配置几个partition,produce发送的消息分发到不一样的partition中,consumer接受数据的时候是按照group来接受,kafka确保每一个partition只能同一个group中的同一个consumer消费,若是想要重复消费,那么须要其余的组来消费。Zookeerper中保存这每一个topic下的每一个partition在每一个group中消费的offset 
新版kafka把这个offsert保存到了一个__consumer_offsert的topic下 
这个__consumer_offsert 有50个分区,经过将group的id哈希值%50的值来肯定要保存到那一个分区.  这样也是为了考虑到zookeeper不擅长大量读写的缘由。
因此,若是要一个group用几个consumer来同时读取的话,须要多线程来读取,一个线程至关于一个consumer实例。当consumer的数量大于分区的数量的时候,有的consumer线程会读取不到数据。 
假设一个topic test 被groupA消费了,如今启动另一个新的groupB来消费test,默认test-groupB的offset不是0,而是没有新创建,除非当test有数据的时候,groupB会收到该数据,该条数据也是第一条数据,groupB的offset也是刚初始化的ofsert, 除非用显式的用–from-beginnging 来获取从0开始数据 多线程

三、查看topic-group的offsert 负载均衡

位置:zookeeper 
路径:[zk: localhost:2181(CONNECTED) 3] ls /brokers/topics/__consumer_offsets/partitions 
在zookeeper的topic中有一个特殊的topic __consumer_offserts 
计算方法:(放入哪一个partitions)ide

int hashCode = Math.abs("ttt".hashCode());工具

int partition = hashCode % 50;oop

先计算group的hashCode,再除以分区数(50),能够获得partition的值 post

使用命令查看: kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 11 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"性能

4.参数 
auto.offset.reset:默认值为largest,表明最新的消息,smallest表明从最先的消息开始读取,当consumer刚开始建立的时候没有offset这种状况,若是设置了largest,则为当收到最新的一条消息的时候开始记录offsert,若设置为smalert,那么会从头开始读partition

 
2、
一、Topic
     Topic在逻辑上能够被认为是一个queue,每条消费都必须指定它的Topic,能够简单理解为必须指明把这条消息放进哪一个queue里。为了使得Kafka的吞吐率能够线性提升,物理上把Topic分红一个或多个Partition,每一个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的全部消息和索引文件。若建立topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹(本文所用集群共8个节点,此处topic1和topic2 replication-factor均为1),以下图所示。
二、对于传统的message queue而言,通常会删除已经被消费的消息,而Kafka集群会保留全部的消息,不管其被消费与否。固然,由于磁盘限制,不可能永久保留全部数据(实际上也不必),
     所以Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。
     例如能够经过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可在Partition文件超过1GB时删除旧数据,配置以下所示。
   这里要注意,由于Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,因此这里删除过时文件与提升Kafka性能无关。选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。这个offset由Consumer控制。正常状况下Consumer会在消费完一条消息后递增该offset。固然,Consumer也可将offset设成一个较小的值,从新消费一些消息。由于offet由Consumer控制,因此Kafka broker是无状态的,它不须要标记哪些消息被哪些消费过,也不须要经过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,所以也就不须要锁机制,这也为Kafka的高吞吐率提供了有力保障。
 
 三、producer
Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪个Partition。若是Partition机制设置合理,全部消息能够均匀分布到不一样的Partition里,这样就实现了负载均衡。若是一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不一样的消息能够并行写入不一样broker的不一样Partition里,极大的提升了吞吐率。能够在$KAFKA_HOME/config/server.properties中经过配置项num.partitions来指定新建Topic的默认Partition数量,也可在建立Topic时经过参数指定,同时也能够在Topic建立以后经过Kafka提供的工具修改。
 
在发送一条消息时,能够指定这条消息的key,Producer根据这个key和Partition机制来判断应该将这条消息发送到哪一个Parition。Paritition机制能够经过指定Producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中若是key能够被解析为整数则将对应的整数与Partition总数取余,该消息会被发送到该数对应的Partition。(每一个Parition都会有个序号,序号从0开始)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import  kafka.producer.Partitioner;
import  kafka.utils.VerifiableProperties;
 
public  class  JasonPartitioner<T>  implements  Partitioner {
 
     public  JasonPartitioner(VerifiableProperties verifiableProperties) {}
 
     @Override
     public  int  partition(Object key,  int  numPartitions) {
         try  {
             int  partitionNum = Integer.parseInt((String) key);
             return  Math.abs(Integer.parseInt((String) key) % numPartitions);
         catch  (Exception e) {
             return  Math.abs(key.hashCode() % numPartitions);
         }
     }
}

  若是将上例中的类做为partition.class,并经过以下代码发送20条消息(key分别为0,1,2,3)至topic3(包含4个Partition)。

1
2
3
4
5
6
7
8
9
10
public  void  sendMessage()  throws  InterruptedException{
   for ( int  i =  1 ; i <=  5 ; i++){
        List messageList =  new  ArrayList<KeyedMessage<String, String>>();
         for ( int  j =  0 ; j <  4 ; j++){
            messageList.add( new  KeyedMessage<String, String>( "topic2" , j+ "" "The "  + i +  " message for key "  + j));
        }
        producer.send(messageList);
     }
  producer.close();
}

  则key相同的消息会被发送并存储到同一个partition里,并且key的序号正好和Partition序号相同。(Partition序号从0开始,本例中的key也从0开始)。下图所示是经过Java程序调用Consumer后打印出的消息列表。

四、consumer group   (本节全部描述都是基于Consumer hight level API而非low level API)。

     使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。

这是Kafka用来实现一个Topic消息的广播(发给全部的Consumer)和单播(发给某一个Consumer)的手段。一个Topic能够对应多个Consumer Group。若是须要实现广播,只要每一个Consumer有一个独立的Group就能够了。要实现单播只要全部的Consumer在同一个Group里。用Consumer Group还能够将Consumer进行自由的分组而不须要屡次发送消息到不一样的Topic。

实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还能够同时将数据实时备份到另外一个数据中心,只须要保证这三个操做所使用的Consumer属于不一样的Consumer Group便可。

下面这个例子更清晰地展现了Kafka Consumer Group的特性。首先建立一个Topic (名为topic1,包含3个Partition),而后建立一个属于group1的Consumer实例,并建立三个属于group2的Consumer实例,最后经过Producer向topic1发送key分别为1,2,3的消息。结果发现属于group1的Consumer收到了全部的这三条消息,同时group2中的3个Consumer分别收到了key为1,2,3的消息。

相关文章
相关标签/搜索