Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature(当前:1.0.0-rc0,参见:https://github.com/apache/kafka/releases),它提供了对存储于Kafka内的数据进行流式处理和分析的功能。其主要特色以下:html
简言之,Kafka Streams解决了流式处理中的以下困难问题node
为何要有Kafka Streamgit
当前已经有很是多的流式处理系统,最知名且应用最多的开源流式处理系统有Spark Streaming和Apache Storm。Apache Storm发展多年,应用普遍,提供记录级别的处理能力,当前也支持SQL on Stream。而Spark Streaming基于Apache Spark,能够很是方便与图计算,SQL处理等集成,功能强大,对于熟悉其它Spark应用开发的用户而言使用门槛低。另外,目前主流的Hadoop发行版,如MapR,Cloudera和Hortonworks,都集成了Apache Storm和Apache Spark,使得部署更容易。既然Apache Spark与Apache Storm拥用如此多的优点,那为什么还须要Kafka Stream呢?主要有以下缘由:github
KTable vs. KStream数据库
KTable和KStream是Kafka Stream中很是重要的两个概念,它们是Kafka实现各类语义的基础。所以这里有必要分析下两者的区别。apache
如下图为例,假设有一个KStream和KTable,基于同一个Topic建立,而且该Topic中包含以下图所示5条数据。此时遍历KStream将获得与Topic内数据彻底同样的全部5条数据,且顺序不变。而此时遍历KTable时,由于这5条记录中有3个不一样的Key,因此将获得3条记录,每一个Key对应最新的值,而且这三条数据之间的顺序与原来在Topic中的顺序保持一致。这一点与Kafka的日志compact相同。网络
此时若是对该KStream和KTable分别基于key作Group,对Value进行Sum,获得的结果将会不一样。对KStream的计算结果是<Jack,4>,<Lily,7>,<Mike,4>。而对Ktable的计算结果是<Mike,4>,<Jack,3>,<Lily,5>多线程
State store : 架构
流式处理中,部分操做是无状态的,例如过滤操做(Kafka Stream DSL中用filer方法实现)。而部分操做是有状态的,须要记录中间状态,如Window操做和聚合计算。框架
State store被用来存储中间状态。它能够是一个持久化的Key-Value存储,也能够是内存中的HashMap,或者是数据库。Kafka提供了基于Topic的状态存储。Topic中存储的数据记录自己是Key-Value形式的,同时Kafka的log compaction机制可对历史数据作compact操做,保留每一个Key对应的最后一个Value,从而在保证Key不丢失的前提下,减小总数据量,从而提升查询效率。
构造KTable时,须要指定其state store name。默认状况下,该名字也即用于存储该KTable的状态的Topic的名字,遍历KTable的过程,实际就是遍历它对应的state store,或者说遍历Topic的全部key,并取每一个Key最新值的过程。为了使得该过程更加高效,默认状况下会对该Topic进行compact操做。
另外,除了KTable,全部状态计算,都须要指定state store name,从而记录中间状态
时间:
在流式数据处理中,时间是数据的一个很是重要的属性。从Kafka 0.10开始,每条记录除了Key和Value外,还增长了timestamp属性。目前Kafka Stream支持三种时间
注:Kafka Stream容许经过实现org.apache.kafka.streams.processor.TimestampExtractor接口自定义记录时间。
窗口:
流式数据是在时间上无界的数据。而聚合操做只能做用在特定的数据集,也即有界的数据集上。所以须要经过某种方式从无界的数据集上按特定的语义选取出有界的数据。窗口是一种很是经常使用的设定计算边界的方式。不一样的流式处理系统支持的窗口相似,但不尽相同。Kafka Stream支持的窗口以下:
Join:
kafka Stream因为包含KStream和Ktable两种数据集,所以提供以下Join计算
对于Join操做,若是要获得正确的计算结果,须要保证参与Join的KTable或KStream中Key相同的数据被分配到同一个Task。具体方法是
聚合与乱序处理:
聚合操做可应用于KStream和KTable。当聚合发生在KStream上时必须指定窗口,从而限定计算的目标数据集。须要说明的是,聚合操做的结果确定是KTable。由于KTable是可更新的,能够在晚到的数据到来时(也即发生数据乱序时)更新结果KTable。
这里举例说明。假设对KStream以5秒为窗口大小,进行Tumbling Time Window上的Count操做。而且KStream前后出现时间为1秒, 3秒, 5秒的数据,此时5秒的窗口已达上限,Kafka Stream关闭该窗口,触发Count操做并将结果3输出到KTable中(假设该结果表示为<1-5,3>)。若1秒后,又收到了时间为2秒的记录,因为1-5秒的窗口已关闭,若直接抛弃该数据,则可认为以前的结果<1-5,3>不许确。而若是直接将完整的结果<1-5,4>输出到KStream中,则KStream中将会包含该窗口的2条记录,<1-5,3>, <1-5,4>,也会存在肮数据。所以Kafka Stream选择将聚合结果存于KTable中,此时新的结果<1-5,4>会替代旧的结果<1-5,3>。用户可获得完整的正确的结果。这种方式保证了数据准确性,同时也提升了容错性。
但须要说明的是,Kafka Stream并不会对全部晚到的数据都从新计算并更新结果集,而是让用户设置一个retention period,将每一个窗口的结果集在内存中保留必定时间,该窗口内的数据晚到时,直接合并计算,并更新结果KTable。超过retention period后,该窗口结果将从内存中删除,而且晚到的数据即便落入窗口,也会被直接丢弃。
容错:
Kafka Stream从以下几个方面进行容错:
Kafka Stream总体架构
kafka stream的架构以下:
前(Kafka 0.11.0.0)Kafka Stream的数据源只能如上图所示是Kafka。可是处理结果并不必定要如上图所示输出到Kafka。上图中的Consumer和Producer并不须要开发者在应用中显示实例化,而是由Kafka Stream根据参数隐式实例化和管理,从而下降了使用门槛。开发者只须要专一于开发核心业务逻辑,也即上图中Task内的部分。
Processor Topology:基于Kafka Stream的流式应用的业务逻辑所有经过一个被称为Processor Topology的地方执行。它与Storm的Topology和Spark的DAG相似,都定义了数据在各个处理单元(在Kafka Stream中被称做Processor)间的流动方式,或者说定义了数据的处理逻辑。
Kafka Stream并行模型:Kafka Stream的并行模型中,最小粒度为Task,而每一个Task包含一个特定子Topology的全部Processor。所以每一个Task所执行的代码彻底同样,惟一的不一样在于所处理的数据集互补。以下图展现了在一个进程(Instance)中以2个Topic(Partition数均为4)为数据源的Kafka Stream应用的并行模型。从图中能够看到,因为Kafka Stream应用的默认线程数为1,因此4个Task所有在一个线程中运行。
为了充分利用多线程的优点,能够设置Kafka Stream的线程数。下图展现了线程数为2时的并行模型。
Kafka Stream可被嵌入任意Java应用(理论上基于JVM的应用均可以)中,下图展现了在同一台机器的不一样进程中同时启动同一Kafka Stream应用时的并行模型。注意,这里要保证两个进程的StreamsConfig.APPLICATION_ID_CONFIG彻底同样。由于Kafka Stream将APPLICATION_ID_CONFI做为隐式启动的Consumer的Group ID。只有保证APPLICATION_ID_CONFI相同,才能保证这两个进程的Consumer属于同一个Group,从而能够经过Consumer Rebalance机制拿到互补的数据集。
既然实现了多进程部署,能够以一样的方式实现多机器部署。该部署方式也要求全部进程的APPLICATION_ID_CONFIG彻底同样。从图上也能够看到,每一个实例中的线程数并不要求同样。可是不管如何部署,Task总数总会保证一致。
应用示例
示例完整代码地址: https://github.com/habren/KafkaExample ,Schemal结构说明:
如今但愿计算每小时购买产地与本身所在地相同的用户总数。
public class OrderTimestampExtractor implements TimestampExtractor { @Override public long extract(ConsumerRecord<Object, Object> record) { if(record instanceof Order) { return ((Order)record).getTS(); } else { return 0; } } }
orderUserStream = orderStream .leftJoin(userTable, // 该lamda表达式定义了如何从orderStream与userTable生成结果集的Value (Order order, User user) -> OrderUser.fromOrderUser(order, user), // 结果集Key序列化方式 Serdes.String(), // 结果集Value序列化方式 SerdesFactory.serdFrom(Order.class)) .filter((String userName, OrderUser orderUser) -> orderUser.userAddress != null)
orderUserStrea .through( // Key的序列化方式 Serdes.String(), // Value的序列化方式 SerdesFactory.serdFrom(OrderUser.class), // 从新按照商品名进行分区,具体取商品名的哈希值,而后对分区数取模 (String key, OrderUser orderUser, int numPartitions) -> (orderUser.getItemName().hashCode() & 0x7FFFFFFF) % numPartitions, "orderuser-repartition-by-item") .leftJoin(itemTable, (OrderUser orderUser, Item item) -> OrderUserItem.fromOrderUser(orderUser, item), Serdes.String(), SerdesFactory.serdFrom(OrderUser.class))
小结:
参考资料: