4-kafka0.10 新消费者使用

Consumer Client

本节主要介绍Kafka从一些topic消费数据的示例。html

配置

使用新版的Consumer,须要先在工程中添加kafka-clients依赖,添加的配置信息以下:java

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>0.10.0.0</version>
</dependency>

初始化与配置

Consumer的建立过程与以前旧的API建立方法同样,一个Consumer必备的最小配置项以下所示:apache

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // 经过其中的一台broker来找到group的coordinator,并不须要列出全部的broker
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // consumer实例

Consumer的其余配置项能够参考New Consumer Configs,除了上面的这几个配置以外,其余的几个比较经常使用的配置信息以下表所示bootstrap

参数 默认值 说明
heartbeat.interval.ms 3000 当使用Kafka的group管理机制时,consumer向coordinator发送心跳的间隔,这个值要比session.timeout.ms小,最好不要超过session.timeout.ms的\frac{1}{3}
session.timeout.ms 30000 当使用Kafka的group管理机制时用于检测到consumer失败的时长,若是在这个时间内没有收到consumer的心跳信息,就认为Consumer失败了
auto.offset.reset latest group首次开始消费数据时的offset,有如下几个值能够选择:earliest、latest、none、anything else.
enable.auto.commit true 设置为true时,Consumer的offset将会被周期性地自动commit
auto.commit.interval.ms 5000 Consumer的offset自动commit时的周期

Consumer Auto Offset Commit

本例使用Kafka的自动commit机制,每隔一段时间(可经过auto.commit.interval.ms来设置)就会自动进行commit offset。缓存

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true"); // 自动commit
props.put("auto.commit.interval.ms", "1000"); // 自动commit的间隔
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test1", "test2")); // 可消费多个topic,组成一个list
while (true) {
   ConsumerRecords<String, String> records = consumer.poll(100);
   for (ConsumerRecord<String, String> record : records) {
       System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
       try {
           Thread.sleep(1000);
       } catch (InterruptedException e) {
           e.printStackTrace();
       }
   }
}

这里有几点须要注意:session

  1. 在使用自动commit时,系统是保证at least once,由于offset是在这些messages被应用处理成功后才进行commit的;
  2. subscribe方法须要传入全部topic的列表,一个group所消费的topic是不能动态增长的,可是能够在任什么时候间改变这个列表,它会把前面的设置覆盖掉;
  3. poll中的参数就是设置一个时长,Consumer在进行拉取数据进行block的最大时间限制;

须要注意的:并发

group.id :必须设置 
auto.offset.reset:若是想得到消费者启动前生产者生产的消息,则必须设置为earliest;若是只须要得到消费者启动后生产者生产的消息,则不须要设置该项 
enable.auto.commit(默认值为true):若是使用手动commit offset则须要设置为false,并再适当的地方调用consumer.commitSync(),不然每次启动消费折后都会从头开始消费信息(在auto.offset.reset=earliest的状况下);app

Consumer Manual Offset Control

手动控制commit异步

要进行手动commit,须要在配置文件中将enable.auto.commit设置为false,来禁止自动commit,本例以手动同步commit为例ide

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group");
props.put("enable.auto.commit", "false"); //关闭自动commit
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test1", "test2"));
final int minBatchSize = 10;
while (true) {
   ConsumerRecords<String, String> records = consumer.poll(100);
   int i = 0;
   for (ConsumerRecord<String, String> record : records) {
       System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
       i++;
   }
   if (i >= minBatchSize) {
       consumer.commitSync(); //批量完成写入后,手工同步commit offset
   }
}
  1. 在本例中,咱们调用了commitSync方法,这是同步commit的方式,同时Kafka还提供了commitAsync方法,它们的区别是:使用同步提交时,consumer会进行block知道commit的结果返回,这样的话若是commit失败就能够今早地发现错误,而当使用异步commit时,commit的结果还未返回,Consumer就会开始拉取下一批的数据,可是使用异步commit能够系统的吞吐量,具体使用哪一种方式须要开发者本身权衡;
  2. 本例中的实现依然是保证at least once,可是若是每次拉取到数据以后,就进行commit,最后再处理数据,就能够保证at last once。

 

Consumer Manual Partition Assign

消费者手动设置分区

Kafka在进行消费数据时,能够指定消费某个topic的某个partition,这种使用状况比较特殊,并不须要coordinator进行rebalance,也就意味着这种模式虽然须要设置group id,可是它跟前面的group的机制并不同,它与旧的Consumer中的Simple Consumer类似,这是Kafka在新的Consumer API中对这种状况的支持。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group");
props.put("enable.auto.commit", "false"); //关闭自动commit
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer consumer = new KafkaConsumer(props);
TopicPartition partition0 = new TopicPartition("test", 0);
TopicPartition partition1 = new TopicPartition("test", 2);
consumer.assign(Arrays.asList(partition0, partition1));

final int minBatchSize = 10;
while (true) {
   ConsumerRecords<String, String> records = consumer.poll(100);
   int i = 0;
   for (ConsumerRecord<String, String> record : records) {
       System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value());
       i++;
   }
   if (i >= minBatchSize) {
       consumer.commitSync(); //批量完成写入后,手工sync offset
   }
}

