流式计算新贵Kafka Stream设计详解--转

原文地址:https://mp.weixin.qq.com/s?__biz=MzA5NzkxMzg1Nw==&mid=2653162822&idx=1&sn=8c46114360b98b621b166d41d8e01d74&chksm=8b493028bc3eb93e8376d85c7d1f9b2a699888b7f0f52e4556bb8543ebebd5e102e91ea23355#rdnode

本文介绍了 Kafka Stream 的背景,如 Kafka Stream 是什么,什么是流式计算,以及为何要有 Kafka Stream。接着介绍了 Kafka Stream 的总体架构、并行模型、状态存储以及主要的两种数据集 KStream 和 KTable。而后分析了 Kafka Stream 如何解决流式系统中的关键问题,如时间定义、窗口操做、Join 操做、聚合操做,以及如何处理乱序和提供容错能力。最后结合示例讲解了如何使用 Kafka Stream。Kafka Stream 背景  Kafka Stream 是什么  数据库

Kafka Stream 是 Apache Kafka 从 0.10 版本引入的一个新 Feature。它提供了对存储于 Kafka 内的数据进行流式处理和分析的功能。apache

Kafka Stream 的特色以下:网络

  • Kafka Stream 提供了一个很是简单而轻量的 Library,它能够很是方便地嵌入任意 Java 应用中,也能够任意方式打包和部署多线程

  • 除了 Kafka 外,无任何外部依赖架构

  • 充分利用 Kafka 分区机制实现水平扩展和顺序性保证框架

  • 经过可容错的 state store 实现高效的状态操做(如 windowed join 和 aggregation)ide

  • 支持正好一次处理语义oop

  • 提供记录级的处理能力,从而实现毫秒级的低延迟网站

  • 支持基于事件时间的窗口操做,而且可处理晚到的数据(late arrival of records)

  • 同时提供底层的处理原语 Processor(相似于 Storm 的 spout 和 bolt),以及高层抽象的 DSL(相似于 Spark 的 map/group/reduce)

什么是流式计算  

通常流式计算会与批量计算相比较。在流式计算模型中,输入是持续的,能够认为在时间上是无界的,也就意味着,永远拿不到全量数据去作计算。同时,计算结果是持续输出的,也即计算结果在时间上也是无界的。流式计算通常对实时性要求较高,同时通常是先定义目标计算,而后数据到来以后将计算逻辑应用于数据。同时为了提升计算效率,每每尽量采用增量计算代替全量计算。

批量处理模型中,通常先有全量数据集,而后定义计算逻辑,并将计算应用于全量数据。特色是全量计算,而且计算结果一次性全量输出。

 

为何要有 Kafka Stream  

当前已经有很是多的流式处理系统,最知名且应用最多的开源流式处理系统有 Spark Streaming 和 Apache Storm。Apache Storm 发展多年,应用普遍,提供记录级别的处理能力,当前也支持 SQL on Stream。而 Spark Streaming 基于 Apache Spark,能够很是方便与图计算,SQL 处理等集成,功能强大,对于熟悉其它 Spark 应用开发的用户而言使用门槛低。另外,目前主流的 Hadoop 发行版,如 MapR,Cloudera 和 Hortonworks,都集成了 Apache Storm 和 Apache Spark,使得部署更容易。

既然 Apache Spark 与 Apache Storm 拥用如此多的优点,那为什么还须要 Kafka Stream 呢?笔者认为主要有以下缘由。

第一,Spark 和 Storm 都是流式处理框架,而 Kafka Stream 提供的是一个基于 Kafka 的流式处理类库。框架要求开发者按照特定的方式去开发逻辑部分,供框架调用。开发者很难了解框架的具体运行方式,从而使得调试成本高,而且使用受限。而 Kafka Stream 做为流式处理类库,直接提供具体的类给开发者调用,整个应用的运行方式主要由开发者控制,方便使用和调试。

第二,虽然 Cloudera 与 Hortonworks 方便了 Storm 和 Spark 的部署,可是这些框架的部署仍然相对复杂。而 Kafka Stream 做为类库,能够很是方便的嵌入应用程序中,它对应用的打包和部署基本没有任何要求。更为重要的是,Kafka Stream 充分利用了 Kafka 的分区机制和 Consumer 的 Rebalance 机制,使得 Kafka Stream 能够很是方便的水平扩展,而且各个实例可使用不一样的部署方式。

