Storm之trident聚合操做介绍

Trident主要有5类操做:网络

一、做用在本地的操做,不产生网络传输。并发

二、对数据流的重分布,不改变流的内容,可是产生网络传输。ide

三、聚合操做,有可能产生网络传输。oop

四、做用在分组流(grouped streams)上的操做。优化

五、Mergejoinspa

 

这里主要介绍一下34,但愿对你们有所帮助,若有错误请指正!orm

首先说几个名词:对象

PartitionStorm中并发的最小执行单元是task;在tridentpartition至关于task的角色。接口

Grouped streams对数据流作groupBy操做后,将key相同的流组织在一块儿,造成若干组流。事务

Global aggregation没有groupBy的聚合,即全局聚合。

AggregatorTrident中定义的用于实现聚合方法的接口。

下面开始介绍:

做用在tridentStream对象上的与聚合相关的主要方法:

aggregate

partitionAggregate

persistentAggregate

groupBy

partitionBy

partitionPersist

parallelismHint

做为聚合操做的一个参数,实现聚合功能的主要接口有:

Aggregator<T>

CombinerAggregator<T>

ReducerAggregator<T>

----------------------华丽丽的分割线----------------------------------

使用过Hive的人都知道,不含group by的聚合SQL在转化成Hadoop做业后,在编译时就肯定了只能有1reduce,由于全局聚合在汇总阶段只能由1个计算单元完成。一样的道理,当aggregate方法用于无groupByglobal aggregation时,每一个批次(batch)的流也只能在1partition中执行(使用AggregatorReducerAggregator接口,而CombinerAggregator例外,后面会讲到)。当咱们使用aggregate计算global aggregation时,若是经过parallelismHint设置了并发数为ntrident的作法是经过轮循的方式让不一样的批次依次在npartition中执行,实际上仍是在串行执行,意义不大。所以使用aggregateglobal aggregation时,并不能实现并发的功能,只适用于数据量不大的场景,这时候最好把并发设成1,不然对资源是一种浪费。

值得一提的是,若是实现了自定义分组的CustomStreamGrouping接口,后面再跟global aggregation,例如:
trident.newStream(“TRIDENT_SPOUT”, new  MySpout())
       .partition(new  MyCustomStreamGrouping())
       .aggregate(new MyAggregator(), new  Fields(“out1”))
       .parallelismHint(10);

这时候实际上咱们的自定义分组是不起做用的,由于上面已经说明,此时并发并无真正开启,而是采起的轮循策略。只有将aggregate换成partitionAggregate,自定义的分组才会起做用。

使用aggregate作分组聚合是它的强项,此时能够充分发挥并发的特性。可是须要注意,假设并发度设置为10,而咱们groupBykey的不一样值实际上只有2个,那势必有不少partition在空跑,形成资源浪费。

partitionAggregate一般用于global aggregation时的本地化聚合,相似于Hadoop中的map阶段。partitionAggregate是在每个partition内独立调用本身的聚合操做,互不干涉。最后还须要把局部聚合值emit出来,经过网络传输供后面的aggregate作全局聚合。经过这种策略,能够实现global aggregation的并发。partitionAggregate的前面不能跟groupBy方法,由于groupBy方法返回的GroupedStream对象没有partitionAggregate方法。

Aggregator<T>接口是三种实现聚合功能的接口中最通用的一种。Aggregator<T>要实现5个方法:

prepare只在启动拓扑时调用1次。若是设置了并发度,则每个partition调用1次。
cleanup
只在正常关闭拓扑时调用1次。若是设置了并发度,则每个partition调用1次。
init
对于global aggregation,每一个批次调用1次。若是使用的是partitionAggregate,则每个批次的每个partition调用1次。对于Grouped Streams,每一个相同的key组成的数据流调用1次。须要注意的是,若是使用的是事务型spout,同时某个批次处理失败后致使该批次消息重发,则在接下来处理时init有可能会调用屡次。所以init里面的代码逻辑应该要支持同一批的重复调用。
aggregate
1tuple调用1次。
complete
对于global aggregation,每一个批次调用1次。若是使用的是partitionAggregate,则每个批次的每个partition调用1次。对于Grouped Streams,每一个相同的key组成的数据流调用1次。

再说一下CombinerAggregator<T>,它比较有趣,前面提到使用aggregateglobal aggregation没法开启并发。可是当CombinerAggregator<T>aggregate配合使用时,例如:
trident.newStream(“TRIDENT_SPOUT”, new  MySpout())
       .parallelismHint(10)
       .aggregate(new MyCombinerAggregator(), new Fields(“out1”));

Trident会把拓扑自动拆分红2bolt,第一个bolt作局部聚合,相似于Hadoop中的map;第二个bolt经过接收网络传输过来的局部聚合值最后作一个全局聚合,相似于Hadoop中的reduce。在上面的例子中,局部聚合开启了10个并发,这就实现了使用aggregateglobal aggregation时真正开启并发。固然,使用partitionAggregate能够实现一样的功能。相似于:
trident.newStream(“TRIDENT_SPOUT”, new  MySpout())
       .partitionAggregate(new  MyAggregator(), new  Fields(“out1”))
       .parallelismHint(10)
       .aggregate(new Fields(“out1”), new MyAggregator(), new  Fields(“out2”));
有三点须要注意:
1
、自动优化后的第一个bolt是本地化操做,所以它能够和它前面或者后面挨着的全部each合并在同一个bolt里面。
2
parallelismHint(n)要写在aggregate的前面,若是写在aggregate后面,将致使本地化操做的第一个bolt的并发度为1,而全局聚合的第二个bolt的并发度为n,而实际上第二个bolt并不能真正开启并发,只是前面提到的轮循而已。
3
、综合12,把parallelismHint(n)写在aggregate的前面会致使spout同时开启n的并发度,所以要注意本身实现的spout类是否支持并发发送。

CombinerAggregator<T>须要实现3个方法:
init
每条tuple调用1次,对tuple作预处理。
combine
每条tuple调用1次,和以前的聚合值作combine。若是是第一条tuple则和zero返回的值作combine
zero
当没有数据流时的处理逻辑。
整个CombinerAggregator<T>会在每批次结束时将combine的结果作一次emit

persistentAggregate是实现聚合的另一种方式。前面介绍的聚合能够当作是对每一个批次的数据作本批次内的聚合计算,至于批次之间如何merge须要本身处理。而persistentAggregate能够当作是对源源不断发送过来数据流作一个总的聚合,每一个批次的聚合值只是一个中间状态,经过与trident新提出的state概念结合,实现中间状态的持久化,同时支持事务性。persistentAggregate不能使用Aggregator<T>,只能使用CombinerAggregator<T>或者ReducerAggregator<T>

关于state接口,它的使用场景很是多,这里先不作详细介绍。它能够做为Stream.stateQuery的参数按批次对持久化的数据作查询;也能够配合Stream.partitionPersist按批次作持久化操做,相似于IBatchBolt<T>.finishBatch所能实现的功能。

EOF

相关文章
相关标签/搜索