kafka消费者Consumer参数设置及参数调优建议-kafka 商业环境实战

本套系列博客从真实商业环境抽取案例进行总结和分享,并给出Spark商业应用实战指导,请持续关注本套博客。版权声明:本套Spark商业应用实战归做者(秦凯新)全部,禁止转载,欢迎学习。apache

1 消息的接收->基于Consumer Group

Consumer Group 主要用于实现高伸缩性,高容错性的Consumer机制。所以,消息的接收是基于Consumer Group 的。组内多个Consumer实例能够同时读取Kafka消息,同一时刻一条消息只能被一个消费者消费,并且一旦某一个consumer "挂了", Consumer Group 会当即将已经崩溃的Consumer负责的分区转交给其余Consumer来负责。从而保证 Consumer Group 可以正常工做。bootstrap

2 位移保存->基于Consumer Group

说来奇怪,位移保存是基于Consumer Group,同时引入检查点模式,按期实现offset的持久化。session

3 位移提交->抛弃ZooKeeper

Consumer会按期向kafka集群汇报本身消费数据的进度,这一过程叫作位移的提交。这一过程已经抛弃Zookeeper,由于Zookeeper只是一个协调服务组件,不能做为存储组件,高并发的读取势必形成Zk的压力。架构

  • 新版本位移提交是在kafka内部维护了一个内部Topic(_consumer_offsets)。
  • 在kafka内部日志目录下面,总共有50个文件夹,每个文件夹包含日志文件和索引文件。日志文件主要是K-V结构,(group.id,topic,分区号)。
  • 假设线上有不少的consumer和ConsumerGroup,经过对group.id作Hash求模运算,这50个文件夹就能够分散同时位移提交的压力。

4 官方案例

4.1 自动提交位移

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
     }
复制代码

4.2 手动提交位移

Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "false");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     final int minBatchSize = 200;
     List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records) {
             buffer.add(record);
         }
         if (buffer.size() >= minBatchSize) {
             insertIntoDb(buffer);
             consumer.commitSync();
             buffer.clear();
         }
     }
复制代码

5 kafka Consumer参数设置


  • consumer.poll(1000) 重要参数
  • 新版本的Consumer的Poll方法使用了相似于Select I/O机制,所以全部相关事件(包括reblance,消息获取等)都发生在一个事件循环之中。
  • 1000是一个超时时间,一旦拿到足够多的数据(参数设置),consumer.poll(1000)会当即返回 ConsumerRecords<String, String> records。
  • 若是没有拿到足够多的数据,会阻塞1000ms,但不会超过1000ms就会返回。

  • session. timeout. ms <= coordinator检测失败的时间
  • 默认值是10s
  • 该参数是 Consumer Group 主动检测 (组内成员comsummer)崩溃的时间间隔。若设置10min,那么Consumer Group的管理者(group coordinator)可能须要10分钟才能感觉到。太漫长了是吧。

  • max. poll. interval. ms <= 处理逻辑最大时间
  • 这个参数是0.10.1.0版本后新增的,可能不少地方看不到喔。这个参数须要根据实际业务处理时间进行设置,一旦Consumer处理不过来,就会被踢出Consumer Group 。
  • 注意:若是业务平均处理逻辑为1分钟,那么max. poll. interval. ms须要设置稍微大于1分钟便可,可是session. timeout. ms能够设置小一点(如10s),用于快速检测Consumer崩溃。

  • auto.offset.reset
  • 该属性指定了消费者在读取一个没有偏移量后者偏移量无效(消费者长时间失效当前的偏移量已通过时而且被删除了)的分区的状况下,应该做何处理,默认值是latest,也就是从最新记录读取数据(消费者启动以后生成的记录),另外一个值是earliest,意思是在偏移量无效的状况下,消费者从起始位置开始读取数据。

  • enable.auto.commit
  • 对于精确到一次的语义,最好手动提交位移

  • fetch.max.bytes
  • 单次获取数据的最大消息数。

  • max.poll.records <= 吞吐量
  • 单次poll调用返回的最大消息数,若是处理逻辑很轻量,能够适当提升该值。
  • 一次从kafka中poll出来的数据条数,max.poll.records条数据须要在在session.timeout.ms这个时间内处理完
  • 默认值为500

  • heartbeat. interval. ms <= 竟然拖家带口
  • heartbeat心跳主要用于沟通交流,及时返回请求响应。这个时间间隔真是越快越好。由于一旦出现reblance,那么就会将新的分配方案或者通知从新加入group的命令放进心跳响应中。

  • connection. max. idle. ms <= socket链接
  • kafka会按期的关闭空闲Socket链接。默认是9分钟。若是不在意这些资源开销,推荐把这些参数值为-1,即不关闭这些空闲链接。

  • request. timeout. ms
  • 这个配置控制一次请求响应的最长等待时间。若是在超时时间内未获得响应,kafka要么重发这条消息,要么超太重试次数的状况下直接置为失败。
  • 消息发送的最长等待时间.需大于session.timeout.ms这个时间

  • fetch.min.bytes
  • server发送到消费端的最小数据,如果不知足这个数值则会等待直到知足指定大小。默认为1表示当即接收。

  • fetch.wait.max.ms
  • 如果不知足fetch.min.bytes时,等待消费端请求的最长等待时间

  • 0.11 新功能
  • 空消费组延时rebalance,主要在server.properties文件配置
  • group.initial.rebalance.delay.ms <=牛逼了,个人kafka,防止成员加入请求后本应当即开启的rebalance
  • 对于用户来讲,这个改进最直接的效果就是新增了一个broker配置:group.initial.rebalance.delay.ms
  • 默认是3秒钟。
  • 主要做用是让coordinator推迟空消费组接收到成员加入请求后本应当即开启的rebalance。在实际使用时,假设你预估你的全部consumer组成员加入须要在10s内完成,那么你就能够设置该参数=10000。

6 线上采坑

org.apache.kafka.clients.consumer.CommitFailedException:
 Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. 
This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. 
You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. [com.bonc.framework.server.kafka.consumer.ConsumerLoop]
复制代码
基于最新版本10,注意此版本session. timeout. ms 与max.poll.interval.ms进行功能分离了。
  • 能够发现频繁reblance,并伴随者重复性消费,这是一个很严重的问题,就是处理逻辑太重,max.poll. interval.ms 太小致使。发生的缘由就是 poll()的循环调用时间过长,出现了处理超时。此时只用调大max.poll. interval.ms ,调小max.poll.records便可,同时要把request. timeout. ms设置大于max.poll. interval.ms

7 总结

优化会继续,暂时把核心放在request. timeout. ms, max. poll. interval. ms,max.poll.records 上,避免由于处理逻辑太重,致使Consumer被频繁的踢出Consumer group。并发

秦凯新 于深圳运维

相关文章
相关标签/搜索