本节主要介绍Kafka从一些topic消费数据的示例。html
使用新版的Consumer,须要先在工程中添加kafka-clients依赖,添加的配置信息以下:java
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.0.0</version> </dependency>
Consumer的建立过程与以前旧的API建立方法同样,一个Consumer必备的最小配置项以下所示:apache
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); // 经过其中的一台broker来找到group的coordinator,并不须要列出全部的broker props.put("group.id", "consumer-tutorial"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // consumer实例
Consumer的其余配置项能够参考New Consumer Configs,除了上面的这几个配置以外,其余的几个比较经常使用的配置信息以下表所示bootstrap
参数 | 默认值 | 说明 |
---|---|---|
heartbeat.interval.ms | 3000 | 当使用Kafka的group管理机制时,consumer向coordinator发送心跳的间隔,这个值要比session.timeout.ms小,最好不要超过session.timeout.ms的\frac{1}{3} |
session.timeout.ms | 30000 | 当使用Kafka的group管理机制时用于检测到consumer失败的时长,若是在这个时间内没有收到consumer的心跳信息,就认为Consumer失败了 |
auto.offset.reset | latest | group首次开始消费数据时的offset,有如下几个值能够选择:earliest、latest、none、anything else. |
enable.auto.commit | true | 设置为true时,Consumer的offset将会被周期性地自动commit |
auto.commit.interval.ms | 5000 | Consumer的offset自动commit时的周期 |
本例使用Kafka的自动commit机制,每隔一段时间(可经过auto.commit.interval.ms
来设置)就会自动进行commit offset。缓存
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "group"); props.put("auto.offset.reset", "earliest"); props.put("enable.auto.commit", "true"); // 自动commit props.put("auto.commit.interval.ms", "1000"); // 自动commit的间隔 props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test1", "test2")); // 可消费多个topic,组成一个list while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value()); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } }
这里有几点须要注意:session
- 在使用自动commit时,系统是保证at least once,由于offset是在这些messages被应用处理成功后才进行commit的;
- subscribe方法须要传入全部topic的列表,一个group所消费的topic是不能动态增长的,可是能够在任什么时候间改变这个列表,它会把前面的设置覆盖掉;
- poll中的参数就是设置一个时长,Consumer在进行拉取数据进行block的最大时间限制;
须要注意的:并发
group.id :必须设置
auto.offset.reset:若是想得到消费者启动前生产者生产的消息,则必须设置为earliest;若是只须要得到消费者启动后生产者生产的消息,则不须要设置该项
enable.auto.commit(默认值为true):若是使用手动commit offset则须要设置为false,并再适当的地方调用consumer.commitSync()
,不然每次启动消费折后都会从头开始消费信息(在auto.offset.reset=earliest的状况下);app
手动控制commit异步
要进行手动commit,须要在配置文件中将enable.auto.commit设置为false,来禁止自动commit,本例以手动同步commit为例ide
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "group"); props.put("enable.auto.commit", "false"); //关闭自动commit props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("test1", "test2")); final int minBatchSize = 10; while (true) { ConsumerRecords<String, String> records = consumer.poll(100); int i = 0; for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value()); i++; } if (i >= minBatchSize) { consumer.commitSync(); //批量完成写入后,手工同步commit offset } }
消费者手动设置分区
Kafka在进行消费数据时,能够指定消费某个topic的某个partition,这种使用状况比较特殊,并不须要coordinator进行rebalance,也就意味着这种模式虽然须要设置group id,可是它跟前面的group的机制并不同,它与旧的Consumer中的Simple Consumer类似,这是Kafka在新的Consumer API中对这种状况的支持。
Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "group"); props.put("enable.auto.commit", "false"); //关闭自动commit props.put("session.timeout.ms", "30000"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer(props); TopicPartition partition0 = new TopicPartition("test", 0); TopicPartition partition1 = new TopicPartition("test", 2); consumer.assign(Arrays.asList(partition0, partition1)); final int minBatchSize = 10; while (true) { ConsumerRecords<String, String> records = consumer.poll(100); int i = 0; for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value()); i++; } if (i >= minBatchSize) { consumer.commitSync(); //批量完成写入后,手工sync offset } }
注意:
KafkaStream是在Kafka 0.10.0版中新提出的内容,Kafka官方也说了设计这个feature的缘由——为了简单,以前在流处理方面,通常状况下都会使用Kafka做为消息队列,而后再搭建一个流处理环境作流处理,而如今咱们能够直接在Kafka中进行流处理,不须要再搭建另一个环境(加了这个feature以后会使得Kafka变得更加复杂,不过官网说,在使用时咱们只须要在工程中添加一个外部依赖包便可使用这个功能)。
须要在pom文件中添加以下依赖,KafkaStream在实际运行时也是依赖这个外部的jar包运行。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>0.10.0.0</version> </dependency>
KafkaStream使用的一个基本初始化部分以下所示(代码来自Javadoc)
Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); StreamsConfig config = new StreamsConfig(props); KStreamBuilder builder = new KStreamBuilder(); builder.from("my-input-topic").mapValue(value -> value.length().toString()).to("my-output-topic"); KafkaStreams streams = new KafkaStreams(builder, config); streams.start();
完整的配置选项以下表所示,也能够参考Streams Configs
名称 | 描述 | 类型 | 默认值 |
---|---|---|---|
application.id | 流处理应用的标识,对同一个应用须要一致,由于它是做为消费的group_id的 | string | |
bootstrap.servers | host1:port1,host2:port2 这样的列表,是用来发现全部Kafka节点的种子,所以不须要配上全部的Kafka节点 | list | |
client.id | 应用的一个客户端的逻辑名称,设定后能够区分是哪一个客户端在请求 | string | “” |
zookeeper.connect | zookeeper | string | “” |
key.serde | 键的序列化/反序列化类 | class | org.apache.kafka.common.serialization.Serdes$ByteArraySerde |
partition.grouper | 用于分区组织的类,须要实现PartitionGrouper接口 | class | org.apache.kafka.streams.processor.DefaultPartitionGrouper |
replication.factor | 流处理应用会建立change log topic和repartition topic用于管理内部状态,这个参数设定这些topic的副本数 | int | 1 |
state.dir | 状态仓库的存储路径 | string | /tmp/kafka-streams |
timestamp.extractor | 时间戳抽取类,须要实现TimestampExtractor接口 | class | org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor |
value.serde | 值的序列化/反序列化类 | class | org.apache.kafka.common.serialization.Serdes$ByteArraySerde |
buffered.records.per.partition | 每一个分区缓存的最大记录数 | int | 1000 |
commit.interval.ms | 存储处理器当前位置的间隔毫秒数 | long | 30000 |
metric.reporters | 用于性能报告的类列表。须要实现MetricReporter接口。JmxReporter会永远开启不须要指定 | list | [] |
metric.num.samples | 计算性能须要的采样数 | int | 2 |
metric.sample.window.ms | 性能采样的时间间隔 | long | 30000 |
num.standby.replicas | 每一个任务的后备副本数 | int | 0 |
num.stream.threads | 执行流处理的线程数 | int | 1 |
poll.ms | 等待输入的毫秒数 | long | 100 |
state.cleanup.delay.ms | 一个分区迁移后,在删除状态前等待的毫秒数 | long | 60000 |
这是个将一个topic的事件进行过滤的示例,处理很简单,下面给出了这个例子的完整代码。
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.Predicate; import java.util.Properties; /** * Created by matt on 16/7/22. */ public class EventFilter { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-filter"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "10.4.232.70:9091,10.4.232.77:2181"); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); KStreamBuilder builder = new KStreamBuilder(); KStream<String, String> source = builder.stream("test"); source.filter(new Predicate<String, String>() { @Override public boolean test(String key, String value) { return (value.split(",")[3]).equals("food"); } }).to("food"); KafkaStreams streams = new KafkaStreams(builder, props); streams.start(); // usually the stream application would be running forever, // in this example we just let it run for some time and stop since the input data is finite. Thread.sleep(5000L); streams.close(); }
1. 若是consumer比partition多,是浪费,由于kafka的设计是在一个partition上是不容许并发的,因此consumer数不要大于partition数
2. 若是consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,不然会致使partition里面的数据被取的不均匀。最好partiton数目是consumer数目的整数倍,因此partition数目很重要,好比取24,就很容易设定consumer数目
3. 若是consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不一样
4. 增减consumer,broker,partition会致使rebalance,因此rebalance后consumer对应的partition会发生变化
5. High-level接口中获取不到数据的时候是会block的