具体来讲,每一个运行 Kafka Stream 的应用程序实例都包含了 Kafka Consumer 实例,多个同一应用的实例之间并行处理数据集。而不一样实例之间的部署方式并不要求一致,好比部分实例能够运行在 Web 容器中,部分实例可运行在 Docker 或 Kubernetes 中。

第三,就流式处理系统而言,基本都支持 Kafka 做为数据源。例如 Storm 具备专门的 kafka-spout,而 Spark 也提供专门的 spark-streaming-kafka 模块。事实上,Kafka 基本上是主流的流式处理系统的标准数据源。换言之,大部分流式系统中都已部署了 Kafka,此时使用 Kafka Stream 的成本很是低。

第四,使用 Storm 或 Spark Streaming 时,须要为框架自己的进程预留资源,如 Storm 的 supervisor 和 Spark on YARN 的 node manager。即便对于应用实例而言,框架自己也会占用部分资源,如 Spark Streaming 须要为 shuffle 和 storage 预留内存。

第五,因为 Kafka 自己提供数据持久化,所以 Kafka Stream 提供滚动部署和滚动升级以及从新计算的能力。

第六,因为 Kafka Consumer Rebalance 机制,Kafka Stream 能够在线动态调整并行度。

Kafka Stream 架构  

 

Kafka Stream 总体架构  

Kafka Stream 的总体架构图以下。

 

目前(Kafka 0.11.0.0)Kafka Stream 的数据源只能如上图所示是 Kafka。可是处理结果并不必定要如上图所示输出到 Kafka。实际上 KStream 和 Ktable 的实例化都须要指定 Topic。

KStream<String, String> stream = builder.stream("words-stream");

KTable<String, String> table = builder.table("words-table", "words-store");

另外,上图中的 Consumer 和 Producer 并不须要开发者在应用中显示实例化,而是由 Kafka Stream 根据参数隐式实例化和管理,从而下降了使用门槛。开发者只须要专一于开发核心业务逻辑,也即上图中 Task 内的部分。

Processor Topology  

基于 Kafka Stream 的流式应用的业务逻辑所有经过一个被称为 Processor Topology 的地方执行。它与 Storm 的 Topology 和 Spark 的 DAG 相似,都定义了数据在各个处理单元(在 Kafka Stream 中被称做 Processor)间的流动方式,或者说定义了数据的处理逻辑。

下面是一个 Processor 的示例,它实现了 Word Count 功能,而且每秒输出一次结果。

public class WordCountProcessor implements Processor<String, String> {

  private ProcessorContext context;
  private KeyValueStore<String, Integer> kvStore;

  @SuppressWarnings("unchecked")
  @Override
  public void init(ProcessorContext context) {
    this.context = context;
    this.context.schedule(1000);
    this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");
  }

  @Override
  public void process(String key, String value) {
    Stream.of(value.toLowerCase().split(" ")).forEach((String word) -> {
      Optional<Integer> counts = Optional.ofNullable(kvStore.get(word));
      int count = counts.map(wordcount -> wordcount + 1).orElse(1);
      kvStore.put(word, count);
    });
  }

  @Override
  public void punctuate(long timestamp) {
    KeyValueIterator<String, Integer> iterator = this.kvStore.all();
    iterator.forEachRemaining(entry -> {
      context.forward(entry.key, entry.value);
      this.kvStore.delete(entry.key);
    });
    context.commit();
  }

  @Override
  public void close() {
    this.kvStore.close();
  }

}

从上述代码中可见

  • process 定义了对每条记录的处理逻辑,也印证了 Kafka 可具备记录级的数据处理能力。

  • context.scheduler 定义了 punctuate 被执行的周期,从而提供了实现窗口操做的能力。

  • context.getStateStore 提供的状态存储为有状态计算(如窗口,聚合)提供了可能。

Kafka Stream 并行模型  

Kafka Stream 的并行模型中,最小粒度为 Task,而每一个 Task 包含一个特定子 Topology 的全部 Processor。所以每一个 Task 所执行的代码彻底同样,惟一的不一样在于所处理的数据集互补。这一点跟 Storm 的 Topology 彻底不同。Storm 的 Topology 的每个 Task 只包含一个 Spout 或 Bolt 的实例。所以 Storm 的一个 Topology 内的不一样 Task 之间须要经过网络通讯传递数据,而 Kafka Stream 的 Task 包含了完整的子 Topology,因此 Task 之间不须要传递数据,也就不须要网络通讯。这一点下降了系统复杂度,也提升了处理效率。

若是某个 Stream 的输入 Topic 有多个 (好比 2 个 Topic,1 个 Partition 数为 4,另外一个 Partition 数为 3),则总的 Task 数等于 Partition 数最多的那个 Topic 的 Partition 数(max(4,3)=4)。这是由于 Kafka Stream 使用了 Consumer 的 Rebalance 机制,每一个 Partition 对应一个 Task。

下图展现了在一个进程(Instance)中以 2 个 Topic(Partition 数均为 4)为数据源的 Kafka Stream 应用的并行模型。从图中能够看到,因为 Kafka Stream 应用的默认线程数为 1,因此 4 个 Task 所有在一个线程中运行。

为了充分利用多线程的优点,能够设置 Kafka Stream 的线程数。下图展现了线程数为 2 时的并行模型。

前文有提到,Kafka Stream 可被嵌入任意 Java 应用(理论上基于 JVM 的应用均可以)中,下图展现了在同一台机器的不一样进程中同时启动同一 Kafka Stream 应用时的并行模型。注意,这里要保证两个进程的 StreamsConfig.APPLICATION_ID_CONFIG 彻底同样。由于 Kafka Stream 将 APPLICATION_ID_CONFI 做为隐式启动的 Consumer 的 Group ID。只有保证 APPLICATION_ID_CONFI 相同,才能保证这两个进程的 Consumer 属于同一个 Group,从而能够经过 Consumer Rebalance 机制拿到互补的数据集。

既然实现了多进程部署,能够以一样的方式实现多机器部署。该部署方式也要求全部进程的 APPLICATION_ID_CONFIG 彻底同样。从图上也能够看到,每一个实例中的线程数并不要求同样。可是不管如何部署,Task 总数总会保证一致。

注意:Kafka Stream 的并行模型,很是依赖于《Kafka 设计解析(一)- Kafka 背景及架构介绍》一文中介绍的 Kafka 分区机制和《Kafka 设计解析(四)- Kafka Consumer 设计解析》中介绍的 Consumer 的 Rebalance 机制。强烈建议不太熟悉这两种机制的朋友,先行阅读这两篇文章。

这里对比一下 Kafka Stream 的 Processor Topology 与 Storm 的 Topology。

  • Storm 的 Topology 由 Spout 和 Bolt 组成,Spout 提供数据源,而 Bolt 提供计算和数据导出。Kafka Stream 的 Processor Topology 彻底由 Processor 组成,由于它的数据固定由 Kafka 的 Topic 提供。

  • Storm 的不一样 Bolt 运行在不一样的 Executor 中,极可能位于不一样的机器,须要经过网络通讯传输数据。而 Kafka Stream 的 Processor Topology 的不一样 Processor 彻底运行于同一个 Task 中,也就彻底处于同一个线程,无需网络通讯。

  • Storm 的 Topology 能够同时包含 Shuffle 部分和非 Shuffle 部分,而且每每一个 Topology 就是一个完整的应用。而 Kafka Stream 的一个物理 Topology 只包含非 Shuffle 部分,而 Shuffle 部分须要经过 through 操做显示完成,该操做将一个大的 Topology 分红了 2 个子 Topology。

  • Storm 的 Topology 内,不一样 Bolt/Spout 的并行度能够不同,而 Kafka Stream 的子 Topology 内,全部 Processor 的并行度彻底同样。

  • Storm 的一个 Task 只包含一个 Spout 或者 Bolt 的实例,而 Kafka Stream 的一个 Task 包含了一个子 Topology 的全部 Processor。

KTable vs. KStream  

KTable 和 KStream 是 Kafka Stream 中很是重要的两个概念,它们是 Kafka 实现各类语义的基础。所以这里有必要分析下两者的区别。

KStream 是一个数据流,能够认为全部记录都经过 Insert only 的方式插入进这个数据流里。而 KTable 表明一个完整的数据集,能够理解为数据库中的表。

因为每条记录都是 Key-Value 对,这里能够将 Key 理解为数据库中的 Primary Key,而 Value 能够理解为一行记录。能够认为 KTable 中的数据都是经过 Update only 的方式进入的。也就意味着,若是 KTable 对应的 Topic 中新进入的数据的 Key 已经存在,那么从 KTable 只会取出同一 Key 对应的最后一条数据,至关于新的数据更新了旧的数据。

如下图为例,假设有一个 KStream 和 KTable,基于同一个 Topic 建立,而且该 Topic 中包含以下图所示 5 条数据。此时遍历 KStream 将获得与 Topic 内数据彻底同样的全部 5 条数据,且顺序不变。而此时遍历 KTable 时,由于这 5 条记录中有 3 个不一样的 Key,因此将获得 3 条记录,每一个 Key 对应最新的值,而且这三条数据之间的顺序与原来在 Topic 中的顺序保持一致。这一点与 Kafka 的日志 compact 相同。

此时若是对该 KStream 和 KTable 分别基于 key 作 Group,对 Value 进行 Sum,获得的结果将会不一样。对 KStream 的计算结果是<jack,4><Jack,4>,<Lily,7>,<Mike,4><lily,7><mike,4>。而对 Ktable 的计算结果是<mike,4><Mike,4>,<Jack,3>,<Lily,5><jack,3><lily,5>。

State store  

流式处理中,部分操做是无状态的,例如过滤操做(Kafka Stream DSL 中用 filer 方法实现)。而部分操做是有状态的,须要记录中间状态,如 Window 操做和聚合计算。State store 被用来存储中间状态。它能够是一个持久化的 Key-Value 存储,也能够是内存中的 HashMap,或者是数据库。Kafka 提供了基于 Topic 的状态存储。

Topic 中存储的数据记录自己是 Key-Value 形式的,同时 Kafka 的 log compaction 机制可对历史数据作 compact 操做,保留每一个 Key 对应的最后一个 Value,从而在保证 Key 不丢失的前提下,减小总数据量,从而提升查询效率。

构造 KTable 时,须要指定其 state store name。默认状况下,该名字也即用于存储该 KTable 的状态的 Topic 的名字,遍历 KTable 的过程,实际就是遍历它对应的 state store,或者说遍历 Topic 的全部 key,并取每一个 Key 最新值的过程。为了使得该过程更加高效,默认状况下会对该 Topic 进行 compact 操做。

另外,除了 KTable,全部状态计算,都须要指定 state store name,从而记录中间状态。

Kafka Stream 如何解决流式系统中关键问题  时间  

在流式数据处理中,时间是数据的一个很是重要的属性。从 Kafka 0.10 开始,每条记录除了 Key 和 Value 外,还增长了 timestamp 属性。目前 Kafka Stream 支持三种时间

  • 事件发生时间。事件发生的时间,包含在数据记录中。发生时间由 Producer 在构造 ProducerRecord 时指定。而且须要 Broker 或者 Topic 将 message.timestamp.type 设置为 CreateTime(默认值)才能生效。

  • 消息接收时间,也即消息存入 Broker 的时间。当 Broker 或 Topic 将 message.timestamp.type 设置为 LogAppendTime 时生效。此时 Broker 会在接收到消息后,存入磁盘前,将其 timestamp 属性值设置为当前机器时间。通常消息接收时间比较接近于事件发生时间,部分场景下可代替事件发生时间。

  • 消息处理时间,也即 Kafka Stream 处理消息时的时间。

注:Kafka Stream 容许经过实现 org.apache.kafka.streams.processor.TimestampExtractor 接口自定义记录时间。

窗口  

前文提到,流式数据是在时间上无界的数据。而聚合操做只能做用在特定的数据集,也即有界的数据集上。所以须要经过某种方式从无界的数据集上按特定的语义选取出有界的数据。窗口是一种很是经常使用的设定计算边界的方式。不一样的流式处理系统支持的窗口相似,但不尽相同。

Kafka Stream 支持的窗口以下。

一、Hopping Time Window 该窗口定义以下图所示。它有两个属性,一个是 Window size,一个是 Advance interval。Window size 指定了窗口的大小,也即每次计算的数据集的大小。而 Advance interval 定义输出的时间间隔。一个典型的应用场景是,每隔 5 秒钟输出一次过去 1 个小时内网站的 PV 或者 UV。

二、Tumbling Time Window 该窗口定义以下图所示。能够认为它是 Hopping Time Window 的一种特例,也即 Window size 和 Advance interval 相等。它的特色是各个 Window 之间彻底不相交。

三、Sliding Window 该窗口只用于 2 个 KStream 进行 Join 计算时。该窗口的大小定义了 Join 两侧 KStream 的数据记录被认为在同一个窗口的最大时间差。假设该窗口的大小为 5 秒,则参与 Join 的 2 个 KStream 中,记录时间差小于 5 的记录被认为在同一个窗口中,能够进行 Join 计算。

四、Session Window 该窗口用于对 Key 作 Group 后的聚合操做中。它须要对 Key 作分组,而后对组内的数据根据业务需求定义一个窗口的起始点和结束点。一个典型的案例是,但愿经过 Session Window 计算某个用户访问网站的时间。对于一个特定的用户(用 Key 表示)而言,当发生登陆操做时,该用户(Key)的窗口即开始,当发生退出操做或者超时时,该用户(Key)的窗口即结束。窗口结束时,可计算该用户的访问时间或者点击次数等。

Join  

Kafka Stream 因为包含 KStream 和 Ktable 两种数据集,所以提供以下 Join 计算

  • KTable Join KTable 结果仍为 KTable。任意一边有更新,结果 KTable 都会更新。

  • KStream Join KStream 结果为 KStream。必须带窗口操做,不然会形成 Join 操做一直不结束。

  • KStream Join KTable / GlobakKTable 结果为 KStream。只有当 KStream 中有新数据时,才会触发 Join 计算并输出结果。KStream 无新数据时,KTable 的更新并不会触发 Join 计算,也不会输出数据。而且该更新只对下次 Join 生效。一个典型的使用场景是,KStream 中的订单信息与 KTable 中的用户信息作关联计算。

对于 Join 操做,若是要获得正确的计算结果,须要保证参与 Join 的 KTable 或 KStream 中 Key 相同的数据被分配到同一个 Task。具体方法是

  • 参与 Join 的 KTable 或 KStream 的 Key 类型相同(实际上,业务含意也应该相同)

  • 参与 Join 的 KTable 或 KStream 对应的 Topic 的 Partition 数相同Partitioner 策略的最终结果等效(实现不须要彻底同样,只要效果同样便可),也即 Key 相同的状况下,被分配到 ID 相同的 Partition 内

  • 若是上述条件不知足,可经过调用以下方法使得它知足上述条件。

    KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic)

聚合与乱序处理  

聚合操做可应用于 KStream 和 KTable。当聚合发生在 KStream 上时必须指定窗口,从而限定计算的目标数据集。

须要说明的是,聚合操做的结果确定是 KTable。由于 KTable 是可更新的,能够在晚到的数据到来时(也即发生数据乱序时)更新结果 KTable。

这里举例说明。假设对 KStream 以 5 秒为窗口大小,进行 Tumbling Time Window 上的 Count 操做。而且 KStream 前后出现时间为 1 秒, 3 秒, 5 秒的数据,此时 5 秒的窗口已达上限,Kafka Stream 关闭该窗口,触发 Count 操做并将结果 3 输出到 KTable 中(假设该结果表示为<1-5,3>)。若 1 秒后,又收到了时间为 2 秒的记录,因为 1-5 秒的窗口已关闭,若直接抛弃该数据,则可认为以前的结果<1-5,3>不许确。

而若是直接将完整的结果<1-5,4>输出到 KStream 中,则 KStream 中将会包含该窗口的 2 条记录,<1-5,3>, <1-5,4>,也会存在肮数据。所以 Kafka Stream 选择将聚合结果存于 KTable 中,此时新的结果<1-5,4>会替代旧的结果<1-5,3>。用户可获得完整的正确的结果。

这种方式保证了数据准确性,同时也提升了容错性。

但须要说明的是,Kafka Stream 并不会对全部晚到的数据都从新计算并更新结果集,而是让用户设置一个 retention period,将每一个窗口的结果集在内存中保留必定时间,该窗口内的数据晚到时,直接合并计算,并更新结果 KTable。超过 retention period 后,该窗口结果将从内存中删除,而且晚到的数据即便落入窗口,也会被直接丢弃。

容错  

Kafka Stream 从以下几个方面进行容错

  • 高可用的 Partition 保证无数据丢失。每一个 Task 计算一个 Partition,而 Kafka 数据复制机制保证了 Partition 内数据的高可用性,故无数据丢失风险。同时因为数据是持久化的,即便任务失败,依然能够从新计算。

  • 状态存储实现快速故障恢复和从故障点继续处理。对于 Join 和聚合及窗口等有状态计算,状态存储可保存中间状态。即便发生 Failover 或 Consumer Rebalance,仍然能够经过状态存储恢复中间状态,从而能够继续从 Failover 或 Consumer Rebalance 前的点继续计算。

  • KTable 与 retention period 提供了对乱序数据的处理能力。

Kafka Stream 应用示例  

 

下面结合一个案例来说解如何开发 Kafka Stream 应用。本例完整代码可从做者 Github 获取。

订单 KStream(名为 orderStream),底层 Topic 的 Partition 数为 3,Key 为用户名,Value 包含用户名,商品名,订单时间,数量。用户 KTable(名为 userTable),底层 Topic 的 Partition 数为 3,Key 为用户名,Value 包含性别,地址和年龄。商品 KTable(名为 itemTable),底层 Topic 的 Partition 数为 6,Key 为商品名,价格,种类和产地。如今但愿计算每小时购买产地与本身所在地相同的用户总数。

首先因为但愿使用订单时间,而它包含在 orderStream 的 Value 中,须要经过提供一个实现 TimestampExtractor 接口的类从 orderStream 对应的 Topic 中抽取出订单时间。

public class OrderTimestampExtractor implements TimestampExtractor {

  @Override
  public long extract(ConsumerRecord<Object, Object> record) {
    if(record instanceof Order) {
      return ((Order)record).getTS();
    } else {
      return 0;
    }
  }
}

接着经过将 orderStream 与 userTable 进行 Join,来获取订单用户所在地。因为两者对应的 Topic 的 Partition 数相同,且 Key 都为用户名,再假设 Producer 往这两个 Topic 写数据时所用的 Partitioner 实现相同,则此时上文所述 Join 条件知足,可直接进行 Join。

orderUserStream = orderStream
    .leftJoin(userTable, 
         // 该 lamda 表达式定义了如何从 orderStream 与 userTable 生成结果集的 Value
        (Order order, User user) -> OrderUser.fromOrderUser(order, user), 
         // 结果集 Key 序列化方式
        Serdes.String(),
         // 结果集 Value 序列化方式
         SerdesFactory.serdFrom(Order.class))
    .filter((String userName, OrderUser orderUser) -> orderUser.userAddress != null)

从上述代码中,能够看到,Join 时须要指定如何从参与 Join 双方的记录生成结果记录的 Value。Key 不须要指定,由于结果记录的 Key 与 Join Key 相同,故无须指定。Join 结果存于名为 orderUserStream 的 KStream 中。

接下来须要将 orderUserStream 与 itemTable 进行 Join,从而获取商品产地。此时 orderUserStream 的 Key 仍为用户名,而 itemTable 对应的 Topic 的 Key 为产品名,而且两者的 Partition 数不同,所以没法直接 Join。此时须要经过 through 方法,对其中一方或双方进行从新分区,使得两者知足 Join 条件。这一过程至关于 Spark 的 Shuffle 过程和 Storm 的 FieldGrouping。

orderUserStrea
    .through(
        // Key 的序列化方式
        Serdes.String(),
        // Value 的序列化方式 
        SerdesFactory.serdFrom(OrderUser.class), 
        // 从新按照商品名进行分区,具体取商品名的哈希值,而后对分区数取模
        (String key, OrderUser orderUser, int numPartitions) -> (orderUser.getItemName().hashCode() & 0x7FFFFFFF) % numPartitions, 
        "orderuser-repartition-by-item")
    .leftJoin(itemTable, (OrderUser orderUser, Item item) -> OrderUserItem.fromOrderUser(orderUser, item), Serdes.String(), SerdesFactory.serdFrom(OrderUser.class))

从上述代码可见,through 时须要指定 Key 的序列化器,Value 的序列化器,以及分区方式和结果集所在的 Topic。这里要注意,该 Topic(orderuser-repartition-by-item)的 Partition 数必须与 itemTable 对应 Topic 的 Partition 数相同,而且 through 使用的分区方法必须与 iteamTable 对应 Topic 的分区方式同样。通过这种 through 操做,orderUserStream 与 itemTable 知足了 Join 条件,可直接进行 Join。

总结  

 

  • Kafka Stream 的并行模型彻底基于 Kafka 的分区机制和 Rebalance 机制,实现了在线动态调整并行度

  • 同一 Task 包含了一个子 Topology 的全部 Processor,使得全部处理逻辑都在同一线程内完成,避免了没必要的网络通讯开销,从而提升了效率。

  • through 方法提供了相似 Spark 的 Shuffle 机制,为使用不一样分区策略的数据提供了 Join 的可能

  • log compact 提升了基于 Kafka 的 state store 的加载效率

  • state store 为状态计算提供了可能

  • 基于 offset 的计算进度管理以及基于 state store 的中间状态管理为发生 Consumer rebalance 或 Failover 时从断点处继续处理提供了可能,并为系统容错性提供了保障

  • KTable 的引入,使得聚合计算拥用了处理乱序问题的能力

相关文章
相关标签/搜索