KTable
KGroupedStream / KGroupedTable
KStreams
KGroupedStream comes after a groupBy() or groupByKey() call on a KStream.
Count counts the number of records by grouped key.
If used on KGroupedStream: records with a null key or a null value are ignored.
If used on KGroupedTable: records with a null key are ignored; records with a null value are treated as deletes (tombstones) for that key.
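As a quick sketch of count() in use (the topic and store names here are made up for illustration):

    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> clicks = builder.stream("user-clicks");   // hypothetical input topic
    KTable<String, Long> clicksPerUser = clicks
        .groupByKey()                                 // count() needs a KGroupedStream first
        .count(Materialized.as("clicks-per-user"));   // rolling count per key, kept in a local state store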
After grouping with groupByKey or groupBy, you get back a KGroupedStream or KGroupedTable, and these types support aggregation operations. Aggregation is always performed on the key. One thing to note here: Kafka Streams requires that the topics involved in the same join operation have the same number of partitions, and that the join key is the partition key. To see why, think about the join problem after sharding a database across instances and tables.
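To make the grouping step concrete, a minimal sketch (given a StreamsBuilder builder as in the previous snippet; the topic name and serdes are illustrative):

    KStream<String, String> stream = builder.stream("input-topic");  // hypothetical topic

    // groupByKey(): reuse the existing record key as the grouping key
    KGroupedStream<String, String> byKey = stream.groupByKey();

    // groupBy(): choose a new grouping key; Kafka Streams will repartition the data
    // so that records with the same new key end up in the same partition
    KGroupedStream<String, String> byValue =
        stream.groupBy((key, value) -> value, Grouped.with(Serdes.String(), Serdes.String()));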
Aggregate performs a rolling aggregation per grouping key.
When aggregating a grouped stream, you must provide an initializer (e.g., aggValue = 0) and an "adder" aggregator (e.g., aggValue + curValue).
When aggregating a grouped table, you must additionally provide a "subtractor" aggregator (e.g., aggValue - oldValue).
    KGroupedStream<byte[], String> groupedStream = ...;
    KGroupedTable<byte[], String> groupedTable = ...;

    // Aggregating the grouped stream (note how the value type changes from String to Long)
    KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
        () -> 0L,                                                      // initializer
        (aggKey, newValue, aggValue) -> aggValue + newValue.length(),  // adder
        Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("aggregated-stream-store")  // local state store name
            .withValueSerde(Serdes.Long()));                           // serde for the aggregate value

    // Aggregating the grouped table
    KTable<byte[], Long> aggregatedTable = groupedTable.aggregate(
        () -> 0L,                                                      // initializer
        (aggKey, newValue, aggValue) -> aggValue + newValue.length(),  // adder
        (aggKey, oldValue, aggValue) -> aggValue - oldValue.length(),  // subtractor
        Materialized.<byte[], Long, KeyValueStore<Bytes, byte[]>>as("aggregated-table-store")
            .withValueSerde(Serdes.Long()));
KGroupedStream:
Records with a null key are ignored.
The initializer is called the first time a record key is received (and it is called before the adder).
The adder is called whenever a record's value is non-null.
KGroupedTable:
Below is the description of the aggregate function on KGroupedTable. I will translate it in detail later, because when used well this function is extremely powerful, but it is also hard to master.
    <VR> KTable<K, VR> aggregate(Initializer<VR> initializer,
                                 Aggregator<? super K, ? super V, VR> adder,
                                 Aggregator<? super K, ? super V, VR> subtractor,
                                 Materialized<K, VR, KeyValueStore<org.apache.kafka.common.utils.Bytes, byte[]>> materialized)

Aggregate the value of records of the original KTable that got mapped to the same key into a new instance of KTable. Records with null key are ignored. Aggregating is a generalization of combining via reduce(...) as it, for example, allows the result to have a different type than the input values. The result is written into a local KeyValueStore (which is basically an ever-updating materialized view) that can be queried using the provided queryableStoreName. Furthermore, updates to the store are sent downstream into a KTable changelog stream.

The specified Initializer is applied once directly before the first input record is processed to provide an initial intermediate aggregation result that is used to process the first record. Each update to the original KTable results in a two-step update of the result KTable. The specified adder is applied for each update record and computes a new aggregate using the current aggregate (or, for the very first record, the intermediate aggregation result provided via the Initializer) and the record's value by adding the new record to the aggregate. The specified subtractor is applied for each "replaced" record of the original KTable and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" record from the aggregate.

Thus, aggregate(Initializer, Aggregator, Aggregator, Materialized) can be used to compute aggregate functions like sum. For sum, the initializer, adder, and subtractor would work as follows:

    // in this example, LongSerde.class must be set as value serde in Materialized#withValueSerde
    public class SumInitializer implements Initializer<Long> {
      public Long apply() {
        return 0L;
      }
    }

    public class SumAdder implements Aggregator<String, Integer, Long> {
      public Long apply(String key, Integer newValue, Long aggregate) {
        return aggregate + newValue;
      }
    }

    public class SumSubtractor implements Aggregator<String, Integer, Long> {
      public Long apply(String key, Integer oldValue, Long aggregate) {
        return aggregate - oldValue;
      }
    }

Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to the same key. The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of parallel running Kafka Streams instances, and the configuration parameters for cache size and commit interval.

To query the local KeyValueStore it must be obtained via KafkaStreams#store(...):

    KafkaStreams streams = ... // counting words
    ReadOnlyKeyValueStore<String, Long> localStore =
        streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
    String key = "some-word";
    Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)

For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application.

For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "storeName" is the provided store name defined in Materialized, and "-changelog" is a fixed suffix. You can retrieve all generated internal topic names via Topology.describe().

Type Parameters:
VR - the value type of the aggregated KTable
Parameters:
initializer - an Initializer that provides an initial aggregate result value
adder - an Aggregator that adds a new record to the aggregate result
subtractor - an Aggregator that removes an old record from the aggregate result
materialized - the instance of Materialized used to materialize the state store; cannot be null
Returns:
a KTable that contains "update" records with unmodified keys, and values that represent the latest (rolling) aggregate for each key
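Putting the Javadoc's sum classes to work, a hedged sketch of wiring them into aggregate() (the topic and store names are invented for illustration):

    KTable<String, Integer> numbers = builder.table("numbers-by-key");  // hypothetical topic

    KTable<String, Long> sums = numbers
        .groupBy(KeyValue::pair)     // keep key and value, just regroup
        .aggregate(
            new SumInitializer(),    // () -> 0L
            new SumAdder(),          // aggregate + newValue
            new SumSubtractor(),     // aggregate - oldValue
            Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("sums-store")
                .withValueSerde(Serdes.Long()));  // Long serde, as the Javadoc requires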
For KGroupedStream aggregate() you need an initializer, an adder, a Serde, and a StateStore name (the name of your aggregation).
For KGroupedTable aggregate() you need an initializer, an adder, a subtractor, a Serde, and a StateStore name (the name of your aggregation).
The subtractor keeps the aggregate consistent with the current state of the data: if (key1, value1) is updated to (key1, value2), the old record is first removed from the aggregate, so for a count we get key1 -> (count minus value1's contribution) before the new value is added.
Why do we need a subtractor in KGroupedTable but not in KGroupedStream? Because a stream only ever performs inserts, which aggregate() can handle with the adder alone. A KGroupedTable performs not only inserts but also updates and deletes, which is why it needs a subtractor; see the sketch below.
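To see the subtractor at work, consider a hedged sketch that counts users per region from a table of userId -> region (topic and store names invented). When a user's region is updated, the subtractor decrements the old region's count and the adder increments the new one:

    KTable<String, String> userRegions = builder.table("user-regions");  // hypothetical: userId -> region

    KTable<String, Long> usersPerRegion = userRegions
        .groupBy((userId, region) -> KeyValue.pair(region, userId))  // regroup by region
        .count(Materialized.as("users-per-region"));

    // Update (alice, "A") -> (alice, "B"):
    //   subtractor: count("A") - 1
    //   adder:      count("B") + 1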
Reduce is a simplified version of aggregate(), but the result type has to be the same as the input:
(int, int) => int or (String, String) => String (for example concat(a, b))
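A minimal reduce() sketch for the (String, String) => String concat case (topic and store names invented):

    KTable<String, String> concatenated = builder
        .<String, String>stream("words")                  // hypothetical topic
        .groupByKey()
        .reduce(
            (aggValue, newValue) -> aggValue + newValue,  // concat(a, b): output type == input type
            Materialized.as("concatenated-store"));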
Peek allows you to apply a side-effect operation to a KStream and get the same KStream as a result
For example: printing the stream to the console, or collecting statistics.
Warning: since it is a side effect, it could be executed multiple times in case of failures (because of the at-least-once processing guarantee).
Like for debugging on the console:
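A sketch of peek() used for console debugging (the upstream stream is illustrative):

    KStream<String, Long> stream = ...;  // some upstream KStream

    KStream<String, Long> same = stream.peek(
        (key, value) -> System.out.println("key=" + key + ", value=" + value));  // side effect only
    // 'same' carries exactly the same records as 'stream'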
Trust me, you won't need them!!!
This diagram explains how the different Kafka Streams objects interact and transform:
I found a good article on this topic:
https://my.oschina.net/u/2424727/blog/2989115