Trident 的核心数据模型是“流”(Stream),进行数据处理的时候,将数据做为一系列的batch(批)来进行。流被分割成多个partition分布在集群中的不一样节点上来运行,并且对流的操做也是在流的各个partition上并行运行的。html
Trident 中有五类操做:java
本地分区操做是在每一个batch partition上独立运行的操做,其中不涉及网络数据传输。git
Functions函数负责接收一个input fields的集合并选择输出更多的tuple或者不输出tuple。输出tuple的fields会被添加到原始数据流的输入域中。若是一个function不输出tuple,那么原始的输入tuple就会被直接过滤掉。不然,每一个输出 tuple 都会复制一份输入tuple。假设你有下面这样的函数:github
public class MyFunction extends BaseFunction { public void execute(TridentTuple tuple, TridentCollector collector) { for(int i=0; i < tuple.getInteger(0); i++) { collector.emit(new Values(i)); } } } |
假设你有一个名为 “mystream” 的数据流,这个流中包含下面几个 tuple,每一个 tuple 中包含有 "a"、"b"、"c" 三个域:算法
[1, 2, 3] [4, 1, 6] [3, 0, 8] |
若是你运行这段代码:apache
mystream.each(new Fields("b"), new MyFunction(), new Fields("d"))) |
那么最终输出的结果 tuple 就会包含有 "a"、"b"、"c"、"d"4 个域,就像下面这样:windows
[1, 2, 3, 0] [1, 2, 3, 1] [4, 1, 6, 0] |
过滤器负责判断输入的 tuple 是否须要保留。如下面的过滤器为例:网络
public class MyFilter extends BaseFilter { public boolean isKeep(TridentTuple tuple) { return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2; } } |
经过使用这段代码:并发
mystream.each(new Fields("b", "a"), new MyFilter()) |
就能够将下面这样带有 "a"、"b"、"c"三个域的 tupleapp
[1, 2, 3] [2, 1, 1] [2, 3, 4] |
最终转化成这样的结果 tuple:
[2, 1, 1] |
map对stream中的tuple应用map函数,并返回结果流。这能够用于对tuples进行one-one(一对一)的转换(transformation)操做。
举例,若是你想将一个stream中的单词转换成大写,你能够定义一个mapping函数,以下:
public class UpperCase extends MapFunction { @Override public Values execute(TridentTuple input) { return new Values(input.getString(0).toUpperCase()); } } |
mapping函数应用到stream上并生成一个由大写单词组成的stream。
mystream.map(new UpperCase()); |
flatMap与map相似,可是被用来对stream中的values进行one-to-many(一对多)操做,而后会将resulting elements(结果元素)flatten平压至一个新的stream中。
public class Split extends FlatMapFunction { @Override public Iterable<Values> execute(TridentTuple input) { List<Values> valuesList = new ArrayList<>(); for (String word : input.getString(0).split(" ")) { valuesList.add(new Values(word)); } return valuesList; } } |
flatMap函数被应用在一个句子stream中,生成一个单词stream。
mystream.flatMap(new Split()) |
固然这些操做能够被链接在一块儿,能够从一个sentences stream中得到一个大写单词的stream
mystream.flatMap(new Split()).map(new UpperCase()) |
peek能够用于在每一个trident tuples流经stream时执行附加的操做(主要是为了输出一些信息,并不改变tuples或者fields)。这对于调试查看在管道中某个点上的tuples是有用的。
举例,接下来的代码将打印将单词转换成大写单词后groupBy操做的结果。
mystream.flatMap(new Split()).map(new UpperCase()).peek(new Consumer() { @Override public void accept(TridentTuple input) { System.out.println(input.getString(0)); } }).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count")); |
min和minBy操做将返回在trident stream中一个batch of tuples的每一个partition的最小值。
假设,一个trident流中包含fields["device-id", "count"],下面是每一个partition of tuples。
Partition 0: [123, 2] [113, 54] [23, 28] [237, 37] [12, 23] [62, 17] [98, 42] Partition 1: [64, 18] [72, 54] [2, 28] [742, 71] [98, 45] [62, 12] [19, 174] Partition 2: [27, 94] [82, 23] [9, 86] [53, 71] [74, 37] [51, 49] [37, 98] |
minBy操做讲对每一个partition中的fields named count取最小值,而且输出这个最小值的tuple。
mystream.minBy(new Fields("count")) |
上面代码执行后各partition的结果:
Partition 0: [123, 2] Partition 1: [62, 12] Partition 2: [82, 23] |
你能够看在org.apache.storm.trident.Stream类中查看min和minBy操做。
相关例子能够从下面的连接查看 TridentMinMaxOfDevicesTopology 和 TridentMinMaxOfVehiclesTopology
max和maxBy操做将返回在trident stream中一个batch of tuples的每一个partition的最大值。
假设,一个trident流中包含fields["device-id", "count"],下面是每一个partition of tuples。
maxBy操做讲对每一个partition中的fields named count取最大值,而且输出这个最大值的tuple。
mystream.maxBy(new Fields("count")) |
上面代码执行后各partition的结果:
Partition 0: [113, 54] Partition 1: [19, 174] Partition 2: [37, 98] |
你能够看在org.apache.storm.trident.Stream类中查看max和maxBy操做。
相关例子能够从下面的连接查看 TridentMinMaxOfDevicesTopology 和 TridentMinMaxOfVehiclesTopology
相关window分类可参照window页面。
Trident流能够处理在某个相同学口中batch的tuples,并将聚合结果发送给下一个操做。有两种windowing,分别是基于processing时间或者tuples数量:1.Tumbling window翻滚窗口2。Sliding window滑动窗口
元组根据processing时间或者tuples数量分组在某个窗口中。任何元组只属于一个窗口。
Tuples are grouped in windows and window slides for every sliding interval. A tuple can belong to more than one window.
在每一个滑动间隔中,元组在窗口和窗口滑动中分组。一个元组能够属于多个窗口。
通用windowing API:
public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,Aggregator aggregator, Fields functionFields) |
windowConfig定义窗口的属性:window length和window sliding length。
WindowsStoreFactory用来储存接受到的tuples,而且聚合values。
partitionAggregate
会在一个batch of tuples的每一个partition上执行function。与上面的函数不一样,由partitionAggregate
发送出的tuples会将替换输入tuples。如下面这段代码为例:
mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum")); |
假如输入流中包含有 "a"、"b" 两个域而且有如下几个tuple块:
Partition 0: ["a", 1] ["b", 2] Partition 1: ["a", 3] ["c", 8] Partition 2: ["e", 1] ["d", 9] ["d", 10] |
通过上面的代码以后,输出就会变成带有一个名为 "sum"的域的数据流,其中的tuple就是这样的:
Partition 0: [3] Partition 1: [11] Partition 2: [20] |
Storm 有三个用于定义聚合器的接口:CombinerAggregator
,ReducerAggregator
以及 Aggregator
。
这是 CombinerAggregator
接口,整个CombinerAggregator<T>会在每批次结束时将combine的结果作一次emit:
public interface CombinerAggregator<T> extends Serializable { T init(TridentTuple tuple);//每条tuple调用一次,对tuple作预处理。 T combine(T val1, T val2);//每条tuple调用一次,和以前的聚合值作combine。若是是partition中没有tuple则返回zero值做为combine的结果。 T zero();//当partition中没有数据流时,处理逻辑。 } |
CombinerAggregator
会将带有一个field的一个单独的tuple返回做为输出。CombinerAggregator
会在每一个输入tuple上运行初始化函数init,而后使用组合函数来组合全部输入的值。若是在某个分区中没有 tuple, CombinerAggregator
就会输出zero
方法的结果。例如,下面是 Count
的实现代码:
public class Count implements CombinerAggregator<Long> { public Long init(TridentTuple tuple) { return 1L; } public Long combine(Long val1, Long val2) { return val1 + val2; } public Long zero() { return 0L; } } |
若是你使用aggregate方法来代替partitionAggregate方法,你就会发现CombinerAggregators的好处了。在这种状况下,Trident会在发送tuple以前经过分区聚合操做来优化计算过程。
ReducerAggregator的接口:
public interface ReducerAggregator<T> extends Serializable { T init();//用来初始化reduce函数中的参数值curr。执行一次 T reduce(T curr, TridentTuple tuple);//每条tuple调用1次,与curr进行聚合操做。 } |
整个ReducerAggregator<T>会在每batch结束时将reduce的结果作一次emit。
ReducerAggregator会使用init
方法来产生一个初始化的值,而后使用该值对每一个输入tuple进行遍历,并最终生成并输出一个单独的tuple,这个tuple中就包含有咱们须要的计算结果值。例如,下面是将Count定义为ReducerAggregator的代码:
public class Count implements ReducerAggregator<Long> { public Long init() { return 0L; } public Long reduce(Long curr, TridentTuple tuple) { return curr + 1; } } |
ReducerAggregator 一样能够用于 persistentAggregate,你会在后面看到这一点。
最经常使用的聚合器接口仍是下面的 Aggregator接口:
public interface Aggregator<T> extends Operation { T init(Object batchId, TridentCollector collector); void aggregate(T state, TridentTuple tuple, TridentCollector collector); void complete(T state, TridentCollector collector); } |
其中父接口Operation 还有两个方法
public interface Operation extends Serializable { void prepare(Map conf, TridentOperationContext context); void cleanup(); } |
Aggregator<T>接口,实现了上面五个方法:
须要特别注意的是:当使用没有group by 的Aggregator或者ReducerAggregation计算global aggretation时,每一个batch的数据流只能在1个partition(至关于storm的task)中执行,即便设置了parallelismHint的并发数n>1,实际上也只能轮循的叫不一样批次aggregation执行,也就至关于串行执行,因此反而浪费了资源。
使用aggregation作global aggregation没法启动并发,可是当配合CombinerAggregator<T>时候能够,Trident会把拓扑自动拆分红2个bolt,第一个bolt作局部聚合,相似于Hadoop中的map;第二个bolt经过接收网络传输过来的局部聚合值最后作一个全局聚合。自动优化后的第一个bolt是本地化操做,所以它能够和它前面或者后面挨着的全部each合并在同一个bolt里面。
trident.newStream(“trident_spout”, new MySpout()) .partitionAggregate(new MyAggregator(), new Fields(“testoutput1”)) .parallelismHint(5) .aggregate(new Fields(“out1”), new MyAggregator(), new Fields(“testoutput2”)); |
parallelismHint(n)要写在aggregate的前面,若是写在aggregate后面,将致使本地化操做的第一个bolt的并发度为1,而全局聚合的第二个bolt的并发度为n,而实际上第二个bolt并不能真正开启并发,只是前面提到的轮循而已。
把parallelismHint(n)写在aggregate的前面会致使spout同时开启n的并发度,所以要注意本身实现的spout类是否支持并发发送。
下面是使用Aggregator来进行Count的一个实现:
public class CountAgg extends BaseAggregator<CountState> { static class CountState { long count = 0; } public CountState init(Object batchId, TridentCollector collector) { return new CountState(); } public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) { state.count+=1; } public void complete(CountState state, TridentCollector collector) { collector.emit(new Values(state.count)); } } |
某些时候你须要同时计算multiple aggregators时,使用以下的方式进行链接:
mystream.chainedAgg() .partitionAggregate(new Count(), new Fields("count")) .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum")) .chainEnd() |
这段代码会在每一个分区上分别执行 Count 和 Sum 聚合器,而输出中只会包含一个带有 ["count", "sum"] 域的单独的 tuple。
stateQuery 与 partitionPersist 会分别查询、更新 state 数据源。你能够参考 Trident State 文档 来了解如何使用它们。
projection
方法只会保留操做中指定的域。若是你有一个带有 ["a", "b", "c", "d"] 域的数据流,经过执行这段代码:
mystream.project(new Fields("b", "d")) |
就会使得输出数据流中只包含有 ["b","d"] 域。
重分区操做会执行一个用来改变在不一样的任务间分配 tuple 的方式的函数。在重分区的过程当中分区的数量也可能会发生变化(例如,重分区以后的并行度就有可能会增大)。重分区会产生必定的网络数据传输。下面是重分区操做的几个函数:
org.apache.storm.grouping.CustomStreamGrouping
接口。Trident 使用 aggregate 方法和 persistentAggregate 方法来对数据流进行聚类操做。其中,aggregate 方法会分别对数据流中的每一个 batch 进行处理,而 persistentAggregate 方法则会对数据流中的全部 batch 执行聚类处理,并将结果存入某个 state 中。
在数据流上执行 aggregate 方法会执行一个全局的聚类操做。在你使用 ReducerAggregator
或者 Aggregator
时,数据流首先会被从新分区成一个单独的分区,而后聚类函数就会在该分区上执行操做。而在你使用 CombinerAggregator
时,Trident 首先会计算每一个分区的部分聚类结果,而后将这些结果重分区到一个单独的分区中,最后在网络数据传输完成以后结束这个聚类过程。CombinerAggregator
比其余的聚合器的运行效率更高,在聚类时应该尽量使用CombinerAggregator
。
下面是一个使用 aggregate 来获取一个 batch 的全局计数值的例子:
mystream.aggregate(new Count(), new Fields("count")) |
与 partitionAggregate同样,aggregate的聚合器也能够进行链式处理。然而,若是你在一个处理链中同时使用了CombinerAggregator 和non-CombinerAggregator,Trident 就不能对部分聚类操做进行优化了。
想要了解更多使用 persistentAggregate 的方法,能够参考 Trident State 文档 一文。
经过对指定的域执行 partitionBy 操做,groupBy 操做能够将数据流进行重分区,使得相同的域的 tuple 分组能够汇集在一块儿。例如,下面是一个 groupBy 操做的示例:
若是你在分组数据流上执行聚合操做,聚合器会在每一个分组(而不是整个区块)上运行。persistentAggregate 一样能够在一个分组数据里上运行,这种状况下聚合结果会存储在 MapState 中,其中的 key 就是分组的域名。
和其余操做同样,对分组数据流的聚合操做也能够以链式的方式执行。
Trident API 的最后一部分是联结不一样的数据流的操做。联结数据流最简单的方式就是将全部的数据流融合到一个流中。你可使用 TridentTopology 的 merge 方法实现该操做,好比这样:
topology.merge(stream1, stream2, stream3); |
Trident 会将融合后的新数据流的域命名为为第一个数据流的输出域。
联结数据流的另一种方法是使用 join。像 SQL 那样的标准 join 操做只能用于有限的输入数据集,对于无限的数据集就没有用武之地了。Trident 中的 join 只会应用于每一个从 spout 中输出的小 batch。
下面是两个流的 join 操做的示例,其中一个流含有 [“key”, “val1”, “val2”] 域,另一个流含有 [“x”, “val1”] 域:
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c")); |
上面的例子会使用 "key" 和 "x" 做为 join 的域来联结 stream1 和 stream2。Trident 要求先定义好新流的输出域,由于输入流的域可能会覆盖新流的域名。从 join 中输出的 tuple 中会包含:
在对不一样的 spout 发送出的流进行 join 时,这些 spout 上会按照他们发送 batch 的方式进行同步处理。也就是说,一个处理中的 batch 中含有每一个 spout 发送出的 tuple。
到这里你大概仍然会对如何进行窗口 join 操做感到困惑。窗口操做(包括平滑窗口、滚动窗口等 —— 译者注)主要是指将当前的 tuple 与过去若干小时时间段内的 tuple 联结起来的过程。
你可使用 partitionPersist 和 stateQuery 来实现这个过程。过去一段时间内的 tuple 会以 join 域为关键字被保存到一个 state 源中。而后就可使用 stateQuery 查询 join 域来实现这个“联结”(join)的过程。