This blog series distills and shares cases drawn from real commercial environments and offers hands-on guidance for Spark commercial applications; please follow the series. Copyright notice: this Spark commercial-practice series belongs entirely to the author (秦凯新); reproduction is prohibited, learning from it is welcome.
In one sentence: when multiple Consumers subscribe to the same Topic, a rebalance reassigns the subscribed partitions among the consumers according to the partition assignment strategy.
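Which strategy applies is configurable via the standard partition.assignment.strategy consumer property; a minimal sketch (RangeAssignor is the client default, RoundRobinAssignor is the common alternative):

// Sketch: choosing the assignment strategy on the consumer properties
// (property name and assignor class are standard Kafka client settings).
props.put("partition.assignment.strategy",
        "org.apache.kafka.clients.consumer.RoundRobinAssignor");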
The algorithm for locating the Coordinator is the same as the algorithm for locating the target partition of __consumer_offsets.
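Concretely, both searches reduce to the same hash computation; a sketch, assuming offsetsTopicPartitionCount is the partition count of __consumer_offsets (50 by default):

// The group's target partition in __consumer_offsets; the leader broker of
// that partition serves as this group's Coordinator.
int targetPartition = Math.abs(groupId.hashCode()) % offsetsTopicPartitionCount;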
The overall rebalance flow is shown in the figure below; a few points worth emphasizing:
The rebalance listener handles the case where users commit offsets to external storage: offset saving and offset redirection are implemented inside the listener.
onPartitionsRevoked: called before a new round of rebalancing begins; typically used for manual offset commits and auditing.
onPartitionsAssigned: called after the rebalance completes; typically used to set up the consumption logic.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Track the total rebalance time
final AtomicLong totalRebalanceTimeMs = new AtomicLong(0L);
// Track the start time of the current rebalance
final AtomicLong rebalanceStart = new AtomicLong(0L);
1. Rebalance listener
consumer.subscribe(Arrays.asList("test-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            // 1. Save the current position to external storage
            saveToExternalStore(consumer.position(tp));
            // 2. Or commit offsets manually
            // consumer.commitSync(toCommit);
        }
        rebalanceStart.set(System.currentTimeMillis());
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        totalRebalanceTimeMs.addAndGet(System.currentTimeMillis() - rebalanceStart.get());
        for (TopicPartition tp : partitions) {
            consumer.seek(tp, readFromExternalStore(tp));
        }
    }
});
2. Message processing
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        consumer.commitSync();
        buffer.clear();
    }
}
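The loop above assumes a few helpers the original leaves out; a minimal sketch of those assumptions:

// Hypothetical declarations for the sketch above (not part of the original):
final List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
final int minBatchSize = 200; // flush to the DB once this many records have accumulated
// saveToExternalStore / readFromExternalStore / insertIntoDb stand for user-supplied
// functions that persist offsets or records to external storage (e.g. a database).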
Example: one KafkaConsumer instance per thread (multi-threaded consumption).
public class Main {
    public static void main(String[] args) {
        String brokerList = "localhost:9092";
        String groupId = "testGroup1";
        String topic = "test-topic";
        int consumerNum = 3;
        // Core wrapper exposed to callers
        ConsumerGroup consumerGroup = new ConsumerGroup(consumerNum, groupId, topic, brokerList);
        consumerGroup.execute();
    }
}
import java.util.ArrayList;
import java.util.List;

public class ConsumerGroup {
    private List<ConsumerRunnable> consumers;

    public ConsumerGroup(int consumerNum, String groupId, String topic, String brokerList) {
        consumers = new ArrayList<>(consumerNum);
        for (int i = 0; i < consumerNum; ++i) {
            ConsumerRunnable consumerThread = new ConsumerRunnable(brokerList, groupId, topic);
            consumers.add(consumerThread);
        }
    }

    public void execute() {
        for (ConsumerRunnable task : consumers) {
            new Thread(task).start();
        }
    }
}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
public class ConsumerRunnable implements Runnable {
    private final KafkaConsumer<String, String> consumer;

    public ConsumerRunnable(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true"); // this example uses automatic offset commits
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic)); // this example uses automatic partition assignment
    }

    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(Thread.currentThread().getName() + " consumed " + record.partition() +
                        "th message with offset: " + record.offset());
            }
        }
    }
}
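ConsumerRunnable above loops forever. If a clean shutdown is needed, a common pattern (an addition, not part of the original) is to call wakeup() from another thread; it is the only KafkaConsumer method that is safe to invoke concurrently. A sketch of a shutdown-aware variant of run():

// Sketch: wakeup() makes a blocked poll() throw WakeupException.
public void shutdown() {
    consumer.wakeup();
}

@Override
public void run() {
    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (ConsumerRecord<String, String> record : records) {
                // process record ...
            }
        }
    } catch (org.apache.kafka.common.errors.WakeupException e) {
        // expected on shutdown, ignore
    } finally {
        consumer.close(); // release resources and trigger a final group rebalance
    }
}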
Example: a further-optimized design: a single consumer paired with a pool of worker threads.
public class Main {
    public static void main(String[] args) {
        String brokerList = "localhost:9092,localhost:9093,localhost:9094";
        String groupId = "group2";
        String topic = "test-topic";
        int workerNum = 5;
        ConsumerHandler consumers = new ConsumerHandler(brokerList, groupId, topic);
        consumers.execute(workerNum);
        try {
            Thread.sleep(1000000);
        } catch (InterruptedException ignored) {}
        consumers.shutdown();
    }
}
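Note that ConsumerHandler.execute() below runs an infinite poll loop on the calling thread, so the sleep/shutdown lines in Main would never be reached as written; one workaround (an assumption, not part of the original) is to run it on its own thread:

new Thread(() -> consumers.execute(workerNum)).start();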
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ConsumerHandler {
    private final KafkaConsumer<String, String> consumer;
    private ExecutorService executors;

    public ConsumerHandler(String brokerList, String groupId, String topic) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokerList);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
    }

    public void execute(int workerNum) {
        executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(200);
            for (final ConsumerRecord<String, String> record : records) {
                executors.submit(new Processor(record));
            }
        }
    }

    public void shutdown() {
        if (consumer != null) {
            consumer.close();
        }
        if (executors != null) {
            executors.shutdown();
            try {
                if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.out.println("Timeout.... Ignore for this case");
                }
            } catch (InterruptedException ignored) {
                System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                Thread.currentThread().interrupt();
            }
        }
    }
}
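One caveat with this model: with enable.auto.commit=true, offsets can be committed while worker threads are still processing the submitted records, so a crash can lose messages. If at-least-once delivery is required, a variant (a sketch under that assumption, not the original design) would disable auto-commit and commit only after the whole batch completes:

// Sketch: commit after the batch; requires enable.auto.commit=false and
// imports of java.util.concurrent.Future and ExecutionException.
List<Future<?>> futures = new ArrayList<>();
for (final ConsumerRecord<String, String> record : records) {
    futures.add(executors.submit(new Processor(record)));
}
try {
    for (Future<?> f : futures) {
        f.get(); // block until each worker finishes
    }
    consumer.commitSync();
} catch (InterruptedException | ExecutionException e) {
    Thread.currentThread().interrupt();
}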
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class Processor implements Runnable {
    private final ConsumerRecord<String, String> consumerRecord;

    public Processor(ConsumerRecord<String, String> record) {
        this.consumerRecord = record;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + " consumed " + consumerRecord.partition()
                + "th message with offset: " + consumerRecord.offset());
    }
}
Standalone Consumer: assign receives a fixed list of partitions to consume and is mutually exclusive with subscribe; a consumer can use only one of the two. Having multiple Consumer instances consume one Topic via group rebalance is a match made in heaven, but when precise partition control is required, assign is the only way.
Properties props = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("group.id", groupId);
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumer = new KafkaConsumer<>(props);

List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> allPartitions = consumer.partitionsFor("kaiXinTopic");
if (allPartitions != null && !allPartitions.isEmpty()) {
    for (PartitionInfo partitionInfo : allPartitions) {
        partitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
    }
    consumer.assign(partitions);
}
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);
        consumer.commitSync();
        buffer.clear();
    }
}
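A practical caveat worth noting: because an assign()-based consumer does not participate in group management, it receives no rebalance notification when partitions are added to the topic; if the topic can grow, the application itself must periodically re-query partitionsFor() and call assign() again.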
This article draws on several hands-on Kafka books and blogs; a great deal of reference material went into it and the language has been reworked. It took real effort to write; please treasure it!
秦凯新 20181119 21:23