Kafka1.0.X_消费者API详解2

偏移量由消费者管理

​kafka Consumer Api还提供了本身存储offset的功能,将offset和data作到原子性,可让消费具备Exactly Once 的语义,比kafka默认的At-least Once更强大java

消费者从指定分区拉取数据-手动更改偏移量

​设置消费者从自定义的位置开始拉取数据,好比从程序中止时已消费的下一Offset开始拉取数据,使用这个功能要求data和offset的update操做是原子的,不然可能会破坏数据一致性数据库

   /*
        手动设置指定分区的offset,只适用于使用Consumer.assign方法添加主题的分区,不适用于kafka自动管理消费者组中的消费者场景,
        后面这种场景可使用ConsumerRebalanceListener作故障恢复使用
     */
    @Test
    public void controlsOffset() {
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        //设置消费者组,组名字自定义,组名字相同的消费者在一个组
        props.put("group.id", "my_group");
        //开启offset自动提交
        props.put("enable.auto.commit", "false");
        //序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //实例化一个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //消费者订阅主题,并设置要拉取的分区
​
        //加一段代码将本身保存的分区和偏移量读取到内存
        //load partition and it's offset
        TopicPartition partition0 = new TopicPartition("mytopic3", 0);
        consumer.assign(Arrays.asList(partition0));
​
        //告知Consumer每一个分区应该从什么位置开始拉取数据,offset从你加载的值或者集合中拿
        consumer.seek(partition0, 4140l);
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
​
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println("partition: " + partition.partition() + " , " + record.offset() + ": " + record.value());
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            consumer.close();
        }
    }

代码和上面的绝大多数都同样,就是要本身加载分区信息,给消费者设置每一个分区的偏移量apache

添加因消费者改变致使kafka rebalance的监听

​ kafka提供该监听来处理当某一个topic的消费者发生变化(加入、退出)时分区从新分配(先解除与消费者的绑定关系,再从新与消费者绑定)用户想作回调的状况,分区与消费者解除绑定时调用onPartitionsRevoked方法;从新绑定时调用onPartitionsAssigned。bootstrap

监听代码缓存

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;
​
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
​
/*
    kafka提供了这个监听来处理分区的变化,区分被取消时调用onPartitionsRevoked方法;分区被分配时调用onPartitionsAssigned
 */
public class MyConsumerRebalanceListener implements ConsumerRebalanceListener {
    static Map<TopicPartition,Long> partitionMap = new ConcurrentHashMap<>();
    private Consumer<?,?> consumer;
    //实例化Listener的时候将Consumer传进来
    public MyConsumerRebalanceListener(Consumer<?,?> consumer) {
        this.consumer = consumer;
    }
​
    /*
        有新的消费者加入消费者组或者已有消费者从消费者组中移除会触发kafka的rebalance机制,rebalance被调用前会先调用下面的方法
        此时你能够将分区和它的偏移量记录到外部存储中,好比DBMS、文件、缓存数据库等,还能够在这里处理本身的业务逻辑
     */
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        for(TopicPartition partition: partitions){
            //记录分区和它的偏移量
            partitionMap.put(partition,consumer.position(partition));
            //清空缓存
​
            System.out.println("onPartitionsRevoked partition:" + partition.partition()+" - offset"+consumer.position(partition));
        }
    }
​
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        //设置分区的偏移量
        for(TopicPartition partition: partitions){
            System.out.println("onPartitionsAssigned partition:" + partition.partition()+" - offset"+consumer.position(partition));
            if(partitionMap.get(partition)!=null){
                consumer.seek(partition, partitionMap.get(partition));
            }else{
                //自定义处理逻辑
            }
        }
    }
}

测试代码安全

    @Test
    public void autoCommitAddListner(){
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        //设置消费者组,组名字自定义,组名字相同的消费者在一个组
        props.put("group.id", "my_group");
        //开启offset自动提交 true-开启 false-关闭
        props.put("enable.auto.commit", "false");
        //序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //实例化一个消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        MyConsumerRebalanceListener myListener = new MyConsumerRebalanceListener(consumer);
        //消费者订阅主题,能够订阅多个主题
        consumer.subscribe(Arrays.asList("mytopic3"),myListener);
        //consumer.subscribe(Arrays.asList("mytopic3"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                for (TopicPartition partition : records.partitions()) {
                    List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                    for (ConsumerRecord<String, String> record : partitionRecords) {
                        System.out.println("partition: " + partition.partition() + " , " + record.offset() + ": " + record.value());
                    }
                    long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                    /*
                        能够将这里的偏移量提交挪到监听的onPartitionsRevoked方法中,控制灵活,可是也很容易出问题
                     */
                    consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                }
            }
        } finally {
            consumer.close();
        }
    }

