Storm是一个实时流计算框架,Trident是对storm的一个更高层次的抽象,Trident最大的特色以batch的形式处理stream。java
一些最基本的操做函数有Filter、Function,Filter能够过滤掉tuple,Function能够修改tuple内容,输出0或多个tuple,并能把新增的字段追加到tuple后面。算法
聚合有partitionAggregate和Aggregator接口。partitionAggregate对当前partition中的tuple进行聚合,它不是重定向操做。Aggregator有三个接口:CombinerAggregator, ReducerAggregator,Aggregator,它们属于重定向操做,它们会把stream重定向到一个partition中进行聚合操做。sql
重定向操做会改变数据流向,但不会改变数据内容,重定向操会产生网络传输,可能影响一部分效率。而Filter、Function、partitionAggregate则属于本地操做,不会产生网络传输。数据库
GroupBy会根据指定字段,把整个stream切分红一个个grouped stream,若是在grouped stream上作聚合操做,那么聚合就会发生在这些grouped stream上而不是整个batch。若是groupBy后面跟的是aggregator,则是聚合操做,若是跟的是partitionAggregate,则不是聚合操做。api
Trident主要有5类操做:网络
一、做用在本地的操做,不产生网络传输。并发
二、对数据流的重分布,不改变流的内容,可是产生网络传输。框架
三、聚合操做,有可能产生网络传输。ide
四、做用在分组流(grouped streams)上的操做。函数
五、Merge和join
概念
partition中文意思是分区,有人将partition理解为Storm里面的task,即并发的基本执行单位。我理解应该是像数据库里面的分区,是将一个batch的数据分区,分红多个partition,或者能够理解为多个子batch,而后多个partition能够并发处理。这里关键的区别是:partition是数据,不是执行的代码。你把数据(tuple)分区之后,若是你没有多个task(并发度)来处理这些分区后的数据,那分区也是没有做用的。因此这里的关系是这样的:先有batch,由于Trident内部是基于batch来实现的;而后有partition;分区后再分配并发度,而后才能进行并发处理。并发度的分配是利用parallelismHint来实现的。
操做
既然有partition的概念,那么也就有partition的操做。Trident提供的分区操做,相似于Storm里面讲的grouping。分区操做有:
重分区操做经过运行一个函数改变元组在任务之间的分布,也能够调整分区的数量(好比重分区以后将并行度调大),重分区须要网络传输的参与。重分区函数包含如下这几个:
注意,除了这里明确提出来的分区操做,Trident里面还有aggregate()函数隐含有分区的操做,它用的是global()操做,这个在后面接收聚合操做的时候还会再介绍。
each() 方法
做用:操做batch中的每个tuple内容,通常与Filter或者Function函数配合使用。
下面经过一个例子来介绍each()方法,假设咱们有一个FakeTweetsBatchSpout,它会模拟一个Stream,随机产生一个个消息。咱们能够经过设置这个Spout类的构造参数来改变这个Spout的batch Size的大小。
1.Filter类:过滤tuple
一个经过actor字段过滤消息的Filter:
public static class PerActorTweetsFilter extends BaseFilter { String actor; public PerActorTweetsFilter(String actor) { this.actor = actor; } @Override public boolean isKeep(TridentTuple tuple) { return tuple.getString(0).equals(actor); } }
Topology:
topology.newStream("spout", spout) .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")) .each(new Fields("actor", "text"), new Utils.PrintFilter());
从上面例子看到,each()方法有一些构造参数
2.Function类:加工处理tuple内容
一个能把tuple中text内容变成大写的Function:
public static class UppercaseFunction extends BaseFunction { @Override public void execute(TridentTuple tuple, TridentCollector collector) { collector.emit(new Values(tuple.getString(0).toUpperCase())); } }
Topology:
topology.newStream("spout", spout) .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")) .each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text")) .each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());
首先,UppercaseFunction函数的输入是Fields("text", "actor"),其做用是把其中的"text"字段内容都变成大写。
其次,它比Filter多出一个输出字段,做用是每一个tuple在通过这个Function函数处理后,输出字段都会被追加到tuple后面,在本例中,执行完Function以后的tuple内容多了一个"uppercased_text",而且这个字段排在最后面。
3. Field Selector与project
咱们须要注意的是,上面每一个each()方法的第一个Field字段仅仅是隐藏掉没有指定的字段内容,实际上被隐藏的字段依然还在tuple中,若是想要完全丢掉它们,咱们就须要用到project()方法。
投影操做做用是仅保留Stream指定字段的数据,好比有一个Stream包含以下字段: [“a”, “b”, “c”, “d”],运行以下代码:
mystream.project(new Fields("b", "d"))
则输出的流仅包含 [“b”, “d”]字段。
aggregation的介绍
首先聚合操做分两种:partitionAggregate(),以及aggregate()。
1.partitionAggregate
partitionAggregate()的操做是在partition上,一个batch的tuple被分红多个partition后,每一个partition都会单独运行partitionAggregate中指定的聚合操做。分区聚合在一批tuple的每个分区上运行一个函数。与函数不一样的是,分区聚合的输出元组会覆盖掉输入元组。请看以下示例:
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
假设你有一个包含a,b两个字段的输入流,元组的分区状况以下:
Partition 0: ["a", 1] ["b", 2] Partition 1: ["a", 3] ["c", 8] Partition 2: ["e", 1] ["d", 9] ["d", 10]
运行上面的那一行代码将会输出以下的元组,这些元组只包含一个sum字段:
Partition 0: [3] Partition 1: [11] Partition 2: [20]
2.aggregate
aggregate()隐含了一个global分区操做,也就是它作的是全局聚合操做。它针对的是整个batch的聚合计算。
这两种聚合操做,均可以传入不一样的aggregator实现具体的聚合任务。Trident中有三种aggregator接口,分别为:ReducerAggregator,CombinerAggregator,Aggregator。
下面是CombinerAggregator接口的定义:
public interface CombinerAggregator<T> extends Serializable { T init(TridentTuple tuple); T combine(T val1, T val2); T zero(); }
CombinerAggregator返回只有一个字段的一个元组。CombinerAggregator在每一个输入元组上运行init函数,而后经过combine函数聚合结果值直到只剩下一个元组。若是分区中没有任何元组,CombinerAggregator将返回zero函数中定义的元组。好比,下面是Count聚合器的实现:
public class Count implements CombinerAggregator<Long> { public Long init(TridentTuple tuple) { return 1L; } public Long combine(Long val1, Long val2) { return val1 + val2; } public Long zero() { return 0L; } }
ReducerAggregator接口的定义以下:
public interface ReducerAggregator<T> extends Serializable { T init(); T reduce(T curr, TridentTuple tuple); }
ReducerAggregator经过init函数获得一个初始的值,而后对每一个输入元组调用reduce方法计算值,产生一个元组做为输出。好比Count的ReducerAggregator实现以下:
public class Count implements ReducerAggregator<Long> { public Long init() { return 0L; } public Long reduce(Long curr, TridentTuple tuple) { return curr + 1; } }
最经常使用的聚合器的接口是Aggregator,它的定义以下:
public interface Aggregator<T> extends Operation { T init(Object batchId, TridentCollector collector); void aggregate(T state, TridentTuple tuple, TridentCollector collector); void complete(T state, TridentCollector collector); }
Aggregator可以发射任意数量,任意字段的元组。而且能够在执行期间的任什么时候候发射元组,它的执行流程以下:
下面是使用Aggregator接口实现的Count聚合器:
public class CountAgg extends BaseAggregator<CountState> { static class CountState { long count = 0; } public CountState init(Object batchId, TridentCollector collector) { return new CountState(); } public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) { state.count+=1; } public void complete(CountState state, TridentCollector collector) { collector.emit(new Values(state.count)); } }
有些时候,咱们须要通知执行不少个聚合器,则可使用以下的链式调用执行:
mystream.chainedAgg() .partitionAggregate(new Count(), new Fields("count")) .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum")) .chainEnd()
上面的代码将会在每个分区执行Count和Sum聚合器,输出结果是包含count和sum两个字段的元组。
最重要的区别是CombinerAggregator,它是先在partition上作partial aggregate,而后再将这些部分聚合结果经过global分区到一个总的分区,在这个总的分区上对结果进行汇总。
groupBy()分组操做
首先它包含两个操做,一个是分区操做,一个是分组操做。
若是后面是partitionAggregate()的话,就只有分组操做:在每一个partition上分组,分完组后,在每一个分组上进行聚合;
若是后面是aggregate()的话,先根据partitionBy分区,在每一个partition上分组,,分完组后,在每一个分组上进行聚合。
parallelismHint并发度的介绍
它设置它前面全部操做的并发度,直到遇到某个repartition操做为止。
topology.newStream("spout", spout) .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")) .parallelismHint(5) .each(new Fields("actor", "text"), new Utils.PrintFilter());
意味着:parallelismHit以前的spout,each都是5个相同的操做一块儿并发,对,一共有5个spout同时发射数据,其实parallelismHint后面的each操做,也是5个并发。分区操做是做为Bolt划分的分界点的。
若是想单独设置Spout怎么办?要在Spout以后,Bolt以前增长一个ParallelismHint,而且还要增长一个分区操做:
topology.newStream("spout", spout) .parallelismHint(2) .shuffle() .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")) .parallelismHint(5) .each(new Fields("actor", "text"), new Utils.PrintFilter());
不少人只是设置了Spout的并发度,而没有调用分区操做,这样是达不到效果的,由于Trident是不会自动进行分区操做的。像我以前介绍的,先分区,再设置并发度。若是Spout不设置并发度,只设置shuffle,默认是1个并发度,这样后面设置5个并发度不会影响到Spout,由于并发度的影响到shuffle分区操做就中止了。
例子
groupBy+aggregate+parallelismHint
package com.demo; import java.util.HashMap; import java.util.Map; import backtype.storm.tuple.Values; import storm.trident.operation.BaseAggregator; import storm.trident.operation.TridentCollector; import storm.trident.operation.TridentOperationContext; import storm.trident.tuple.TridentTuple; public class MyAgg extends BaseAggregator<Map<String, Integer>> { /** * */ private static final long serialVersionUID = 1L; /** * 属于哪一个分区 */ private int partitionId; /** * 分区数量 */ private int numPartitions; private String batchId; @SuppressWarnings("rawtypes") @Override public void prepare(Map conf, TridentOperationContext context) { partitionId = context.getPartitionIndex(); numPartitions = context.numPartitions(); } public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) { String word = tuple.getString(0); Integer value = val.get(word); if (value == null) { value = 0; } value++; // 把数据保存到一个map对象中 val.put(word, value); System.err.println("I am partition [" + partitionId + "] and I have kept a tweet by: " + numPartitions + " " + word + " " +batchId); } public void complete(Map<String, Integer> val, TridentCollector collector) { collector.emit(new Values(val)); } public Map<String, Integer> init(Object arg0, TridentCollector arg1) { this.batchId = arg0.toString(); return new HashMap<String, Integer>(); } }
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2, new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f")); spout.setCycle(false); TridentTopology tridentTopology = new TridentTopology(); tridentTopology .newStream("spout", spout) .shuffle() .groupBy(new Fields("sentence")) .aggregate(new Fields("sentence"), new MyAgg(), new Fields("Map")) .parallelismHint(2)
I am partition [0] and I have kept a tweet by: 2 a 1:0 I am partition [0] and I have kept a tweet by: 2 a 1:0 I am partition [0] and I have kept a tweet by: 2 a 2:0 I am partition [1] and I have kept a tweet by: 2 d 2:0 I am partition [0] and I have kept a tweet by: 2 e 3:0 I am partition [1] and I have kept a tweet by: 2 f 3:0
groupBy+partitionAggregate+parallelismHint
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2, new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f")); spout.setCycle(false); TridentTopology tridentTopology = new TridentTopology(); tridentTopology .newStream("spout", spout) .shuffle() .groupBy(new Fields("sentence")) .partitionAggregate(new Fields("sentence"), new MyAgg(), new Fields("Map"))) .toStream() .parallelismHint(2)
I am partition [0] and I have kept a tweet by: 2 a 1:0 I am partition [1] and I have kept a tweet by: 2 a 1:0 I am partition [0] and I have kept a tweet by: 2 a 2:0 I am partition [1] and I have kept a tweet by: 2 d 2:0 I am partition [0] and I have kept a tweet by: 2 e 3:0 I am partition [1] and I have kept a tweet by: 2 f 3:0
因为shuffle已经把tuple平均分配给5个partition了,用groupBy+partitionAggregate来聚合又没有partitionBy分区的做用,因此,直接在5个分区上进行聚合,结果就是每一个分区各有一个tuple。
而用groupBy+aggregate,虽然也是shuffle,可是因为具备partitiononBy分区的做用,值相同的tuple都分配到同一个分区,结果就是每一个分区根据不一样的值来作汇聚。
aggregate+parallelismHint(没有groupBy)
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2, new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f")); spout.setCycle(false); TridentTopology tridentTopology = new TridentTopology(); tridentTopology .newStream("spout", spout) .shuffle() .aggregate(new Fields("sentence"), new MyAgg(), new Fields("Map")) .parallelismHint(2)
I am partition [1] and I have kept a tweet by: 2 a 1:0 I am partition [1] and I have kept a tweet by: 2 a 1:0 I am partition [0] and I have kept a tweet by: 2 a 2:0 I am partition [0] and I have kept a tweet by: 2 d 2:0 I am partition [1] and I have kept a tweet by: 2 e 3:0 I am partition [1] and I have kept a tweet by: 2 f 3:0
partitionAggregate+parallelismHint(没有groupBy操做)
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2, new Values("a"), new Values("a"), new Values("a"),new Values("d"), new Values("e"), new Values("f")); spout.setCycle(false); TridentTopology tridentTopology = new TridentTopology(); tridentTopology .newStream("spout", spout) .shuffle() .partitionAggregate(new Fields("sentence"), new MyAgg(), new Fields("Map")) .toStream() .parallelismHint(2)
I am partition [1] and I have kept a tweet by: 2 a 1:0 I am partition [0] and I have kept a tweet by: 2 a 1:0 I am partition [1] and I have kept a tweet by: 2 a 2:0 I am partition [0] and I have kept a tweet by: 2 d 2:0 I am partition [0] and I have kept a tweet by: 2 e 3:0 I am partition [1] and I have kept a tweet by: 2 f 3:0
咱们能够发现,partitionAggregate加上groupBy,或者不加上groupBy,对结果都同样:groupBy对于partitionAggregate没有影响。可是对于aggregate来讲,加上groupBy,就不是作全局聚合了,而是对分组作聚合;不加上groupBy,就是作全局聚合。
若是spout设置并行度,可是没有加shuffle,不会起做用,分区默认为1,;若是不设置并行度而且没有加shuffle,分区默认为1。
Merge和Joins
api的最后一部分即是如何把各类流汇聚到一块儿。最简单的方式就是把这些流汇聚成一个流。咱们能够这么作:
topology.merge(stream1, stream2, stream3);
Trident指定新的合并以后的流中的字段为stream1中的字段。
另外一种合并流的方式就是join。一个标准的join就像是一个sql,必须有标准的输入,所以,join只针对符合条件的Stream。join应用在来自Spout的每个小Batch中。
下面的例子中,stream1流包含key,val1,val2三个字段,stream2流包含x,val1两个字段:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));
stream1流的key字段与stream2流的x字段组join操做,另外,Trident要求全部新流的输出字段被重命名,由于输入流可能包含相同的字段名称。链接流发射的元组将会包含:
当join的是来源于不一样Spout的stream时,这些Spout在发射数据时须要同步,一个Batch所包含的tuple会来自各个Spout。