本教程假定你第一次,且没有搭建现有的Kafka或ZooKeeper。可是,若是你已经启动了Kafka和ZooKeeper,请跳过前两个步骤。java
Kafka Streams结合了在客户端编写和部署标准Java和Scala应用程序的简单性以及Kafka服务器端集群技术的优点,使这些应用程序具备高度可伸缩性,弹性,容错性,分布式等特性。算法
这个快速入门示例将演示如何运行一个流应用程序。一个WordCountDemo
的例子(为了方便阅读,使用的是java8 lambda表达式)apache
// Serializers/deserializers (serde) for String and Long types final Serde<String> stringSerde = Serdes.String(); final Serde<Long> longSerde = Serdes.Long(); // Construct a `KStream` from the input topic "streams-plaintext-input", where message values // represent lines of text (for the sake of this example, we ignore whatever may be stored // in the message keys). KStream<String, String> textLines = builder.stream("streams-plaintext-input", Consumed.with(stringSerde, stringSerde); KTable<String, Long> wordCounts = textLines // Split each text line, by whitespace, into words. .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) // Group the text words as message keys .groupBy((key, value) -> value) // Count the occurrences of each word (message key). .count() // Store the running counts as a changelog stream to the output topic. wordCounts.toStream().to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));
从输入的文本计算出一个词出现的次数。可是,不像其余的WordCount
的例子,你可能会看到,在有限的数据基础上,执行的演示应用程序的行为略有不一样,由于它应该是在一个无限数据的操做,数据流。相似的有界变量,它是一种动态算法,跟踪和更新的单词计数。然而,因为它必须假设潜在的无界输入数据,它会按期输出其当前状态和结果,同时继续处理更多的数据,由于它不知道何时它处理过的“全部”的输入数据。bootstrap
做为第一步,咱们将启动Kafka,而后咱们将输入数据准备到Kafka主题,而后由Kafka Streams应用程序处理。服务器
下载1.1.0版本
并解压它。注意,有多个可下载的Scala版本,咱们选择在这里使用推荐版本(2.11):微信
> tar -xzf kafka_2.11-1.1.0.tgz > cd kafka_2.11-1.1.0
Kafka使用Zookeeper,因此第一步启动Zookeeper服务。运维
> bin/zookeeper-server-start.sh config/zookeeper.properties [2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig) ...
如今启动 Kafka server:socket
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties) [2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties) ...
接下来,咱们建立一个输入主题“streams-plaintext-input”,和一个输出主题"streams-wordcount-output":分布式
> bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 1 \ --topic streams-plaintext-input Created topic "streams-plaintext-input".
注意:由于输出主题是更新日志流(参见下面的应用程序输出的说明),因此咱们为输出主题启用了压缩
。工具
> bin/kafka-topics.sh --create \
--zookeeper localhost:2181 \ --replication-factor 1 \ --partitions 1 \ --topic streams-wordcount-output \ --config cleanup.policy=compact Created topic "streams-wordcount-output".
也可使用kafka topic工具查看主题描述:
> bin/kafka-topics.sh --zookeeper localhost:2181 --describe Topic:streams-plaintext-input PartitionCount:1 ReplicationFactor:1 Configs: Topic: streams-plaintext-input Partition: 0 Leader: 0 Replicas: 0 Isr: 0 Topic:streams-wordcount-output PartitionCount:1 ReplicationFactor:1 Configs: Topic: streams-wordcount-output Partition: 0 Leader: 0 Replicas: 0 Isr: 0
如下命令启动WordCount演示程序:
> bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountDemo
演示程序将从输入主题streams-plaintext-input
中读取,对每一个读取消息执行WordCount算法
计算,并将其当前结果连续写入输出主题streams-wordcount-output
。 所以,除了日志条目外,不会有任何STDOUT输出,由于结果会写回到Kafka中。
如今咱们另外开一个终端,来启动生产者来为该主题写入一些输入数据:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input
在开一个终端,读取输出主题的数据。
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true \ --property print.value=true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer
如今,咱们经过输入一行文本而后按,生产一些新的消息到输入主题streams-plaintext-input
。其中消息key为空,消息value为刚刚输入的字符串编码文本行(实际上,应用程序的输入数据一般会连续流入Kafka,而不是 像咱们在这个快速入门中那样手动输入):
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input all streams lead to kafka
这些消息将被Wordcount程序处理,而后输出数据到streams-wordcount-output
主题中,咱们新打开一个命令窗口,输出消费者:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true \ --property print.value=true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer all 1 streams 1 lead 1 to 1 kafka 1
这里,第一列是java.lang.String格式的Kafka消息key,表示正在计数的单词,第二列是java.lang.Longformat中的消息value,表示该单词的最新计数。
如今,用生产者继续往streams-plaintext-input主题中发消息,输入"hello kafka streams",而后:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-plaintext-input all streams lead to kafka hello kafka streams
在消费者命令窗口,你能够观察WordCount程序写入到输出主题的数据:
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true \ --property print.value=true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer all 1 streams 1 lead 1 to 1 kafka 1 hello 1 kafka 2 streams 2
在这里,最后一行打印行kafka 2
和streams 2
表示计数已经从1递增到2。每当你向输入主题写入更多的输入消息时,你将观察到新的消息被添加到streams-wordcount-output
主题,表示由WordCount应用程序计算出的最新字数。让咱们输入一个最终的输入文本行“join kafka summit”,而后在控制台生产者中输入主题streams-wordcount-input以前的:
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic streams-wordcount-input all streams lead to kafka hello kafka streams join kafka summit
streams-wordcount-output主题随后将显示相应的更新变化(请参见最后三行):
> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic streams-wordcount-output \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property print.key=true \ --property print.value=true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer all 1 streams 1 lead 1 to 1 kafka 1 hello 1 kafka 2 streams 2 join 1 kafka 3 summit 1
能够看到,Wordcount应用程序的输出其实是一个连续的更新流,其中每一个输出记录(即上面原始输出中的每一行)是单个单词的更新计数,也就是诸如“kafka”的记录关键字。 对于具备相同密钥的多个记录,每一个后面的记录都是前一个记录的更新。
下面的两张图说明了幕后发生的事情。第一列显示KTable <string,long>当前状态的演变,它计数count的单词出现次数。 第二列显示从KTable的状态更新以及发送到输出主题streams-wordcount-output的更改记录。
首先正在处理文本行“all streams lead to kafka”。KTable正在创建,由于每一个新单词都会生成一个新表格(用绿色背景突出显示),并将相应的更改记录发送到下游KStream。
当处理第二行文本“hello kafka streams”时,咱们首次观察到KTable中现有的条目正在被更新(这里是:“kafka”和“streams”)。 再次,更改记录发送到输出主题。
(咱们跳过了第三行如何处理的说明)。这解释了为何输出主题具备咱们上面显示的内容,由于它包含完整的变动记录。
在这个例子的范围以外,Kafka Streams在这里作的是利用表和变动日志流之间的对偶性(这里:table = KTable,changelog stream =下游KStream):你能够发布table转换为流,而且若是你从头至尾使用整个变动日志流,则能够从新构建表的内容。
最后,经过Ctrl-C
中止控制台消费者,生产者,Wordcount程序,Kafka Broker和Zokeeper服务。
本文转发自 http://orchome.com/936
关于Kafka深刻学习视频, 如Kafka领导选举, offset管理, Streams接口, 高性能之道, 监控运维, 性能测试等,
请关注我的微信公众号: 求学之旅, 发送Kafka, 便可收获Kafka学习视频大礼包一枚。