4 Kafka Streams -> Stateful Operations of KStreams and KTables without Joins and Window

KTable:java

  • groupBy

KGroupedStream / KGroupedTable:apache

  • Count
  • Reduce
  • Aggregate

KStreams:app

  • Peek
  • Transform / TransformValues

KTable GroupBy()

KGroupedStream / KGroupedTable Count()

KGroupedStream comes after groupBy() or groupByKey() call on KStreamide

Count counts the number of record by grouped key函数

If used on KGroupedStream:this

  • Null keys or values are ignored

If used on KGroupedTable:spa

  • Null keys are ignored
  • Null values are treated as "delete" (=tombstones)

KGroupedStream / KGroupedTable Aggregate()

 

经过groupByKeygroupBy分组后,返回KGroupedStreamKGroupedTable数据类型,它们能够进行聚合的操做。聚合是基于key操做的。这里有个注意点,kafka streams要求同一个链接操做所涉及的topic必需要有相同数量的分区,而且链接所用的key必须就是分区的key,至于为何能够想想分库分表后的join问题。.net

滚动聚合,按分组键进行聚合。翻译

聚合分组流时,必须提供初始值设定项(例如,aggValue = 0)和“加法”聚合器(例如,aggValue + curValue)。debug

聚合分组表时,必须提供“减法”聚合器(例如:aggValue - oldValue)。

KGroupedStream<byte[], String> groupedStream = ;
KGroupedTable<byte[], String> groupedTable = ;