注意:

  1. 与前面的subscribe方法同样,在调用assign方法时,须要传入这个Consumer要消费的全部TopicPartition的列表;
  2. 无论对于simple consumer仍是consumer group,全部offset的commit都必须通过group coordinator;
  3. 在进行commit时,必须设置一个合适的group.id,避免与其余的group产生冲突。若是一个simple consumer试图使用一个与一个active group相同的id进行commit offset,coordinator将会拒绝这个commit请求,会返回一个CommitFailedException异常,可是,若是一个simple consumer与另外一个simple consumer使用同一个id,系统就不会报任何错误。

 

KafkaStream使用

KafkaStream是在Kafka 0.10.0版中新提出的内容,Kafka官方也说了设计这个feature的缘由——为了简单,以前在流处理方面,通常状况下都会使用Kafka做为消息队列,而后再搭建一个流处理环境作流处理,而如今咱们能够直接在Kafka中进行流处理,不须要再搭建另一个环境(加了这个feature以后会使得Kafka变得更加复杂,不过官网说,在使用时咱们只须要在工程中添加一个外部依赖包便可使用这个功能)。

 

配置

须要在pom文件中添加以下依赖,KafkaStream在实际运行时也是依赖这个外部的jar包运行。

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-streams</artifactId>
   <version>0.10.0.0</version>
</dependency>

初始化与配置

KafkaStream使用的一个基本初始化部分以下所示(代码来自Javadoc

Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
StreamsConfig config = new StreamsConfig(props);

KStreamBuilder builder = new KStreamBuilder();
builder.from("my-input-topic").mapValue(value -> value.length().toString()).to("my-output-topic");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

完整的配置选项以下表所示,也能够参考Streams Configs

 

名称 描述 类型 默认值
application.id 流处理应用的标识,对同一个应用须要一致,由于它是做为消费的group_id的 string  
bootstrap.servers host1:port1,host2:port2 这样的列表,是用来发现全部Kafka节点的种子,所以不须要配上全部的Kafka节点 list  
client.id 应用的一个客户端的逻辑名称,设定后能够区分是哪一个客户端在请求 string “”
zookeeper.connect zookeeper string “”
key.serde 键的序列化/反序列化类 class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
partition.grouper 用于分区组织的类,须要实现PartitionGrouper接口 class org.apache.kafka.streams.processor.DefaultPartitionGrouper
replication.factor 流处理应用会建立change log topic和repartition topic用于管理内部状态,这个参数设定这些topic的副本数 int 1
state.dir 状态仓库的存储路径 string /tmp/kafka-streams
timestamp.extractor 时间戳抽取类,须要实现TimestampExtractor接口 class org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor
value.serde 值的序列化/反序列化类 class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
buffered.records.per.partition 每一个分区缓存的最大记录数 int 1000
commit.interval.ms 存储处理器当前位置的间隔毫秒数 long 30000
metric.reporters 用于性能报告的类列表。须要实现MetricReporter接口。JmxReporter会永远开启不须要指定 list []
metric.num.samples 计算性能须要的采样数 int 2
metric.sample.window.ms 性能采样的时间间隔 long 30000
num.standby.replicas 每一个任务的后备副本数 int 0
num.stream.threads 执行流处理的线程数 int 1
poll.ms 等待输入的毫秒数 long 100
state.cleanup.delay.ms 一个分区迁移后,在删除状态前等待的毫秒数 long 60000

小示例

这是个将一个topic的事件进行过滤的示例,处理很简单,下面给出了这个例子的完整代码。

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Predicate;

import java.util.Properties;

/**
* Created by matt on 16/7/22.
*/
public class EventFilter {
   public static void main(String[] args) throws Exception {
       Properties props = new Properties();
       props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-filter");
       props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.4.232.70:9091,10.4.232.77:2181");
       props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
       props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

       // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data
       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

       KStreamBuilder builder = new KStreamBuilder();

       KStream<String, String> source = builder.stream("test");

       source.filter(new Predicate<String, String>() {
           @Override
           public boolean test(String key, String value) {
               return (value.split(",")[3]).equals("food");
           }
       }).to("food");

       KafkaStreams streams = new KafkaStreams(builder, props);
       streams.start();

       // usually the stream application would be running forever,
       // in this example we just let it run for some time and stop since the input data is finite.
       Thread.sleep(5000L);

       streams.close();
   }

官方对于consumer与partition的建议

1. 若是consumer比partition多,是浪费,由于kafka的设计是在一个partition上是不容许并发的,因此consumer数不要大于partition数

2. 若是consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,不然会致使partition里面的数据被取的不均匀。最好partiton数目是consumer数目的整数倍,因此partition数目很重要,好比取24,就很容易设定consumer数目

3. 若是consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不一样

4. 增减consumer,broker,partition会致使rebalance,因此rebalance后consumer对应的partition会发生变化

5. High-level接口中获取不到数据的时候是会block的

相关文章
相关标签/搜索