Storm Trident 详细介绍

1、概要
1.1 Storm(简介)
     Storm是一个实时的可靠地分布式流计算框架。
     具体就很少说了,举个例子,它的一个典型的大数据实时计算应用场景:从Kafka消息队列读取消息(能够是logs,clicks,sensor data);经过Storm对消息进行计算聚合等预处理;把处理结果持久化到NoSQL数据库或者HDFS作进一步深刻分析。
1.2 Trident(简介)
     Trident是对Storm的更高一层的抽象,除了提供一套简单易用的流数据处理API以外,它以batch(一组tuples)为单位进行处理,这样一来,可使得一些处理更简单和高效。
     咱们知道把Bolt的运行状态仅仅保存在内存中是不可靠的,若是一个node挂掉,那么这个node上的任务就会被从新分配,可是以前的状态是没法恢复的。所以,比较聪明的方式就是把storm的计算状态信息持久化到database中,基于这一点,trident就变得尤其重要。由于在处理大数据时,咱们在与database打交道时一般会采用批处理的方式来避免给它带来压力,而trident偏偏是以 batch groups 的形式处理数据,并提供了一些聚合功能的API。

2、Trident API 实践
     Trident其实就是一套API,但现阶段网上关于Trident API中各个函数的用法含义资料很少,下面我就根据一些英文资料和本身的理解,详细介绍一下Trident API各个函数的用法和含义。
2.1 each() 方法
     做用:操做batch中的每个tuple内容,通常与Filter或者Function函数配合使用。
     下面经过一个例子来介绍each()方法,假设咱们有一个FakeTweetsBatchSpout,它会模拟一个Stream,随机产生一个个消息。咱们能够经过设置这个Spout类的构造参数来改变这个Spout的batch Size的大小。
2.1.1 Filter类:过滤tuple
     一个经过actor字段过滤消息的Filter:
public static class PerActorTweetsFilter extends BaseFilter {
    String actor;
    public PerActorTweetsFilter(String actor) {
        this.actor = actor;
    }
    @Override
    public boolean isKeep(TridentTuple tuple) {
        return tuple.getString(0).equals(actor);
    }
}

   Topology:    java

topology.newStream("spout", spout)
	.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
	.each(new Fields("actor", "text"), new Utils.PrintFilter());

     从上面例子看到,each()方法有一些构造参数: 
第一个构造参数:做为Field Selector,一个tuple可能有不少字段,经过设置Field,咱们能够隐藏其它字段,仅仅接收指定的字段(其它字段实际还在)。  node

第二个是一个Filter:用来过滤掉除actor名叫"dave"外的其它消息。

2.1.2 Function类:加工处理tuple内容
     一个能把tuple中text内容变成大写的Function:
public static class UppercaseFunction extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        collector.emit(new Values(tuple.getString(0).toUpperCase()));
    }
}

     Topology:  算法

topology.newStream("spout", spout)
	.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
	.each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text"))
	.each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());

    首先,UppercaseFunction函数的输入是Fields("text", "actor"),其做用是把其中的"text"字段内容都变成大写。  数据库

  其次,它比Filter多出一个输出字段,做用是每一个tuple在通过这个Function函数处理后,输出字段都会被追加到tuple后面,在本例中,执行完Function以后的tuple内容多了一个"uppercased_text",而且这个字段排在最后面。

2.1.3 Field Selector与project
   咱们须要注意的是,上面每一个each()方法的第一个Field字段仅仅是隐藏掉没有指定的字段内容,实际上被隐藏的字段依然还在tuple中,若是想要完全丢掉它们,咱们就须要用到project()方法。
   投影操做做用是仅保留Stream指定字段的数据,好比有一个Stream包含以下字段: [“a”, “b”, “c”, “d”],运行以下代码:
mystream.project(new Fields("b", "d"))  则输出的流仅包含 [“b”, “d”]字段。

2.2 parallelismHint()方法和partitionBy()
2.2.1 parallelismHint()
     指定Topology的并行度,即用多少线程执行这个任务。咱们能够稍微改一下咱们的Filter,经过打印当前任务的partitionIndex来区分当前是哪一个线程。
Filter:
public static class PerActorTweetsFilter extends BaseFilter {
    private int partitionIndex;
    private String actor;

    public PerActorTweetsFilter(String actor) {
        this.actor = actor;
    }

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        this.partitionIndex = context.getPartitionIndex();
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        boolean filter = tuple.getString(0).equals(actor);

        if (filter) {
            System.err.println("I am partition [" + partitionIndex + "] and I have kept a tweet by: " + actor);
        }

        return filter;
    }
}

Topology:  网络

topology.newStream("spout", spout)
	.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")).parallelismHint(5)
	.each(new Fields("actor", "text"), new Utils.PrintFilter());

     若是咱们指定执行Filter任务的线程数量为5,那么最终的执行结果会如何呢?看一下咱们的测试结果:  框架

I am partition [4] and I have kept a tweet by: dave
I am partition [3] and I have kept a tweet by: dave
I am partition [0] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
I am partition [1] and I have kept a tweet by: dave
     咱们能够很清楚的发现,一共有5个线程在执行Filter。
     若是咱们想要2个Spout和5个Filter怎么办呢?以下面代码所示,实现很简单。
topology.newStream("spout", spout).parallelismHint(2).shuffle()
	.each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")).parallelismHint(5)
	.each(new Fields("actor", "text"), new Utils.PrintFilter());

2.2.2 partitionBy()和重定向操做(repartitioning operation)  分布式

     咱们注意到上面的例子中用到了shuffle(),shuffle()是一个重定向操做。那什么是重定向操做呢?重定向定义了咱们的tuple如何被route到下一处理层,固然不一样的层之间可能会有不一样的并行度,shuffle()的做用是把tuple随机的route下一层的线程中,而partitionBy()则根据咱们的指定字段按照一致性哈希算法route到下一层的线程中,也就是说,若是咱们用partitionBy()的话,同一个字段名的tuple会被route到同一个线程中。
     好比,若是咱们把上面代码中的shuffle()改为partitionBy(new Fields("actor")),猜一下结果会怎样?
I am partition [2] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
I am partition [2] and I have kept a tweet by: dave
     测试结果正如咱们上面描述的那样,相同字段的tuple被route到了同一个partition中。
重定向操做有以下几种:
shuffle:经过随机分配算法来均衡tuple到各个分区
broadcast:每一个tuple都被广播到全部的分区,这种方式在drcp时很是有用,好比在每一个分区上作stateQuery
partitionBy:根据指定的字段列表进行划分,具体作法是用指定字段列表的hash值对分区个数作取模运算,确保相同字段列表的数据被划分到同一个分区
global:全部的tuple都被发送到一个分区,这个分区用来处理整个Stream
batchGlobal:一个Batch中的全部tuple都被发送到同一个分区,不一样的Batch会去往不一样的分区
Partition:经过一个自定义的分区函数来进行分区,这个自定义函数实现了 backtype.storm.grouping.CustomStreamGrouping

2.3 聚合(Aggregation)     
     咱们前面讲过,Trident的一个很重要的特色就是它是以batch的形式处理tuple的。咱们能够很容易想到的针对一个batch的最基本操做应该就是聚合。Trident提供了聚合API来处理batches,来看一个例子:

2.3.1 Aggregator:
public static class LocationAggregator extends BaseAggregator<Map<String, Integer>> {
    @Override
    public Map<String, Integer> init(Object batchId, TridentCollector collector) {
        return new HashMap<String, Integer>();
    }

    @Override
    public void aggregate(Map<String, Integer> val, TridentTuple tuple,
        TridentCollector collector) {
        String location = tuple.getString(0);
        val.put(location, MapUtils.getInteger(val, location, 0) + 1);
    }

    @Override
    public void complete(Map<String, Integer> val, TridentCollector collector) {
        collector.emit(new Values(val));
    }
}

Topology:  ide

topology.newStream("spout", spout)
	.aggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts"))
	.each(new Fields("location_counts"), new Utils.PrintFilter());

     这个aggregator很简单:计算每个batch的location的数量。经过这个例子咱们能够看到Aggregator接口:  函数

