This blog series summarizes and shares case studies drawn from real commercial environments and offers hands-on guidance for Spark business applications; please keep following the series. Copyright notice: this Spark business application practice series belongs to the author (秦凯新). Reproduction is prohibited; you are welcome to study it.
The Consumer Group is Kafka's mechanism for building highly scalable, highly fault-tolerant consumers, so message consumption is always organized around a Consumer Group. Multiple consumer instances in the same group can read Kafka messages in parallel, yet at any given moment a message is consumed by only one member of the group. If a consumer crashes, the Consumer Group immediately hands the partitions owned by the failed consumer over to the remaining consumers, which keeps the group as a whole working.
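The partition hand-off described above happens during a rebalance. As a minimal sketch (the topic name and group id are placeholders, not from the original text), a ConsumerRebalanceListener can be passed to subscribe() to observe which partitions are taken away from and given to a consumer when the group rebalances:

import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before a rebalance takes partitions away from this consumer;
        // a convenient place to commit offsets for the partitions being lost.
        System.out.println("Revoked: " + partitions);
    }
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after the rebalance, with the partitions this consumer now owns.
        System.out.println("Assigned: " + partitions);
    }
});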
Oddly enough, offset storage is also scoped to the Consumer Group, and a checkpoint-style mechanism is used to persist the offsets periodically.

Each consumer periodically reports how far it has consumed to the Kafka cluster; this process is called committing the offset. Committing no longer goes through ZooKeeper, because ZooKeeper is a coordination service rather than a storage component, and highly concurrent reads would inevitably put it under pressure (offsets are instead written to Kafka's internal __consumer_offsets topic).
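The simplest option is to let the consumer commit offsets automatically. The snippet below (essentially the example from the official KafkaConsumer documentation) enables auto-commit with enable.auto.commit=true and a 1-second commit interval: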
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Offsets are committed automatically in the background every second.
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
    // poll() returns whatever records are available within the 100 ms timeout.
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
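When records must be processed (for example written to a database) before it is safe to mark them as consumed, auto-commit is too eager; offsets can instead be committed manually after the batch has been handled, as in the following example: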
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Auto-commit is disabled; offsets are committed explicitly once a batch is processed.
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        // insertIntoDb() stands in for whatever durable processing the application does.
        insertIntoDb(buffer);
        // Only after the batch is safely stored are the offsets committed synchronously.
        consumer.commitSync();
        buffer.clear();
    }
}
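If each batch takes too long to process, the consumer may fail to call poll() within the allowed interval; the group coordinator then considers it dead, rebalances the group, and the next commit fails with an error like the following: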
org.apache.kafka.clients.consumer.CommitFailedException:
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member.
This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing.
You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. [com.bonc.framework.server.kafka.consumer.ConsumerLoop]
Tuning will continue; for now the focus is on request.timeout.ms, max.poll.interval.ms and max.poll.records, so that a heavy processing loop does not cause the consumer to be repeatedly kicked out of the Consumer Group.
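As a minimal sketch of that tuning (assuming a 0.10.1+ client where max.poll.interval.ms exists; the concrete values are illustrative, not recommendations), the relevant properties are set when the consumer is created. Fewer records per poll() and a longer allowed processing interval both reduce the risk of being expelled from the group:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
// Allow up to 5 minutes between successive poll() calls before the consumer
// is considered dead and its partitions are rebalanced away (illustrative value).
props.put("max.poll.interval.ms", "300000");
// Cap the number of records returned by a single poll() so each batch
// can be processed well within that interval (illustrative value).
props.put("max.poll.records", "100");
// Upper bound on how long the client waits for a broker response (illustrative value).
props.put("request.timeout.ms", "305000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

Keeping the per-record processing light, or handing heavy work off to a separate worker pool, achieves the same goal without having to enlarge the timeouts.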
秦凯新, in Shenzhen