数据流(也叫事件流)是无边界数据集的抽象表示。无边界意味着无限和持续增加。无边界数据集之因此是无限的,是由于随着时间的推移,新记录会不断加入进来。数据流除了无边界还有如下特性:html
数据流是有序的。事件的发生老是有前后顺序的,如先下单再发货java
数据记录不可变。事件一旦发生,就不能被改变,以下单后,想要取消只会新产生一个事件git
数据流是可重播的。github
持续的从一个无边界的数据集读取数据,而后对它们进行处理并生成结果,就是流式处理。spring
流式处理是一种编程范式,就像请求和响应范式、批处理范式那样,下面是三种范式的比较:sql
请求和响应,这种范式的特色是延迟最小数据库
批处理,这种范式的特色是高延迟和高吞吐量apache
流式处理,这种范式介于上面两种之间编程
Kafka Streams是一个用于构建流式处理应用程序的客户端库,其中输入和输出数据是存储在Kafka集群中的。一个用Kakfa Streams搭建的流处理程序,它的架构以下图:bootstrap
Kafka Streams的优势:
简单轻量级的客户端库,能够轻松嵌入到任何Java应用程序中;除了Kafka以外没有其余外部的依赖。全部能够轻松的整合到本身的应用中,也不须要为流式处理需求额外的的部署一个应用集群。
使用Kafka做为内部消息通信存储介质,不须要从新加入其它外部组件来作消息通信。值得注意的是,它使用Kafka的分区模型来水平扩展处理,同时保持强大的排序保证。
支持本地状态容错,可实现很是快速有效的有状态操做,如窗口链接和聚合。本地状态被保存在Kafka中,在机器故障的时候,其余机器能够自动恢复这些状态继续处理。
能够保证每一个记录只处理一次,即便在处理过程当中Streams客户端或Kafka代理发生故障时也只处理一次。
采用一条记录一次处理以实现毫秒处理延迟,并支持基于事件时间的窗口操做以及记录的延迟到达。
提供丰富的流式处理API,包括高级的Streams DSL和低级的Processor API。
时间在流式处理中很是的重要,由于大部分的流式处理都是基于时间窗的,如计算5分钟内用户的访问量,那么对于五分钟前的数据就不该该参与计算。
事件时间
事件时间是指事件的发生时间或事件的建立时间,如商品的出售时间,用户的访问时间。在Kakfa 0.10以后的版本,生产者会自动在记录中添加建立时间,若是与业务的事件时间不一致,那就须要手动设置这个时间。
日志追加时间
日志追加时间是指时间保存到kafka broker上的时间。
处理时间
处理时间是指咱们的应用在收到事件后对其处理的时间。同一个事件的处理时间可能不一样,这取决于不一样的应用什么时候读取这个时间
若是只是单独的处理每个事件,那么这个流式处理就很简单,如从数据流中过滤出交易金额大于10000的数据,而后给这些交易人发个优惠券,这种需求咱们用kafka的消费者客户端就彻底能知足,但一般状况下咱们的操做中会包含多个事件,如统计总数、平均数、最大值等。事件与事件之间的信息被称为状态。如咱们卖了1双鞋,而后又买了2双鞋,通过这两个事件后,在如今这个时间,咱们鞋数量的状态就是一双。
在业务系统中,有时咱们关注变化的过程,有时咱们关注结果。流是一系列事件,每一个事件就是一个变动;表包含了当前的状态,是多个变动所产生的结果。将流转换为表,叫作流的物化。咱们捕获到表所发生的变动(insert、update、delete)事件,这些事件就组成了流。
在流处理中,咱们的数据处理大部分都是基于窗操做的,如咱们在分析股价的走势时,咱们须要统计出的天天或者每一个小时的内股价的平均价格,而后查看价格的一个走势,而不是直接统计从股票发行到如今的平均价,这个是没多大意义的,这里的天天或每一个小时就是一个时间窗。
在Kafka streams中窗口有两种,时间窗口和会话窗口(其实会话窗口也是基于时间窗的)。
时间窗有两个重要的属性:窗口大小和步长(移动间隔)。
滚动窗口:步长等于窗口大小,滚动窗口是没有没有记录的重叠。
跳跃窗口:步长不等于窗口大小
滑动窗口:窗口随着每一条记录移动,滑动窗口不与时间对齐,而是与数据记录时间戳对齐。
会话窗口:会话窗口与时间窗最大的不一样是,他的大小是不肯定的(由于它的大小是由数据自己决定的)
看下图,这是一个时间间隔为5分钟的session窗口 ,先忽略图中迟到的两个记录,假设他们没迟到。
在kakfa streams中,计算逻辑被定义为拓扑,它是一个操做和变换的集合,每一个事件从输入到输出都会流经它。每一个流式处理的应用至少会有一个拓扑。
流处理器是处理拓扑中的各个节点,表明拓扑中的每一个处理步骤,用来完成数据转换功能,如过滤、映射、分组、聚合等。一个流处理器同一时间从上游接收一条输入数据,产生一个或多个输出记录到下个流式处理器。
一个拓扑中有两种特殊的的处理器:
Source Processor,没有上游处理器,从一个或多个Kafka topic为拓扑生成输入流。
Slink Processor,没有下游处理器,将从上游处理器接收的记录发送到指定的Kafka主题。
将数据按空格拆分红单个单词,过滤掉不须要的单词,统计每一个单词出现的次数
// 配置信息 Properties props = new Properties(); //Streams应用Id props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordCount"); //Kafka集群地址 props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.195.88:9092"); //指定序列化和反序列化类型 props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); //建立一个topology构建器,在kakfa中计算逻辑被定义为链接的处理器节点的拓扑。 StreamsBuilder builder = new StreamsBuilder(); //使用topology构建器建立一个源流,指定源topic KStream<String, String> source = builder.stream("wordCountInput"); // 构建topology KStream<String, Long> wordCounts = source //把数据按空格拆分红单个单词 .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+"))) //过滤掉the这个单词,不统计这个单词 .filter((key, value) -> (!value.equals("the"))) //分组 .groupBy((key, word) -> word) //计数,其中'countsStore'是状态存储的名字 .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("countsStore")) .toStream(); //将stream写回到Kafka的topic wordCounts.to("wordCountOutput", Produced.with(Serdes.String(), Serdes.String())); //建立Streams客户端 KafkaStreams streams = new KafkaStreams(builder.build(), props); //启动Streams客户端 streams.start();
统计5秒钟内股票交易的最高价、最低价、交易次数及平均价格,统计信息每隔一秒钟更新一次。这是一个典型的时间窗的应用。
StreamsBuilder builder = new StreamsBuilder(); KStream<String, Trade> source = builder.stream("stockStatsInput"); KStream<Windowed<String>, TradeStats> stats = // 按key分组,这里的key是股票代码 source.groupByKey() // 建立一个跳跃时间窗,窗口大小5s,步长1s .windowedBy(TimeWindows.of(Duration.ofSeconds(5)).advanceBy(Duration.ofSeconds(1))) // 进行聚合操做,用TradeStats对象存储每一个窗口的统计信息——最高价、最低价、交易次数及总交易额 .aggregate(TradeStats :: new, (k, v, tradeStats) -> tradeStats.add(v), Materialized.<String, TradeStats, WindowStore<Bytes, byte[]>>as("tradeAggregates").withValueSerde(new TradeStatsSerde())) .toStream() //计算平均股价 .mapValues(TradeStats :: computeAvgPrice); //将stream写回到Kafka stats.to("stockStatsOutput", Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start();
根据用户的搜索信息、点击信息、及我的信息,把这些信息链接在一块儿,动态的分析出用户行为。如一个用户搜索了"奶粉",并在一分钟内点击了“贝因美”,我的信息是年龄20-30岁之间的女性,咱们把这些事件流链接起来后,就能够获得一条用户分析的数据,之后贝因美的奶粉搞活动了就能够直接向该用户推荐。这个例子主要演示了数据流间的链接。
// 搜索事件 KStream<Integer, Search> searches =builder.stream(Constants.SEARCH_TOPIC, Consumed.with(Serdes.Integer(), new SearchSerde())); // 点击事件 KStream<Integer, PageView> views = builder.stream(Constants.PAGE_VIEW_TOPIC, Consumed.with(Serdes.Integer(), new PageViewSerde())); // 用户信息 KTable<Integer, UserProfile> profiles = builder.table(Constants.USER_PROFILE_TOPIC, Consumed.with(Serdes.Integer(), new ProfileSerde()), Materialized.as("profileStore")); //将点击事件与用户信息链接,用UserActivity对象来存储状态 KStream<Integer, UserActivity> viewsWithProfile = views.leftJoin(profiles, (page, profile) -> { if (profile != null){ return new UserActivity(profile.getUserId(), profile.getUserName(), profile.getPhone(), "", page.getPage()); }else { return new UserActivity(-1, "", "", "", page.getPage()); } }); //将用户点击事件与搜索信息链接 KStream<Integer, UserActivity> userActivityKStream = viewsWithProfile.leftJoin(searches, (userActivity, search) -> { if (search != null) { userActivity.updateSearch(search.getSearchTerms()); }else { userActivity.updateSearch(""); } return userActivity; }, // 搜索事后的10s秒钟内的数据才被认为是相关联的 JoinWindows.of(Duration.ofSeconds(10)), Joined.with(Serdes.Integer(), new UserActivitySerde(), new SearchSerde())); userActivityKStream.to(Constants.USER_ACTIVITY_TOPIC, Produced.with(Serdes.Integer(), new UserActivitySerde()) ); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start();
Kafka Streams的API有两种,Kafka Streams DSL和Processor API。
Kafka Streams DSL是高级API,它提供最多见的数据转换操做,诸如map,filter,join等。
Processor API一种低级API,容许您添加和链接处理器以及直接与状态存储进行交互,Processor API为您提供比DSL更多的灵活性,但代价是须要在应用程序开发人员方面进行更多的手动工做(例如,更多行代码)。所以接下来全部的工做都是基于DSL。
KStream:数据流抽象。建立方法以下:
StreamsBuilder builder = new StreamsBuilder(); KStream<String, Long> wordCounts = builder.stream( "word-counts-input-topic", // 输入的topic Consumed.with(Serdes.String(), Serdes.Long()) //key和value的序列化方式 );
KTable:数据表抽象。建立方法以下:
KStream<String, Long> wordCounts = builder.table( "word-counts-input-topic", // 输入的topic Consumed.with(Serdes.String(), Serdes.Long(), //key和value的序列化方式 Materialized.as("word-counts-store") // 状态存储名 );
GlobalKTable:同KTable,只不过是全局的,KTable是读取的当前分区的数据,而GlobalKTable是读取的所有分区的数据,这在进行join操做时是很是有用的。比较相似关系型数据库在分库分表后join的问题。
如何理解kStream和KTable的区别:
咱们能够这样看,咱们从topic前后读取了两条数据,("苹果", 1) --> ("苹果", 3)
,对于KStream来讲,表示有一个苹果,而后我又有3个苹果,结果是我就有了4个苹果 ;可是对于KTable来讲,表示我如今有1苹果,我如今有3个苹果,结果是我有3个苹果。由于对于KTable来讲,第二条记录是第一条记录的更新。
因此官网对它们区别描述的很是好,KStream对于流中的记录始终解释为insert,而KTable对流中的记录解释为upsert。
只须要数据流过一遍就能够,不依赖先后的状态。
branch:将一个Kstream分红多个
KStream<String, Long>[] branches = stream.branch( (key, value) -> key.startsWith("A"), //branches[0]中只包含key以“A”开头的全部记录 (key, value) -> key.startsWith("B"), //branches[1]中只包含key以“B”开头的全部记录 (key, value) -> true //branches[2]中包含其余记录 );
filter:过滤操做
// 过滤掉value不大于0的记录 KStream<String, Long> onlyPositives = stream.filter((key, value) -> value > 0);
filterNot:反向过滤,与filter相反
flatMap:将一条记录转换成0条、1条或多条记录
// 把一条记录转换成了两条记录。如: (345L, "Hello") -> ("HELLO", 1000), ("hello", 9000) KStream<String, Integer> transformed = stream.flatMap((key, value) -> { List<KeyValue<String, Integer>> result = new LinkedList<>(); result.add(KeyValue.pair(value.toUpperCase(), 1000)); result.add(KeyValue.pair(value.toLowerCase(), 9000)); return result; });
flatMapValues:做用和flatMap相同,可是只是对value操做,转换后记录的key同原来的key
// 经过空格拆分红单个单词 KStream<byte[], String> words = sentences.flatMapValues(value -> Arrays.asList(value.split("\\s+")));
foreach:循环
// 循环打印出每条记录 stream.foreach((key, value) -> System.out.println(key + " => " + value));
groupByKey:根据key分组
KGroupedStream<byte[], String> groupedStream = stream.groupByKey();
GroupBy: 分组
// 分组,并修改了key和value的类型 KGroupedStream<String, String> groupedStream = stream.groupBy( (key, value) -> value, Serialized.with(Serdes.String(), Serdes.String()) ); // 分组,并生成新的key,而且修改了key和value的类型 KGroupedTable<String, Integer> groupedTable = table.groupBy( (key, value) -> KeyValue.pair(value, value.length()), Serialized.with(Serdes.String(), Serdes.Integer()) );
map:将一条记录转换成另外一条记录
KStream<String, Integer> transformed = stream.map(key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));
mapValues:做用同map,可是只是对value操做,转换后记录的key同原来的key
KStream<byte[], String> uppercased = stream.mapValues(value -> value.toUpperCase());
merge:合并两个流
KStream<byte[], String> merged = stream1.merge(stream2);
peek:对每条记录执行无状态操做,并返回未更改的流,也就是说peek中的任何操做,返回的都是之前的流,能够用来调试
KStream<byte[], String> unmodifiedStream = stream.peek((key, value) -> System.out.println("key=" + key + ", value=" + value));
print:打印流,能够用来调试
stream.print();
SelectKey:从新构建key
//将key值改成value的第一个单词 KStream<String, String> rekeyed = stream.selectKey((key, value) -> value.split(" ")[0])
toStream:将KTable转换成KStream
KStream<byte[], String> stream = table.toStream();
有状态的转换包括:Aggregating、Joining、Windowing。他们间的关系以下图:
经过groupByKey
或groupBy
分组后,返回KGroupedStream
或KGroupedTable
数据类型,它们能够进行聚合的操做。聚合是基于key操做的。这里有个注意点,kafka streams要求同一个链接操做所涉及的topic必需要有相同数量的分区,而且链接所用的key必须就是分区的key,至于为何能够想想分库分表后的join问题。
aggregate
滚动聚合,按分组键进行聚合。
聚合分组流时,必须提供初始值设定项(例如,aggValue = 0)和“加法”聚合器(例如,aggValue + curValue)。
聚合分组表时,必须提供“减法”聚合器(例如:aggValue - oldValue)。
KGroupedStream<byte[], String> groupedStream = ; KGroupedTable<byte[], String> groupedTable = ; // 聚合分组流 (注意值类型如何从String更改成Long) KTable<byte[], Long> aggregatedStream = groupedStream.aggregate( () -> 0L, // 初始值 (aggKey, newValue, aggValue) -> aggValue + newValue.length(), Materialized.as("aggregated-stream-store") // 本地状态名称 .withValueSerde(Serdes.Long()); // 聚合分组表 KTable<byte[], Long> aggregatedTable = groupedTable.aggregate( () -> 0L, (aggKey, newValue, aggValue) -> aggValue + newValue.length(), (aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), Materialized.as("aggregated-table-store") .withValueSerde(Serdes.Long())
KGroupedStream:
key为null的记录会被忽略。
第一次收到记录key时,将调用初始化(并在加法器以前调用)。
只要记录的值为非null时,就会调用加法器。
KGroupedTable:
aggregate (windowed)
窗口聚合,按分组键聚合每一个窗口的记录值。
KGroupedStream<String, Long> groupedStream = ...; // 与基于时间的窗口进行聚合(此处:使用5分钟的翻滚窗口) KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy(Duration.ofMinutes(5)) .aggregate( () -> 0L, (aggKey, newValue, aggValue) -> aggValue + newValue, Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("time-windowed-aggregated-stream-store").withValueSerde(Serdes.Long())); // 使用基于会话的窗口进行聚合(此处:不活动间隔为5分钟) KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream.windowedBy(SessionWindows.with(Duration.ofMinutes(5)) .aggregate( () -> 0L, (aggKey, newValue, aggValue) -> aggValue + newValue, (aggKey, leftAggValue, rightAggValue) -> leftAggValue + rightAggValue, Materialized.<String, Long, SessionStore<Bytes, byte[]>>as("sessionized-aggregated-stream-store").withValueSerde(Serdes.Long()));
count
滚动聚合,按分组键统计记录数。
// Counting a KGroupedStream KTable<String, Long> aggregatedStream = groupedStream.count(); // Counting a KGroupedTable KTable<String, Long> aggregatedTable = groupedTable.count();
对于KGroupedStream,会忽略具备空键或空值的记录。
对于KGroupedTable,会忽略具备空键的输入记录,具备空值的记录,会从table中删除该键。
count(windowed)
窗口聚合。按分组键统计每一个窗口的记录数,它会忽略具备空键或空值的记录。
KTable<Windowed<String>, Long> aggregatedStream = groupedStream.windowedBy( TimeWindows.of(Duration.ofMinutes(5))) // 基于时间的窗口 .count(); KTable<Windowed<String>, Long> aggregatedStream = groupedStream.windowedBy( SessionWindows.with(Duration.ofMinutes(5))) // session窗口 .count();
Reduce
滚动聚合,经过分组键组合(非窗口)记录的值。当前记录值与最后一个减小的值组合,并返回一个新的减小值。与聚合不一样,结果值类型不能更改。
KGroupedStream<String, Long> groupedStream = ...; KGroupedTable<String, Long> groupedTable = ...; KTable<String, Long> aggregatedStream = groupedStream.reduce( (aggValue, newValue) -> aggValue + newValue ); KTable<String, Long> aggregatedTable = groupedTable.reduce( (aggValue, newValue) -> aggValue + newValue, (aggValue, oldValue) -> aggValue - oldValue );
Reduce (windowed)
窗口聚合。经过分组键将每一个窗口的记录值组合在一块儿。当前记录值与最后一个减小的值组合,并返回一个新的减小值。使用null键或值的记录将被忽略。与聚合不一样,结果值类型不能更改。
KGroupedStream<String, Long> groupedStream = ...; KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream.windowedBy( TimeWindows.of(Duration.ofMinutes(5))) .reduce((aggValue, newValue) -> aggValue + newValue ); KTable<Windowed<String>, Long> sessionzedAggregatedStream = groupedStream.windowedBy( SessionWindows.with(Duration.ofMinutes(5))) .reduce((aggValue, newValue) -> aggValue + newValue );
分组流的聚合:
KStream<String, Integer> wordCounts = ...; KGroupedStream<String, Integer> groupedStream = wordCounts .groupByKey(Grouped.with(Serdes.String(), Serdes.Integer())); KTable<String, Integer> aggregated = groupedStream.aggregate( () -> 0, (aggKey, newValue, aggValue) -> aggValue + newValue, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>as("aggregated-stream-store" ) .withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer());
这段段代码运行后,聚合会以下图所示随着时间的变化:
分组表的聚合:
KTable<String, String> userProfiles = ...; KGroupedTable<String, Integer> groupedTable = userProfiles .groupBy((user, region) ->KeyValue.pair(region, user.length()), Serdes.String(), Serdes.Integer()); KTable<String, Integer> aggregated = groupedTable.aggregate( () -> 0, (aggKey, newValue, aggValue) -> aggValue + newValue, (aggKey, oldValue, aggValue) -> aggValue - oldValue, Materialized.<String, Long, KeyValueStore<Bytes, byte[]>as("aggregated-table-store" ) .withKeySerde(Serdes.String()).withValueSerde(Serdes.Integer());
这段段代码运行后,聚合会以下图所示随着时间的变化:
所谓链接,就是将两条记录按照必定的规则链接为一条记录,其实和sql中的链接是同样的做用。在Kafka stream中,join都是基于Key的,join的方式有三种:innerJoin、leftJoin和outerJoin。
join
KStream<String, Long> left = ...; KStream<String, Double> right = ...; KStream<String, String> joined = left.join(right, (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, JoinWindows.of(Duration.ofMinutes(5)), Joined.with(Serdes.String(), Serdes.Long(), Serdes.Double()) );
leftJoin
KStream<String, Long> left = ...; KStream<String, Double> right = ...; KStream<String, String> joined = left.leftJoin(right, (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, JoinWindows.of(Duration.ofMinutes(5)), Joined.with(Serdes.String(), Serdes.Long(), Serdes.Double()) );
outerJoin
KStream<String, Long> left = ...; KStream<String, Double> right = ...; KStream<String, String> joined = left.outerJoin(right, (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */ JoinWindows.of(Duration.ofMinutes(5)), Joined.with( Serdes.String(), Serdes.Long(), Serdes.Double()) );
KStream-KStream的join老是基于windowed的。空键或空值的输入记录将被忽略,而且不会触发链接
KTable-KTable的join都是不基于windowed的。空键的输入记录将被忽略,而且不会触发链接
KStream-KTable的join都是不基于windowed的。只有左侧(流)的输入记录才会触发链接。右侧(表)的输入记录仅更新内部右侧链接状态。空键或空值的输入记录将被忽略,而且不会触发链接。
KStream-GlobalKTable的join都是不基于windowed的。只有左侧(流)的输入记录才会触发链接。右侧(表)的输入记录仅更新内部右侧链接状态。空键或空值的输入记录将被忽略,而且不会触发链接。
为何只有流与流的链接必须是基于窗口的呢?由于流的数据是无限的,因此流和流的链接是不能完成的。
窗口化使您能够控制如何将具备相同键的记录分组,以进行有状态操做,例如聚合或链接到所谓的窗口。根据记录密钥跟踪Windows。
// 建立一个时间窗口:窗口大小5s,步长1s TimeWindows.of(Duration.ofSeconds(5)).advanceBy(Duration.ofSeconds(1)); // 建立一个会话窗口:窗口大小5分钟 SessionWindows.with(Duration.ofMinutes(5));
在实际状况中,咱们不能保证,每条记录都能准时的到达,因此就不能保证窗口的结果必定是正确的。例如,咱们要统计每个小时的每种产品的的销售量,而后筛选出销量大于3的产品,可是有一条销售记录的确是在这个小时内产生的,因为某种缘由,在这个时间窗关闭的之后才到达,这样的话咱们这个时间窗的统计数据其实是不许确的,解决这个问题,能够用以下的方法:那就是,时间窗在到时间时,先不着急关闭,等待一段时间。
KGroupedStream<UserId, Event> grouped = ...; //容许时间窗接受迟到10分钟内的记录 grouped.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10))) .count() //控制在窗口关闭前,下游接受不到任何记录 .suppress(Suppressed.untilWindowCloses(unbounded())) .filter((windowedUserId, count) -> count < 3) .toStream();
能够继承Process和Transform来达到咱们自定义的目的。
参数名称 | 默认值 | 描述 |
---|---|---|
application.id | 无 | kafka集群地址,必须参数 |
bootstrap.servers | 无 | 应用id,必须参数,同一应用的全部实例的都应该一直。 |
commit.interval.ms | 30000 ms | 提交任务位置的频率 |
replication.factor | 1 | 应用程序建立的更改日志主题和从新分区主题的复制因子 |
state.dir | /tmp/kafka-streams | 状态存储的物理位置,注意这个是保存的本地的 |
还有不少配置,等之后用到了再慢慢更新。详细请参考官方的配置介绍
既然咱们选择了kafka作应用,那么只用单线程或单实例的处理咱们的业务那基本上是不太可能的,若是已经使用过kafka,咱们知道kafka的扩展能力那是很是出色的,对使用者也是很是的简单,如kafka集群自身的扩展,咱们仅仅须要集群的配置文件复制到新节点中,修改一下broker id就好了。对于Kafka Streams的应用来讲,经过启动多个实例组建集群来提升吞掉量,那也是很是的容易,由于kafka会自动帮咱们作好这些事情。
kafka能自动的根据咱们的实例数量和每一个实例的线程数量,将任务进行拆分,固然和topic的分区数也是直接相关的。和咱们的消费者客户端同样,kafka会自动的协调工做,为每一个任务分配属于任务本身的分区,这样每一个任务独自处理本身的分区,并维护与聚合相关的本地状态。
若是咱们须要处理来自多个分区的结果,即对多个任务结果再进行处理,这时咱们就能够根据新的key进行从新分区后写入到重分区主题上,并启动新的任务重新主题上读取和处理事件。
Kafka Streams对故障的处理有很是好的支持,若是应用出现故障须要重启,能够自动的从Kafka上找到上一处理的位置,从该位置继续开始处理。若是本地状态丢失(如宕机),应用能够自动从保存到kafka上的变动日志新建本地状态,由于本地状态的全部数据都保存到了kafka中。若是集群中的一个任务失败,只要还有其余任务实例可用,就能够用其余实例来继续这个任务,由于Kafka有消费者的重平衡机制。
依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId> </dependency>
流处理类:
@EnableBinding(KafkaStreamsProcessor.class) public class WordCountProcessor { @StreamListener("input") @SendTo("output") public KStream<?, WordCountDto> process(KStream<Object, String> input) { return input .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+"))) .map((key, value) -> new KeyValue<>(value, value)) .groupByKey(Serialized.with(Serdes.String(), Serdes.String())) .windowedBy(TimeWindows.of(30000)) .count(Materialized.as("WordCounts-1")) .toStream() .map((key, value) -> new KeyValue<>(null, new WordCountDto(key.key(), value, new Date(key.window().start()), new Date(key.window().end())))); } }
配置文件:
spring.cloud.stream.kafka.streams.binder: brokers: 192.168.195.88 applicationId: word-count configuration: default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde commit.interval.ms: 1000 serdeError: logAndFail spring.cloud.stream.bindings.output: destination: wordCountOutput spring.cloud.stream.bindings.input: destination: wordCountInput
参考: