Trident API(翻译)

Trident API Overview

 

Trident 的核心数据模型是“流”(Stream),进行数据处理的时候,将数据做为一系列的batch(批)来进行。流被分割成多个partition分布在集群中的不一样节点上来运行,并且对流的操做也是在流的各个partition上并行运行的。html

Trident 中有五类操做:java

  • 针对每一个小分区(partition)的本地操做,这类操做不会产生网络数据传输(each、map、faltmap、partitionAggregate等)
  • 针对一个数据流的从新分区操做,这类操做不会改变数据流中的内容,可是会产生必定的网络传输(shuffle、partition等)
  • 经过网络数据传输进行的聚合操做(Aggregate)
  • 针对数据流的分组操做(groupBy)
  • 融合与联结操做(merge、join)

Partition-local operations(本地分区操做)

本地分区操做是在每一个batch partition上独立运行的操做,其中不涉及网络数据传输。git

Functions

Functions函数负责接收一个input fields的集合并选择输出更多的tuple或者不输出tuple。输出tuple的fields会被添加到原始数据流的输入域中。若是一个function不输出tuple,那么原始的输入tuple就会被直接过滤掉。不然,每一个输出 tuple 都会复制一份输入tuple。假设你有下面这样的函数:github

 

public class MyFunction extends BaseFunction {
    public void execute(TridentTuple tuple, TridentCollector collector) {
        for(int i=0; i < tuple.getInteger(0); i++) {
            collector.emit(new Values(i));
        }
    }
}

假设你有一个名为 “mystream” 的数据流,这个流中包含下面几个 tuple,每一个 tuple 中包含有 "a"、"b"、"c" 三个域:算法

[1, 2, 3] 
[4, 1, 6] 
[3, 0, 8]

若是你运行这段代码:apache

mystream.each(new Fields("b"), new MyFunction(), new Fields("d")))

那么最终输出的结果 tuple 就会包含有 "a"、"b"、"c"、"d"4 个域,就像下面这样:windows

[1, 2, 3, 0]
[1, 2, 3, 1]
[4, 1, 6, 0]

Filters

过滤器负责判断输入的 tuple 是否须要保留。如下面的过滤器为例:网络

public class MyFilter extends BaseFilter {
    public boolean isKeep(TridentTuple tuple) {
        return tuple.getInteger(0) == 1 && tuple.getInteger(1) == 2;
    }
}

经过使用这段代码:并发

mystream.each(new Fields("b", "a"), new MyFilter())

就能够将下面这样带有 "a"、"b"、"c"三个域的 tupleapp

[1, 2, 3]
[2, 1, 1]
[2, 3, 4]

最终转化成这样的结果 tuple:

[2, 1, 1]

map and flatMap

map对stream中的tuple应用map函数,并返回结果流。这能够用于对tuples进行one-one(一对一)的转换(transformation)操做。

举例,若是你想将一个stream中的单词转换成大写,你能够定义一个mapping函数,以下:

 

public class UpperCase extends MapFunction {
 @Override
 public Values execute(TridentTuple input) {
   return new Values(input.getString(0).toUpperCase());
 }
}

 

mapping函数应用到stream上并生成一个由大写单词组成的stream。

mystream.map(new UpperCase());


flatMap与map相似,可是被用来对stream中的values进行one-to-many(一对多)操做,而后会将resulting elements(结果元素)flatten平压至一个新的stream中。

public class Split extends FlatMapFunction {
  @Override
  public Iterable<Values> execute(TridentTuple input) {
    List<Values> valuesList = new ArrayList<>();
    for (String word : input.getString(0).split(" ")) {
      valuesList.add(new Values(word));
    }
    return valuesList;
  }
}

flatMap函数被应用在一个句子stream中,生成一个单词stream。

mystream.flatMap(new Split())

固然这些操做能够被链接在一块儿,能够从一个sentences stream中得到一个大写单词的stream

mystream.flatMap(new Split()).map(new UpperCase())

peek

