欢迎关注公众号:n平方
若有问题或建议,请后台留言,我会尽力解决你的问题。
本文主要介绍【Kafka Streams的架构和使用】java
Kafka流经过构建Kafka生产者和消费者库,并利用Kafka的本地功能来提供数据并行性、分布式协调、容错和操做简单性,从而简化了应用程序开发。
下图展现了一个使用Kafka Streams库的应用程序的结构。apache
Kafka的消息传递层对数据进行分区,以存储和传输数据。Kafka流划分数据进行处理。在这两种状况下,这种分区都支持数据局部性、灵活性、可伸缩性、高性能和容错性。Kafka流使用分区和任务的概念做为基于Kafka主题分区的并行模型的逻辑单元。Kafka流与Kafka在并行性上下文中有着紧密的联系:编程
应用程序的处理器拓扑经过将其分解为多个任务进行扩展。
更具体地说,Kafka流基于应用程序的输入流分区建立固定数量的任务,每一个任务分配一个来自输入流的分区列表(例如,kafka的topic)。分配给任务的分区永远不会改变,所以每一个任务都是应用程序并行性的固定单元。
而后,任务能够基于分配的分区实例化本身的处理器拓扑;它们还为每一个分配的分区维护一个缓冲区,并从这些记录缓冲区一次处理一条消息。
所以,流任务能够独立并行地处理,而无需人工干预。api
理解Kafka流不是一个资源管理器,而是一个“运行”其流处理应用程序运行的任何地方的库。应用程序的多个实例要么在同一台机器上执行,要么分布在多台机器上,库能够自动将任务分配给运行应用程序实例的那些实例。分配给任务的分区从未改变;若是应用程序实例失败,它分配的全部任务将在其余实例上自动从新启动,并继续从相同的流分区使用。安全
下图显示了两个任务,每一个任务分配一个输入流分区。架构
Kafka流容许用户配置库用于在应用程序实例中并行处理的线程数。每一个线程能够独立地使用其处理器拓扑执行一个或多个任务。
例如,下图显示了一个流线程运行两个流任务。分布式
启动更多的流线程或应用程序实例仅仅至关于复制拓扑并让它处理Kafka分区的不一样子集,从而有效地并行处理。值得注意的是,线程之间不存在共享状态,所以不须要线程间的协调。这使得跨应用程序实例和线程并行运行拓扑变得很是简单。Kafka主题分区在各类流线程之间的分配是由Kafka流利用Kafka的协调功能透明地处理的。ide
如上所述,使用Kafka流扩展您的流处理应用程序很容易:您只须要启动应用程序的其余实例,Kafka流负责在应用程序实例中运行的任务之间分配分区。您能够启动与输入Kafka主题分区同样多的应用程序线程,以便在应用程序的全部运行实例中,每一个线程(或者更确切地说,它运行的任务)至少有一个输入分区要处理。性能
Kafka流提供了所谓的状态存储,流处理应用程序可使用它来存储和查询数据,这是实现有状态操做时的一项重要功能。例如,Kafka Streams DSL在调用有状态操做符(如join()或aggregate())或打开流窗口时自动建立和管理这样的状态存储。ui
Kafka Streams应用程序中的每一个流任务均可以嵌入一个或多个本地状态存储,这些存储能够经过api访问,以存储和查询处理所需的数据。Kafka流为这种本地状态存储提供容错和自动恢复功能。
下图显示了两个流任务及其专用的本地状态存储。
Kafka流构建于Kafka中本地集成的容错功能之上。Kafka分区是高度可用和复制的;所以,当流数据持久化到Kafka时,即便应用程序失败并须要从新处理它,流数据也是可用的。Kafka流中的任务利用Kafka消费者客户端提供的容错功能来处理失败。若是任务在失败的机器上运行,Kafka流将自动在应用程序的一个剩余运行实例中从新启动该任务。
此外,Kafka流还确保本地状态存储对于故障也是健壮的。对于每一个状态存储,它维护一个复制的changelog Kafka主题,其中跟踪任何状态更新。这些变动日志主题也被分区,这样每一个本地状态存储实例,以及访问该存储的任务,都有本身专用的变动日志主题分区。在changelog主题上启用了日志压缩,这样能够安全地清除旧数据,防止主题无限增加。若是任务在一台失败的机器上运行,并在另外一台机器上从新启动,Kafka流经过在恢复对新启动的任务的处理以前重播相应的更改日志主题,确保在失败以前将其关联的状态存储恢复到内容。所以,故障处理对最终用户是彻底透明的。
就是控制台输入到kafka中,通过处理输出。
package com.example.kafkastreams.demo; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class PipeDemo { public static void main(String[] args) { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); builder.stream("streams-plaintext-input").to("streams-pipe-output"); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
就是将你输入的字符串进行分词输出。
package com.example.kafkastreams.demo; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class LineSplitDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.split("\\W+"))) .to("streams-linesplit-output"); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
将你输入的字符串进行按单词统计输出。
package com.example.kafkastreams.demo; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.state.KeyValueStore; import java.util.Arrays; import java.util.Locale; import java.util.Properties; import java.util.concurrent.CountDownLatch; public class WordCountDemo { public static void main(String[] args) throws Exception { Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); final StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> source = builder.stream("streams-plaintext-input"); source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))) .groupBy((key, value) -> value) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")) .toStream() .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long())); final Topology topology = builder.build(); final KafkaStreams streams = new KafkaStreams(topology, props); final CountDownLatch latch = new CountDownLatch(1); // attach shutdown handler to catch control-c Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") { @Override public void run() { streams.close(); latch.countDown(); } }); try { streams.start(); latch.await(); } catch (Throwable e) { System.exit(1); } System.exit(0); } }
本人水平有限,欢迎各位建议以及指正。顺便关注一下公众号呗,会常常更新文章的哦。