init(): 当刚开始接收到一个batch时执行
aggregate(): 在接收到batch中的每个tuple时执行
complete(): 在一个batch的结束时执行     

     咱们前面讲过aggregate()方法是一个重定向方法,由于它会随机启动一个单独的线程来进行这个聚合操做。
     下面咱们来看一下测试结果:
[{USA=3, Spain=1, UK=1}]
[{USA=3, Spain=2}]
[{France=1, USA=4}]
[{USA=4, Spain=1}]
[{USA=5}]
     咱们能够看到打印的结果,其中每一条的和都是5,这是由于咱们的Spout的每一个batch中tuple数量设置的是5,因此每一个线程的计算结果也会是5。 除此以外,Trident还提供了其它两个Aggregator接口: CombinerAggregator, ReducerAggregator,具体使用方法请参考Trident API。

2.3.2 partitionAggregate():
     若是咱们将上面的Topology稍微改造一下,猜一下结果会是如何?
topology.newStream("spout", spout)
	.partitionBy(new Fields("location"))
	.partitionAggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts")).parallelismHint(3)
	.each(new Fields("location_counts"), new Utils.PrintFilter());

     咱们一块儿来分析一下,首先partitionBy()方法将tuples按其location字段重定向到下一处理逻辑,并且相同location字段的tuple必定会被分配到同一个线程中处理。其次,partitionAggregate()方法,注意它与Aggregate不一样,它不是一个重定向方法,它仅仅是对当前partition上的各个batch执行聚合操做。由于咱们根据location进行了重定向操做,测试数据一共有4个location,而当前一共有3个partition,所以能够猜想咱们的最终测试结果中,有一个partition会处理两个location的batch,最终测试结果以下:  测试

[{France=10, Spain=5}]
[{USA=63}]
[{UK=22}]
     须要注意的是,partitionAggregate虽然也是聚合操做,但与上面的Aggregate彻底不一样,它不是一个重定向操做。

2.4 groupBy
     咱们能够看到上面几个例子的测试结果,其实咱们一般想要的是每一个location的数量是多少,那该怎么处理呢?看下面这个Topology:
topology.newStream("spout", spout)
	.groupBy(new Fields("location"))
	.aggregate(new Fields("location"), new Count(), new Fields("count"))
	.each(new Fields("location", "count"), new Utils.PrintFilter());

     咱们先看一下执行的结果: 

[France, 25]
[UK, 2]
[USA, 25]
[Spain, 44]
[France, 26]
[UK, 3]
     上面这段代码计算出了每一个location的数量,即便咱们的Count函数没有指定并行度。这就是groupBy()起的做用,它会根据指定的字段建立一个GroupedStream,相同字段的tuple都会被重定向到一块儿,汇聚成一个group。groupBy()以后是aggregate,与以前的聚合整个batch不一样,此时的aggregate会单独聚合每一个group。咱们也能够这么认为,groupBy会把Stream按照指定字段分红一个个stream group,每一个group就像一个batch同样被处理。
     不过须要注意的是,groupBy()自己并非一个重定向操做,但若是它后面跟的是aggregator的话就是,跟的是partitionAggregate的话就不是。
3、总结       Storm是一个实时流计算框架,Trident是对storm的一个更高层次的抽象,Trident最大的特色以batch的形式处理stream。      一些最基本的操做函数有Filter、Function,Filter能够过滤掉tuple,Function能够修改tuple内容,输出0或多个tuple,并能把新增的字段追加到tuple后面。      聚合有partitionAggregate和Aggregator接口。partitionAggregate对当前partition中的tuple进行聚合,它不是重定向操做。Aggregator有三个接口:CombinerAggregator, ReducerAggregator,Aggregator,它们属于重定向操做,它们会把stream重定向到一个partition中进行聚合操做。      重定向操做会改变数据流向,但不会改变数据内容,重定向操会产生网络传输,可能影响一部分效率。而Filter、Function、partitionAggregate则属于本地操做,不会产生网络传输。      GroupBy会根据指定字段,把整个stream切分红一个个grouped stream,若是在grouped stream上作聚合操做,那么聚合就会发生在这些grouped stream上而不是整个batch。若是groupBy后面跟的是aggregator,则是聚合操做,若是跟的是partitionAggregate,则不是聚合操做。
相关文章
相关标签/搜索