Kafka笔记4(消费者)

消费者和消费群组:apache

  Kafka消费者从属于消费者群组,一个群组里的消费者订阅的是同一个主题,每一个消费者接收主题的一部分分区消息服务器

  消费者的数量不要超过主题分区的数量,多余的消费者只会被闲置session

  一个主题能够被多个消费群组使用,消费者群组之间互不影响异步

  

  当一个消费者加入群组时,他读取的数据是本来由其余消费者读取的信息函数

  分区的全部权从一个消费者转移至另外一个消费者的行为称为“再均衡”fetch

  再均衡期间,消费者当前的读取状态会丢失,消费者没法读取信息,形成集群一小段时间的不可用,在恢复状态以前会拖慢应用程序线程

  消费者经过向群组协调器broker发送心跳维持他们和群组的从属关系以及他们对分区的全部权关系,若是broker认为消费者死亡会触发再均衡行为blog

 分配分区过程:接口

  当消费者加入群组时,他会向群组协调器发送一个JoinGroup请求,第一个加入群组的消费者称为群主,群主从协调器那里得到群组的成员列表,并负责给每个消费者分配分区。他使用一个实现PartitionAssignor接口的类来决定哪些分区应该被分配给消费者,分配完毕以后,群主把分配状况列表发送给broker,broker再把这些信息发送给全部消费者,每一个消费者只能看到本身的分配信息,只有群主知道群组的全部消费者的分配信息字符串

  消息轮询是消费者API核心,经过从一个简单的轮询向服务器请求数据,一旦消费者订阅了主题,轮询就会处理全部细节,包括群组协调/分区再均衡/发送心跳/获取数据

  一个消费者使用一个线程

  

消费者重要的属性参数配置:

  fetch.min.bytes

    指定了消费者从服务器获取记录的最小字节数,若是broker收到消费者请求,但数据可用量小于fetch.min.bytes,就会等到有足够的可用数据才把它返回给消费者

  fetch.max.wait.ms

    指定broker等待时间,默认500ms  

  max.partition.fetch.bytes

    指定服务器从每一个分区里返回给消费者的最大字节数,默认1MB    max.partition.fetch.size的值必须比broker能接收的最大消息字节数(max.message.size)大

  session.timeout.ms

    指定消费者在被认为死亡以前能够与服务器断开链接的时间,默认3S

    heartbeat.interval.ms =  session.timeout.ms / 3

  auto.offset.reset

    指定消费者在读取一个没有偏移量的分区或者偏移量无效的状况下该如何处理

    =latest  消费者从最新的记录开始读取数据

    =earliest  消费者从起始位置读取分区记录

  enable.auto.commit

    指定消费者是否自动提交偏移量,默认true

      auto.commit.interval.ms 控制提交频率

  partition.assignment.strategy

    =org.apache.kafka.clients.consumer.RangeAssignor 把主题的若干连续分区分配给消费者

    =org.apache.kafka.clients.consumer.RoundRobinAssignor  把主题的全部分区逐个分配给消费者

  client.id  

    任意字符串,broker用来标识从客户端发送来的消息

  max.poll.records

    用于控制单次调用call() 方法返回的记录数量,能够帮你控制在轮询里须要处理的数据量

  receive.buffer.bytes 和 send.buffer.bytes

    默认-1   

 

更新分区当前位置的操做叫提交

  消费者会向一个叫作 _consumer_offset 的特殊主题发送消息,消息里包含了每一个分区的偏移量

  

 

    

  Kafka能够设置消费者自动提交偏移量,设置enable.auto.commit=true,提交时间间隔auto.commit.interval.ms=5s

  自动提交是在轮询里进行的,消费者每次轮询时会检查是否该提交偏移量了,是则提交上一次轮询返回的偏移量

  提交当前偏移量,使用API函数 commitSync()

  异步提交偏移量,使用API函数commitAsync()

    可使用一个单调递增的序列号来维护异步提交顺序

相关文章
相关标签/搜索