Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature,它提供了对存储于Kafka内的数据进行流式处理和分析的功能。简而言之,Kafka Stream就是一个用来作流计算的类库,与Storm、Spark Streaming、Flink的做用相似,但要轻量得多。java
Kafka Stream的基本概念:数据库
Kafka Stream的高层架构图:apache
Kafka Stream关键词:bootstrap
以下图所示:bash
下图是Kafka Stream完整的高层架构图:服务器
从上图中能够看到,Consumer
对一组Partition
进行消费,这组Partition
能够在一个Topic中或多个Topic中。而后造成数据流,通过各个流处理器后最终经过Producer
输出到一组Partition
中,一样这组Partition
也能够在一个Topic中或多个Topic中。这个过程就是数据流的输入和输出。架构
所以,咱们在使用Stream API前须要先建立两个Topic,一个做为输入,一个做为输出。到服务器上使用命令行建立两个Topic:app
[root@txy-server2 ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic input-topic [root@txy-server2 ~]# kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic output-topic
因为以前依赖的kafka-clients
包中没有Stream API,因此须要另外引入Stream的依赖包。在项目中添加以下依赖:ide
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.5.0</version> </dependency>
接下来以一个经典的词频统计为例,演示一下Stream API的使用。代码示例:测试
package com.zj.study.kafka.stream; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Produced; import java.util.List; import java.util.Properties; public class StreamSample { private static final String INPUT_TOPIC = "input-topic"; private static final String OUTPUT_TOPIC = "output-topic"; /** * 构建配置属性 */ public static Properties getProperties() { Properties properties = new Properties(); properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "49.232.153.84:9092"); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-app"); properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); return properties; } public static KafkaStreams createKafkaStreams() { Properties properties = getProperties(); // 构建流结构拓扑 StreamsBuilder builder = new StreamsBuilder(); // 构建wordCount这个Processor wordCountStream(builder); Topology topology = builder.build(); // 构建KafkaStreams return new KafkaStreams(topology, properties); } /** * 定义流计算过程 * 例子为词频统计 */ public static void wordCountStream(StreamsBuilder builder) { // 不断的从INPUT_TOPIC上获取新的数据,并追加到流上的一个抽象对象 KStream<String, String> source = builder.stream(INPUT_TOPIC); // KTable是数据集的抽象对象 KTable<String, Long> count = source.flatMapValues( // 以空格为分隔符将字符串进行拆分 v -> List.of(v.toLowerCase().split(" ")) // 按value进行分组统计 ).groupBy((k, v) -> v).count(); KStream<String, Long> sink = count.toStream(); // 将统计结果输出到OUTPUT_TOPIC sink.to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long())); } public static void main(String[] args) { KafkaStreams streams = createKafkaStreams(); // 启动该Stream streams.start(); } }
KTable
与KStream
的关系与区别,以下图:
KTable
相似于一个时间片断,在一个时间片断内输入的数据就会update进去,以这样的形式来维护这张表KStream
则没有update这个概念,而是不断的追加运行以上代码,而后到服务器中使用kafka-console-producer.sh
脚本命令向input-topic
生产一些数据,以下:
[root@txy-server2 ~]# kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic input-topic >Hello World Java >Hello World Kafka >Hello Java Kafka >Hello Java
而后再运行kafka-console-consumer.sh
脚本命令从output-topic
中消费数据,并进行打印。具体以下:
[root@txy-server2 ~]# kafka-console-consumer.sh --bootstrap-server 172.21.0.10:9092 --topic output-topic --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer --from-beginning
控制台输出的结果:
world 2 hello 3 java 2 kafka 2 hello 4 java 3
从输出结果中能够看到,Kafka Stream首先是对前三行语句进行了一次词频统计,因此前半段是:
world 2 hello 3 java 2 kafka 2
当最后一行输入以后,又再作了一次词频统计,并针对新的统计结果进行输出,其余没有变化的则不做输出,因此最后打印了:
hello 4 java 3
这也是KTable
和KStream
的一个体现,从测试的结果能够看出Kafka Stream是实时进行流计算的,而且每次只会针对有变化的内容进行输出。
在以前的例子中,咱们是从某个Topic读取数据进行流处理后再输出到另外一个Topic里。但在一些场景下,咱们可能不但愿将结果数据输出到Topic,而是写入到一些存储服务中,例如ElasticSearch、MongoDB、MySQL等。
在这种场景下,就能够利用到foreach
方法,该方法用于迭代流中的元素。咱们能够在foreach
中将数据存入例如Map、List等容器,而后再批量写入到数据库或其余存储中间件便可。
foreach
方法使用示例:
public static void foreachStream(StreamsBuilder builder) { KStream<String, String> source = builder.stream(INPUT_TOPIC); source.flatMapValues( v -> List.of(v.toLowerCase().split(" ")) ).foreach((k, v) -> System.out.println(k + " : " + v)); }