// 聚合分组流 (注意值类型如何从String更改成Long)
KTable<byte[], Long> aggregatedStream = groupedStream.aggregate(
    () -> 0L, // 初始值
    (aggKey, newValue, aggValue) -> aggValue + newValue.length(), 
    Materialized.as("aggregated-stream-store") // 本地状态名称
        .withValueSerde(Serdes.Long());

// 聚合分组表
KTable<byte[], Long> aggregatedTable = groupedTable.aggregate(
    () -> 0L, 
    (aggKey, newValue, aggValue) -> aggValue + newValue.length(), 
    (aggKey, oldValue, aggValue) -> aggValue - oldValue.length(), 
    Materialized.as("aggregated-table-store") .withValueSerde(Serdes.Long())

 

KGroupedStream:

  1. key为null的记录会被忽略。

  2. 第一次收到记录key时,将调用初始化(并在加法器以前调用)。

  3. 只要记录的值为非null时,就会调用加法器。

KGroupedTable:

  1. key为null的记录会被忽略。
  2. 第一次收到记录key时,将调用初始化(并在加法器和减法器以前调用)。
  3. 当一个key的第一个非null的值被接收,只调用加法器。
  4. 当接收到key的后续非空值(例如,UPDATE)时,则(1)使用存储在表中的旧值调用减法器,以及(2)使用输入记录的新值调用加法器。那是刚收到的。未定义减法器和加法器的执行顺序。
  5. 当为一个key(例如,DELETE)接收到逻辑删除记录(即具备空值的记录)时,则仅调用减法器。请注意,只要减法器自己返回空值,就会从生成的KTable中删除相应的键。若是发生这种状况,该key的任何下一个输入记录将再次触发初始化程序。

下面是关于 KGroupedTable 中 aggregate 函数的介绍, 我会晚一些把它详细的翻译成中文,由于这个函数若是用好功能是十分强大,同时也是很难掌握的。

<VR> KTable<K,VR> aggregate(Initializer<VR> initializer,
                            Aggregator<? super K,? super V,VR> adder,
                            Aggregator<? super K,? super V,VR> subtractor,
                            Materialized<K,VR,KeyValueStore<org.apache.kafka.common.utils.Bytes,byte[]>> materialized)
Aggregate the value of records of the original KTable that got mapped to the same key into a new instance of KTable. Records with null key are ignored. Aggregating is a generalization of combining via reduce(...) as it, for example, allows the result to have a different type than the input values. The result is written into a local KeyValueStore (which is basically an ever-updating materialized view) that can be queried using the provided queryableStoreName. Furthermore, updates to the store are sent downstream into a KTable changelog stream.
The specified Initializer is applied once directly before the first input record is processed to provide an initial intermediate aggregation result that is used to process the first record. Each update to the original KTable results in a two step update of the result KTable. The specified adder is applied for each update record and computes a new aggregate using the current aggregate (or for the very first record using the intermediate aggregation result provided via the Initializer) and the record's value by adding the new record to the aggregate. The specified subtractor is applied for each "replaced" record of the original KTable and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced" record from the aggregate. Thus, aggregate(Initializer, Aggregator, Aggregator, Materialized) can be used to compute aggregate functions like sum. For sum, the initializer, adder, and subtractor would work as follows:


 // in this example, LongSerde.class must be set as value serde in Materialized#withValueSerde
 public class SumInitializer implements Initializer<Long> {
   public Long apply() {
     return 0L;
   }
 }

 public class SumAdder implements Aggregator<String, Integer, Long> {
   public Long apply(String key, Integer newValue, Long aggregate) {
     return aggregate + newValue;
   }
 }

 public class SumSubtractor implements Aggregator<String, Integer, Long> {
   public Long apply(String key, Integer oldValue, Long aggregate) {
     return aggregate - oldValue;
   }
 }
 
Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to the same key. The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of parallel running Kafka Streams instances, and the configuration parameters for cache size, and commit intervall.
To query the local KeyValueStore it must be obtained via KafkaStreams#store(...):


 KafkaStreams streams = ... // counting words
 ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
 String key = "some-word";
 Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
 
For non-local keys, a custom RPC mechanism must be implemented using KafkaStreams.allMetadata() to query the value of the key on a parallel running instance of your Kafka Streams application.
For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. Therefore, the store name defined by the Materialized instance must be a valid Kafka topic name and cannot contain characters other than ASCII alphanumerics, '.', '_' and '-'. The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is user-specified in StreamsConfig via parameter APPLICATION_ID_CONFIG, "storeName" is the provide store name defined in Materialized, and "-changelog" is a fixed suffix. You can retrieve all generated internal topic names via Topology.describe().

Type Parameters:
VR - the value type of the aggregated KTable
Parameters:
initializer - an Initializer that provides an initial aggregate result value
adder - an Aggregator that adds a new record to the aggregate result
subtractor - an Aggregator that removed an old record from the aggregate result
materialized - the instance of Materialized used to materialize the state store. Cannot be null
Returns:
a KTable that contains "update" records with unmodified keys, and values that represent the latest (rolling) aggregate for each key

 

For KGroupedStream aggregate() you need an initializer, an adder, a Serde and a StateStore name (name of your aggregation)

For KGroupedTable aggregate() you need an initializer, an adder, a substractor, a Serde and a StateStore name (name of your aggregation)

substractor is used for keep current state of data, if key1, value1 is updated to key1, value2 then by count we will got key1, (count of value1 - 1)

Why do we need  substractor in KGroupedTable, but not in KGroupedStream? Because in Stream we only will perform insert operation, which for aggregate() could be handled with add.  In KGroupedTable we perform not only insert, but also update/delete, that's why we need a substractor for it.

 

KGroupedStream / KGroupedTable Reduce()

Reduce is a simplified version of aggregate(), but the result type has to be the same as an input:

(int, int) => int or (String, String) => String (example concat(a,b))

KStream Peek()

Peek allows you to apply a side-effect operation to a KStream and get the same KStream as a result

For example:

  • Printing the stream to the consolo
  • Statistics collection

Warning: it could be executed multiple times (cause of at least once concept) as it is side effect (in case of failures)

Like for debugging on the console:

KStream Transform() / TransformValues()

Trust me, you won't need them !!!

Summary

This diagram explains, how different Kafka object interact and transform:

 

发现一篇不错的文章:

https://my.oschina.net/u/2424727/blog/2989115

相关文章
相关标签/搜索