其余

  • 使用pause和resume能够暂停和恢复一个分区的消费动做多线程

    consumer.pause(Arrays.asList(new TopicPartition("topic_name",parition_num)))并发

    consumer.resume(Arrays.asList(new TopicPartition("topic_name",parition_num)))jvm

  • 按事务读数据ide

    该操做与Producer的按事务写相匹配,在Consumer代码的配置中增长一行:

    props.put("isolation.level","read_committed");

    注意,按事务读,不能使用在按指定分区拉取数据的消费者中

多线程

KafkaConsumer是线程不安全,kafka官方提供了一种写法来避免线程安全问题

ConsumerRunner:

package com.jv.parallel;
​
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
​
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
​
public class ConsumerRunner implements Runnable{
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final KafkaConsumer<String,String> consumer;
    private final CountDownLatch latch;
​
    public ConsumerRunner(KafkaConsumer<String,String> consumer, CountDownLatch latch){
        this.consumer = consumer;
        this.latch = latch;
    }
​
    @Override
    public void run() {
        System.out.println("threadName...."+Thread.currentThread().getName());
        try {
            consumer.subscribe(Arrays.asList("mytopic3"));
            while (!closed.get()) {
                ConsumerRecords<String, String> records = consumer.poll(10000);
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("threadName= %s, offset = %d, key = %s, value = %s%n", Thread.currentThread().getName(),record.offset(), record.key(), record.value());
            }
        } catch (WakeupException e) {
            if (!closed.get()) throw e;
        } finally {
            consumer.close();
            latch.countDown();
        }
    }
​
    public void shutdown() {
        System.out.println("close ConsumerRunner");
        closed.set(true);
        consumer.wakeup();
    }
}

驱动方法:

   @Test
    public void autoCommitParallelTest() {
        Properties props = new Properties();
        //设置kafka集群的地址
        props.put("bootstrap.servers", "hadoop01:9092,hadoop02:9092,hadoop03:9092");
        //设置消费者组,组名字自定义,组名字相同的消费者在一个组
        props.put("group.id", "my_group");
        //开启offset自动提交
        props.put("enable.auto.commit", "true");
        //自动提交时间间隔
        props.put("auto.commit.interval.ms", "1000");
        //序列化器
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //实例化一个消费者
        final List<ConsumerRunner> consumers = new ArrayList<>();
        final List<KafkaConsumer<String, String>> kafkaConsumers = new ArrayList<>();
        for(int i = 0;i < 2;i++){
            kafkaConsumers.add(new KafkaConsumer<String, String>(props));
        }
        final CountDownLatch latch = new CountDownLatch(2);
        final ExecutorService executor = Executors.newFixedThreadPool(2);
        for(int i = 0;i < 2;i++){
            ConsumerRunner c = new ConsumerRunner(kafkaConsumers.get(i),latch);
            consumers.add(c);
            executor.submit(c);
        }
​
        /*
            这个方法的意思就是在jvm中增长一个关闭的钩子,当jvm关闭的时候,会执行系统中已经设置的全部经过方法addShutdownHook添加的钩子,当系统执行完这些钩子后,jvm才会关闭
            因此这些钩子能够在jvm关闭的时候进行内存清理、对象销毁、关闭链接等操做
         */
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                System.out.println("....................");
                for (ConsumerRunner consumer : consumers) {
                    consumer.shutdown();
                }
                executor.shutdown();
                try {
                    executor.awaitTermination(5000, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
​
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

可是Kafka官方任然不建议多个线程共用一个Consumer,不然会出现ConcurrentModificationException异常

Kafka提供以下两个方案实现并发:

1.一个线程一个Consumer

每一个线程都拥有本身的Consumer

优势:

  • 写代码容易

  • 由于不须要协调和调度线程,速度比较快

  • 实现分区的有序很是容易

缺点:

  • TCP链接更多,若是分区很是多,这种方案不可行了

  • 消费者多了,可能由于批处理少了,使IO吞吐量减小

  • 并发数严重依赖于分区数(消费者数只能小于等于分区数)

2.Consumer和Processer分离

使用一个或者多个Consumer从Kafka拉取数据,并将数据放到一个阻塞队列中,由Processor从阻塞队列中获取数据并作业务处理。

优势:

  • 将消费和业务处理作垂直切分,而后在水平上能够独立的进行扩展

缺点:

  • 分区顺序难以保障

  • 分区提交很是麻烦

针对这种方案的分区内数据顺序问题,可使用让每一个消费者都有本身的阻塞队列。由于Consumer和Processor已经分离了,如何让Consumer知道数据已经被Processor处理完是比较麻烦的事情,

相关文章
相关标签/搜索