peek能够用于在每一个trident tuples流经stream时执行附加的操做(主要是为了输出一些信息,并不改变tuples或者fields)。这对于调试查看在管道中某个点上的tuples是有用的。

举例,接下来的代码将打印将单词转换成大写单词后groupBy操做的结果。

mystream.flatMap(new Split()).map(new UpperCase()).peek(new Consumer() {
	@Override
	public void accept(TridentTuple input) {
		System.out.println(input.getString(0));
	}
}).groupBy(new Fields("word")).persistentAggregate(new MemoryMapState.Factory(), new Count(),
		new Fields("count"));

min and minBy

min和minBy操做将返回在trident stream中一个batch of tuples的每一个partition的最小值。

假设,一个trident流中包含fields["device-id", "count"],下面是每一个partition of tuples。

Partition 0:
[123, 2]
[113, 54]
[23,  28]
[237, 37]
[12,  23]
[62,  17]
[98,  42]

Partition 1:
[64,  18]
[72,  54]
[2,   28]
[742, 71]
[98,  45]
[62,  12]
[19,  174]


Partition 2:
[27,  94]
[82,  23]
[9,   86]
[53,  71]
[74,  37]
[51,  49]
[37,  98]

 

minBy操做讲对每一个partition中的fields named count取最小值,而且输出这个最小值的tuple。

mystream.minBy(new Fields("count"))

上面代码执行后各partition的结果:

Partition 0:
[123, 2]


Partition 1:
[62,  12]


Partition 2:
[82,  23]

你能够看在org.apache.storm.trident.Stream类中查看min和minBy操做。

相关例子能够从下面的连接查看 TridentMinMaxOfDevicesTopology 和 TridentMinMaxOfVehiclesTopology

max and maxBy

max和maxBy操做将返回在trident stream中一个batch of tuples的每一个partition的最大值。

假设,一个trident流中包含fields["device-id", "count"],下面是每一个partition of tuples。

maxBy操做讲对每一个partition中的fields named count取最大值,而且输出这个最大值的tuple。

mystream.maxBy(new Fields("count"))

上面代码执行后各partition的结果:

Partition 0:
[113, 54]


Partition 1:
[19,  174]


Partition 2:
[37,  98]

你能够看在org.apache.storm.trident.Stream类中查看max和maxBy操做。

相关例子能够从下面的连接查看  TridentMinMaxOfDevicesTopology 和 TridentMinMaxOfVehiclesTopology

Windowing

相关window分类可参照window页面。

Trident流能够处理在某个相同学口中batch的tuples,并将聚合结果发送给下一个操做。有两种windowing,分别是基于processing时间或者tuples数量:1.Tumbling window翻滚窗口2。Sliding window滑动窗口

Tumbling window

元组根据processing时间或者tuples数量分组在某个窗口中。任何元组只属于一个窗口。

Sliding window

Tuples are grouped in windows and window slides for every sliding interval. A tuple can belong to more than one window.

在每一个滑动间隔中,元组在窗口和窗口滑动中分组。一个元组能够属于多个窗口。

Common windowing API

通用windowing API:

 

public Stream window(WindowConfig windowConfig, WindowsStoreFactory windowStoreFactory, Fields inputFields,Aggregator aggregator, Fields functionFields)

windowConfig定义窗口的属性:window length和window sliding length。

WindowsStoreFactory用来储存接受到的tuples,而且聚合values。

partitionAggregate

partitionAggregate会在一个batch of tuples的每一个partition上执行function。与上面的函数不一样,由partitionAggregate发送出的tuples会将替换输入tuples。如下面这段代码为例:

mystream.partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"));


假如输入流中包含有 "a"、"b" 两个域而且有如下几个tuple块:

Partition 0:
["a", 1]
["b", 2]

Partition 1:
["a", 3]
["c", 8]

Partition 2:
["e", 1]
["d", 9]
["d", 10]


通过上面的代码以后,输出就会变成带有一个名为 "sum"的域的数据流,其中的tuple就是这样的:

Partition 0:
[3]

Partition 1:
[11]

Partition 2:
[20]


Storm 有三个用于定义聚合器的接口:CombinerAggregatorReducerAggregator 以及 Aggregator

这是 CombinerAggregator 接口,整个CombinerAggregator<T>会在每批次结束时将combine的结果作一次emit:

public interface CombinerAggregator<T> extends Serializable {
    T init(TridentTuple tuple);//每条tuple调用一次,对tuple作预处理。
    T combine(T val1, T val2);//每条tuple调用一次,和以前的聚合值作combine。若是是partition中没有tuple则返回zero值做为combine的结果。
    T zero();//当partition中没有数据流时,处理逻辑。
}

 

CombinerAggregator 会将带有一个field的一个单独的tuple返回做为输出。CombinerAggregator会在每一个输入tuple上运行初始化函数init,而后使用组合函数来组合全部输入的值。若是在某个分区中没有 tuple, CombinerAggregator 就会输出zero 方法的结果。例如,下面是 Count 的实现代码:

public class Count implements CombinerAggregator<Long> {
    public Long init(TridentTuple tuple) {
        return 1L;
    }

    public Long combine(Long val1, Long val2) {
        return val1 + val2;
    }

    public Long zero() {
        return 0L;
    }
}

 

若是你使用aggregate方法来代替partitionAggregate方法,你就会发现CombinerAggregators的好处了。在这种状况下,Trident会在发送tuple以前经过分区聚合操做来优化计算过程。

ReducerAggregator的接口:

public interface ReducerAggregator<T> extends Serializable {
    T init();//用来初始化reduce函数中的参数值curr。执行一次
    T reduce(T curr, TridentTuple tuple);//每条tuple调用1次,与curr进行聚合操做。
}

整个ReducerAggregator<T>会在每batch结束时将reduce的结果作一次emit。

ReducerAggregator会使用init方法来产生一个初始化的值,而后使用该值对每一个输入tuple进行遍历,并最终生成并输出一个单独的tuple,这个tuple中就包含有咱们须要的计算结果值。例如,下面是将Count定义为ReducerAggregator的代码:

public class Count implements ReducerAggregator<Long> {
    public Long init() {
        return 0L;
    }

    public Long reduce(Long curr, TridentTuple tuple) {
        return curr + 1;
    }
}


ReducerAggregator 一样能够用于 persistentAggregate,你会在后面看到这一点。

最经常使用的聚合器接口仍是下面的 Aggregator接口:

public interface Aggregator<T> extends Operation {
    T init(Object batchId, TridentCollector collector);
    void aggregate(T state, TridentTuple tuple, TridentCollector collector);
    void complete(T state, TridentCollector collector);
}


其中父接口Operation 还有两个方法

public interface Operation extends Serializable {
    void prepare(Map conf, TridentOperationContext context);
    void cleanup();
}


Aggregator<T>接口,实现了上面五个方法:

  •  prepare:只在启动topolopy时调用1次,若是设置了并发度,则在每个partition中调用一次;
  •  cleanup:只在正常关闭topolopy时调用1次,若是设置了并发度,则在每个partition中调用1次;
  •  init:对于global aggregation来讲,每一个批次调用1次。若是使用的时partitionAggregate则每一个批次的每个partition调用一次。对于Group Streams来讲,每一个相同的key组成的数据流调用一次。须要注意的是,若是使用的是事务型的spout,同时某个批次处理失败致使该批次消息从新发送,则在接下来处理时,initu有可能调用屡次,因此init里面代码逻辑要支持同一批的重复调用。
  •  aggregate:每一个tuple调用1次;
  •  complete:对于global aggregation来讲,每一个批次调用一次。若是使用的是partitionAggregate,则每个批次的每个partition 调用1次。对于Grouped Streams来讲,每一个相同的key组成的数据流调用1次。

 

须要特别注意的是:当使用没有group by 的Aggregator或者ReducerAggregation计算global aggretation时,每一个batch的数据流只能在1个partition(至关于storm的task)中执行,即便设置了parallelismHint的并发数n>1,实际上也只能轮循的叫不一样批次aggregation执行,也就至关于串行执行,因此反而浪费了资源。

使用aggregation作global aggregation没法启动并发,可是当配合CombinerAggregator<T>时候能够,Trident会把拓扑自动拆分红2个bolt,第一个bolt作局部聚合,相似于Hadoop中的map;第二个bolt经过接收网络传输过来的局部聚合值最后作一个全局聚合。自动优化后的第一个bolt是本地化操做,所以它能够和它前面或者后面挨着的全部each合并在同一个bolt里面。

 

 

trident.newStream(“trident_spout”, new MySpout())  
	.partitionAggregate(new MyAggregator(), new Fields(“testoutput1”))  
	.parallelismHint(5)  
	.aggregate(new Fields(“out1”), new MyAggregator(), new Fields(“testoutput2”));


parallelismHint(n)要写在aggregate的前面,若是写在aggregate后面,将致使本地化操做的第一个bolt的并发度为1,而全局聚合的第二个bolt的并发度为n,而实际上第二个bolt并不能真正开启并发,只是前面提到的轮循而已。

把parallelismHint(n)写在aggregate的前面会致使spout同时开启n的并发度,所以要注意本身实现的spout类是否支持并发发送。

 

 下面是使用Aggregator来进行Count的一个实现

 

public class CountAgg extends BaseAggregator<CountState> {
    static class CountState {
        long count = 0;
    }

    public CountState init(Object batchId, TridentCollector collector) {
        return new CountState();
    }

    public void aggregate(CountState state, TridentTuple tuple, TridentCollector collector) {
        state.count+=1;
    }

    public void complete(CountState state, TridentCollector collector) {
        collector.emit(new Values(state.count));
    }
}

某些时候你须要同时计算multiple aggregators时,使用以下的方式进行链接:

mystream.chainedAgg()
        .partitionAggregate(new Count(), new Fields("count"))
        .partitionAggregate(new Fields("b"), new Sum(), new Fields("sum"))
        .chainEnd()

 这段代码会在每一个分区上分别执行 Count 和 Sum 聚合器,而输出中只会包含一个带有 ["count", "sum"] 域的单独的 tuple。

stateQuery and partitionPersist

stateQuery 与 partitionPersist 会分别查询、更新 state 数据源。你能够参考 Trident State 文档 来了解如何使用它们。

projection

projection 方法只会保留操做中指定的域。若是你有一个带有 ["a", "b", "c", "d"] 域的数据流,经过执行这段代码:

mystream.project(new Fields("b", "d"))

就会使得输出数据流中只包含有 ["b","d"] 域。

Repartitioning operations

重分区操做会执行一个用来改变在不一样的任务间分配 tuple 的方式的函数。在重分区的过程当中分区的数量也可能会发生变化(例如,重分区以后的并行度就有可能会增大)。重分区会产生必定的网络数据传输。下面是重分区操做的几个函数:

  1. shuffle:经过随机轮询算法来从新分配目标区块的全部 tuple。
  2. broadcast:每一个 tuple 都会被复制到全部的目标区块中。这个函数在 DRPC 中颇有用 —— 好比,你可使用这个函数来获取每一个区块数据的查询结果。
  3. partitionBy:该函数会接收一组域做为参数,并根据这些域来进行分区操做。能够经过对这些域进行哈希化,并对目标分区的数量取模的方法来选取目标区块。partitionBy 函数可以保证来自同一组域的结果总会被发送到相同的目标区间。
  4. global:这种方式下全部的 tuple 都会被发送到同一个目标分区中,并且数据流中的全部的块都会由这个分区处理。
  5. batchGlobal:同一个 batch 块中的全部 tuple 会被发送到同一个区块中。固然,在数据流中的不一样区块仍然会分配到不一样的区块中。
  6. partition:这个函数使用自定义的分区方法,该方法会实现 org.apache.storm.grouping.CustomStreamGrouping 接口。

Aggregation operations

Trident 使用 aggregate 方法和 persistentAggregate 方法来对数据流进行聚类操做。其中,aggregate 方法会分别对数据流中的每一个 batch 进行处理,而 persistentAggregate 方法则会对数据流中的全部 batch 执行聚类处理,并将结果存入某个 state 中。

在数据流上执行 aggregate 方法会执行一个全局的聚类操做。在你使用 ReducerAggregator 或者 Aggregator 时,数据流首先会被从新分区成一个单独的分区,而后聚类函数就会在该分区上执行操做。而在你使用 CombinerAggregator 时,Trident 首先会计算每一个分区的部分聚类结果,而后将这些结果重分区到一个单独的分区中,最后在网络数据传输完成以后结束这个聚类过程。CombinerAggregator 比其余的聚合器的运行效率更高,在聚类时应该尽量使用CombinerAggregator

下面是一个使用 aggregate 来获取一个 batch 的全局计数值的例子:

mystream.aggregate(new Count(), new Fields("count"))

与 partitionAggregate同样,aggregate的聚合器也能够进行链式处理。然而,若是你在一个处理链中同时使用了CombinerAggregator 和non-CombinerAggregator,Trident 就不能对部分聚类操做进行优化了。

想要了解更多使用 persistentAggregate 的方法,能够参考 Trident State 文档 一文。

Operations on grouped streams

经过对指定的域执行 partitionBy 操做,groupBy 操做能够将数据流进行重分区,使得相同的域的 tuple 分组能够汇集在一块儿。例如,下面是一个 groupBy 操做的示例:

若是你在分组数据流上执行聚合操做,聚合器会在每一个分组(而不是整个区块)上运行。persistentAggregate 一样能够在一个分组数据里上运行,这种状况下聚合结果会存储在 MapState 中,其中的 key 就是分组的域名。

和其余操做同样,对分组数据流的聚合操做也能够以链式的方式执行。

Merges and joins

Trident API 的最后一部分是联结不一样的数据流的操做。联结数据流最简单的方式就是将全部的数据流融合到一个流中。你可使用 TridentTopology 的 merge 方法实现该操做,好比这样:

topology.merge(stream1, stream2, stream3);

Trident 会将融合后的新数据流的域命名为为第一个数据流的输出域。

联结数据流的另一种方法是使用 join。像 SQL 那样的标准 join 操做只能用于有限的输入数据集,对于无限的数据集就没有用武之地了。Trident 中的 join 只会应用于每一个从 spout 中输出的小 batch。

下面是两个流的 join 操做的示例,其中一个流含有 [“key”, “val1”, “val2”] 域,另一个流含有 [“x”, “val1”] 域:

topology.join(stream1, new Fields("key"), stream2, new Fields("x"), new Fields("key", "a", "b", "c"));

上面的例子会使用 "key" 和 "x" 做为 join 的域来联结 stream1 和 stream2。Trident 要求先定义好新流的输出域,由于输入流的域可能会覆盖新流的域名。从 join 中输出的 tuple 中会包含:

  1. join 域的列表。在这个例子里,输出的 "key" 域与 stream1 的 "key" 域以及 stream2 的 "x" 域对应。
  2. 来自全部流的非 join 域的列表。这个列表是按照传入 join 方法的流的顺序排列的。在这个例子里,"a" 和 "b" 域与 stream1 的 "val1" 和 "val2" 域对应;而 "c" 域则与 stream2 的 “val1” 域相对应。

在对不一样的 spout 发送出的流进行 join 时,这些 spout 上会按照他们发送 batch 的方式进行同步处理。也就是说,一个处理中的 batch 中含有每一个 spout 发送出的 tuple。

到这里你大概仍然会对如何进行窗口 join 操做感到困惑。窗口操做(包括平滑窗口、滚动窗口等 —— 译者注)主要是指将当前的 tuple 与过去若干小时时间段内的 tuple 联结起来的过程。

你可使用 partitionPersist 和 stateQuery 来实现这个过程。过去一段时间内的 tuple 会以 join 域为关键字被保存到一个 state 源中。而后就可使用 stateQuery 查询 join 域来实现这个“联结”(join)的过程。

相关文章
相关标签/搜索