Trident Tutorial(翻译)

Trident是在storm基础上,一个以realtime 计算为目标的高度抽象。 它在提供处理大吞吐量数据能力的同时,也提供了低延时分布式查询和有状态流式处理的能力。 若是你对Pig和Cascading这种高阶批量处理工具很了解的话,那么你会很容易理解Trident的概念。Tident提供了 joins, aggregations, grouping, functions, 以及 filters等能力。除此以外,Trident 还提供了一些专门的原语,从而在基于数据库或者其余存储的前提下来应付有状态的递增式处理。Trident拥有一致的、exactly-once的语义,因此很容易去推导一个Trident Topology。html

 

举例说明

让咱们一块儿来看一个关于Trident的例子。在这个例子中,主要作了两件事情:java

  1. 从一个关于sentences的输入流中读取数据并计算每一个word的个数
  2. 实现一个关于给定单词列表中每一个单词当前总数的查询功能

为了达到说明的目的,这个例子会从一个无限的输入流中读取语句做为输入:node

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
               new Values("the cow jumped over the moon"),
               new Values("the man went to the store and bought some candy"),
               new Values("four score and seven years ago"),
               new Values("how many apples can you eat"));
spout.setCycle(true);


这个spout会循环输出语句集合到sentence stream当中,下面的代码会以这个stream做为输入并计算每一个单词的个数:mysql

TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
       .parallelismHint(6);

 

让咱们一块儿来读一下这段代码。咱们首先建立了一个TridentTopology对象,TridentTopology类暴露接口来构造Trident计算过程当中的全部内容。TridentTopology有一个newStream方法来建立一个数据流,以供topology读取输入源。这个例子中,输入源是就是前面定义的FixedBatchSpout。输入源也能够是 Kestrel 或者Kafka的queue brokers. Trident会在Zookeeper中保存一小部分状态信息来追踪数据的处理状况(关于数据是否已经被消费的元数据)。代码中咱们指定的字符串“spout1”就是Zookeeper中用来存储metadata信息的Znode节点git

Trident会把输入流转换成若干个tuple的batch来处理。例如,输入的sentence stream可能会被拆分红以下的batch:github

通常来讲,这些小的batch中的tuple可能会在数千或者数百万这样的数量级,取决于输入的吞吐量。sql

Trident提供了一系列很是成熟的批量处理的API来处理这些小batch. 这些API和你在Pig或者Cascading中看到的很是相似, 你能够作group by's, joins, aggregations, 运行 functions, 执行 filters等等。固然,独立的处理每一个小的batch并非很是有趣的事情,因此Trident提供了不少功能来实现batch之间的聚合的结果并能够将这些聚合的结果存储到内存,Memcached, Cassandra或者是一些其余的存储中。同时,Trident还提供了很是好的功能来查询实时状态。这些实时状态能够被Trident更新,同时它也能够是一个独立的状态源。数据库

回到咱们的这个例子中来,spout输出了一个只有单一字段"sentence"的数据流。在下一行,topology使用了Split函数来拆分stream中的每个tuple,Split函数读取输入流中的"sentence"字段并将其拆分"word"。每个sentence tuple可能会被转换成多个word tuple,好比说"the cow jumped over the moon" 会被转换成6个 "word" tuples. 下面是Split的定义:apache

public class Split extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       String sentence = tuple.getString(0);
       for(String word: sentence.split(" ")) {
           collector.emit(new Values(word));                
       }
   }
}


如你所见,真的很简单。它只是简单的根据空格拆分sentence,并将拆分出的每一个word做为一个tuple输出。并发

topology的其余部分计算单词的个数并将计算结果保存到了持久存储中。首先stream被根据"word"字段执行group操做,而后每个group使用Count聚合器进行持久化聚合。persistentAggregate会帮助你把一个状态源聚合的结果存储或者更新到存储当中。在这个例子中,单词的数量被保持在内存中,不过咱们能够很简单的把这些数据保存到其余的存储当中,如 Memcached, Cassandra等。若是咱们要把结果存储到Memcached中,只是简单的使用下面这句话替换掉persistentAggregate就能够,这当中的"serverLocations"是Memcached cluster的主机和端口号列表:

.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))        
MemcachedState.transactional()

persistentAggregate存储的数据就是全部batch聚合的结果。

Trident很是酷的一点就是它是彻底容错的,exactly-one处理语义。这就让你能够很轻松的使用Trident来进行实时数据处理。Trident会把状态以某种形式保持起来,当有错误发生时,它会根据须要来恢复这些状态。

persistentAggregate方法会把数据流转换成一个TridentState对象。在这个例子当中,TridentState对象表明了全部的单词的数量。咱们会使用这个TridentState对象来实如今计算过程当中的进行分布式查询。

下面这部分实现了一个低延时的单词数量的分布式查询。这个查询以一个用空格分割的单词列表为输入,并返回这些单词当天的个数。这些查询是想普通的RPC调用那样被执行的,要说不一样的话,那就是他们在后台是并行执行的。下面是执行查询的一个例子:

DRPCClient client = new DRPCClient("drpc.server.location", 3772);
System.out.println(client.execute("words", "cat dog the man");
// prints the JSON-encoded result, e.g.: "[[5078]]"


如你所见,除了这是并发执行在storm cluster上以外,这看上去就是一个正常的RPC调用。这样的简单查询的延时一般在10毫秒左右。固然,更复杂的DRPC调用可能会使用更长的时间,尽管延时很大程度上是取决于你给计算分配了多少资源。

这个分布式查询的实现以下所示:

topology.newDRPCStream("words")
       .each(new Fields("args"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"))
       .each(new Fields("count"), new FilterNull())
       .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

咱们仍然是使用TridentTopology对象来建立DRPC stream,而且咱们将这个函数命名为“words”。这个函数名会做为第一个参数在使用DRPC Client来执行查询的时候用到。

每一个DRPC请求会被当作只有一个tuple的batch job来处理。在处理过程当中,单一tuple来表示这个请求。这个tuple包含了一个叫作“args”的字段,在这个字段中保存了客户端提供的查询参数。在这个例子中,这个参数是一个以空格分割的单词列表。

首先,咱们使用Splict功能把入参拆分红独立的单词。而后对“word” 进行group by操做,以后就可使用stateQuery来在上面代码中建立的TridentState对象上进行查询。stateQuery接受一个数据源(在这个例子中,就是咱们的topolgoy所计算的单词的个数)以及一个用于查询的函数做为输入。在这个例子中,咱们使用了MapGet函数来获取每一个单词的出现个数。因为DRPC stream是使用跟TridentState彻底一样的group方式(按照“word”字段进行group),每一个单词的查询会被路由到TridentState对象管理和更新这个单词的分区去执行。

接下来,咱们用FilterNull这个过滤器把从未出现过的单词给去掉,并使用Sum这个聚合器将这些count累加起来。最终,Trident会自动把这个结果发送回等待的客户端。

Trident在如何最大程度的保证执行topogloy性能方面是很是智能的。在topology中会自动的发生两件很是有意思的事情:

  1. 读取和更新状态的操做 (好比说 persistentAggregate 和 stateQuery) 会自动的是batch的形式操做状态。 若是有20次更新须要被同步到存储中,Trident会自动的把这些操做汇总到一块儿,只作一次读一次写,而不是进行20次读20次写的操做。所以你能够在很方便的执行计算的同时,保证了很是好的性能。
  2. Trident的聚合器已是被优化的很是好了的。Trident并非简单的把一个group中全部的tuples都发送到同一个机器上面进行聚合,而是在发送以前已经进行过一次部分的聚合。打个比方,Count聚合器会先在每一个partition上面进行count,而后把每一个分片count汇总到一块儿就获得了最终的count。这个技术其实就跟MapReduce里面的combiner是一个思想。

让咱们再来看一下Trident的另一个例子。

Reach

下一个例子是一个纯粹的DRPC topology。这个topology会计算一个给定URL的reach。那么什么是reach呢,这里咱们将reach定义为有多少个独立用户在Twitter上面expose了一个给定的URL。要计算reach,你须要获取曾经tweet过这个URL的全部人,而后找到全部follow这些人的人,并将这些follower去重,最后就获得了去重后的follower的数量。若是把计算reach的整个过程都放在一台机器上面,就太勉强了,由于这会须要进行数千次数据库调用以及上亿次的tuple的读取。若是使用Storm和Trident,你就能够把这些计算步骤在整个cluster中进行并发执行。

这个topology会读取两个state源。一个用来保存URL以及tweet这个URL的人的关系的数据库。还有一个保持人和他的follower的关系数据库。topology的定义以下:

TridentState urlToTweeters =
       topology.newStaticState(getUrlToTweetersState());
TridentState tweetersToFollowers =
       topology.newStaticState(getTweeterToFollowersState());
topology.newDRPCStream("reach")
       .stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters"))
       .each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
       .shuffle()
       .stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
       .parallelismHint(200)
       .each(new Fields("followers"), new ExpandList(), new Fields("follower"))
       .groupBy(new Fields("follower"))
       .aggregate(new One(), new Fields("one"))
       .parallelismHint(20)
       .aggregate(new Count(), new Fields("reach"));

 

这个topology使用newStaticState方法建立了TridentState对象来表明一种外部存储。使用这个TridentState对象,咱们就能够在这个topology上面进行动态查询了。和全部的状态源同样,在数据库上面的查询会自动被批量执行,从而最大程度的提高效率。

这个topology的定义是很是直观的 - 只是一个简单的批量处理job。首先,查询urlToTweeters数据库来获得tweet过这个URL的人员列表。这个查询会返回一个列表,所以咱们使用ExpandList函数来把每个反悔的tweeter转换成一个tuple。

接下来,咱们来获取每一个tweeter的follower。咱们使用shuffle来把要处理的tweeter分布到toplology运行的每个worker中并发去处理。而后查询follower数据库从而的到每一个tweeter的follower。你能够看到咱们为topology的这部分分配了很大的并行度,这是由于这部分是整个topology中最耗资源的计算部分。

而后咱们在follower上面使用group by操做进行分组,并对每一个组使用一个聚合器。这个聚合器只是简单的针对每一个组输出一个tuple “One”,再count “One” 从而的到不一样的follower的数量。“One”聚合器的定义以下:

public class One implements CombinerAggregator<Integer> {
   public Integer init(TridentTuple tuple) {
       return 1;
   }
   public Integer combine(Integer val1, Integer val2) {
       return 1;
   }
   public Integer zero() {
       return 1;
   }        
}

这是一个"汇总聚合器", 它会在传送结果到其余worker汇总以前进行局部汇总,从而来最大程度上提高性能。Sum也是一个汇总聚合器,所以以Sum做为topology的最终操做是很是高效的。

接下来让咱们一块儿来看看Trident的一些细节。

Fields and tuples

Trident的数据模型就是TridentTuple - 一个有名的值的列表。在一个topology中,tuple是在一系列的处理操做(operation)中增量生成的。operation通常以一组子弹做为输入并输出一组功能字段。Operation的输入字段常常是输入tuple的一个子集,而功能字段则是operation的输出。

看一下以下这个例子。假定你有一个叫作“stream”的stream,它包含了“x”,"y"和"z"三个字段。为了运行一个读取“y”做为输入的过滤器MyFilter,你能够这样写:

stream.each(new Fields("y"), new MyFilter())

假定MyFilter的实现是这样的:

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
       return tuple.getInteger(0) < 10;
   }
}
 

这会保留全部“y”字段小于10的tuples。TridentTuple传个MyFilter的输入将只有字段“y”。这里须要注意的是,当选择输入字段时,Trident会自动投影tuple的一个子集,这个操做是很是高效的。

让咱们一块儿看一下“功能字段”是怎样工做的。假定你有以下这个功能:

public class AddAndMultiply extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       int i1 = tuple.getInteger(0);
       int i2 = tuple.getInteger(1);
       collector.emit(new Values(i1 + i2, i1 * i2));
   }
}


这个函数接收两个数做为输入并输出两个新的值:“和”和“乘积”。假定你有一个stream,其中包含“x”,"y"和"z"三个字段。你能够这样使用这个函数:

stream.each(new Fields("x", "y"), new AddAndMultiply(), new Fields("added", "multiplied"));

 

输出的功能字段被添加到输入tuple中。所以这个时候,每一个tuple中将会有5个字段"x", "y", "z", "added", 和 "multiplied". "added" 和"multiplied"对应于AddAndMultiply输出的第一和第二个字段。

另外,咱们可使用聚合器来用输出字段来替换输入tuple。若是你有一个stream包含字段"val1"和"val2",你能够这样作:

stream.aggregate(new Fields("val2"), new Sum(), new Fields("sum"))

 

output stream将会只包含一个叫作“sum”的字段,这个sum字段就是“val2”的累积和。

在group以后的stream上,输出将会是被group的字段以及聚合器输出的字段。举例以下:

stream.groupBy(new Fields("val1")).aggregate(new Fields("val2"), new Sum(), new Fields("sum"))
在这个例子中,输出将包含字段"val1" 和 "sum".

State

在实时计算领域的一个主要问题就是怎么样来管理状态并能轻松应对错误和重试。消除错误的影响是很是重要的,由于当一个节点死掉,或者一些其余的问题出现时,那行batch须要被从新处理。问题是-你怎样作状态更新来保证每个消息被处理且只被处理一次?

这是一个很棘手的问题,咱们能够用接下来的例子进一步说明。假定你在作一个你的stream的计数聚合,而且你想要存储运行时的count到一个数据库中去。若是你只是存储这个count到数据库中,而且想要进行一次更新,咱们是没有办法知道一样的状态是否是之前已经被update过了的。此次更新可能在以前就尝试过,而且已经成功的更新到了数据库中,不过在后续的步骤中失败了。还有多是在上次更新数据库的过程当中失败的,这些你都不知道。

Trident经过作下面两件事情来解决这个问题:

  1. 每个batch被赋予一个惟一标识id“transaction id”。若是一个batch被重试,它将会拥有和以前一样的transaction id
  2. 状态更新是以batch为单位有序进行的。也就是说,batch 3的状态更新必须等到batch 2的状态更新成功以后才能够进行。

有了这2个原则,你就能够达到有且只有一次更新的目标。你能够将transaction id和count一块儿以原子的方式存到数据库中。当更新一个count的时候,须要判断数据库中当前batch的transaction id。若是跟要更新的transaction id同样,就跳过此次更新。若是不一样,就更新这个count。

固然,你不须要在topology中手动处理这些逻辑。这些逻辑已经被封装在Stage的抽象中并自动进行。你的Stage object也不须要本身去实现transaction id的跟踪操做。若是你想了解更多的关于如何实现一个Stage以及在容错过程当中的一些取舍问题,能够参照in this doc.

一个Stage能够采用任何策略来存储状态。它能够存储到一个外部的数据库,也能够在内存中保持状态并备份到HDFS中。Stage并不须要永久的保持状态。好比说,你有一个内存版的Stage实现,它保存最近X个小时的数据并丢弃老的数据。能够把Memcached integration 做为例子来看看State的实现.

Execution of Trident topologies

Trident的topology会被编译成尽量高效的Storm topology。只有在须要对数据进行repartition的时候(如groupby或者shuffle)才会把tuple经过network发送出去,若是你有一个trident以下:

它将会被编译成以下的storm topology:

Conclusion

Trident使得实时计算更加优雅。你已经看到了如何使用Trident的API来完成大吞吐量的流式计算,状态维护,低延时查询等等功能。Trident让你在获取最大性能的同时,以更天然的一种方式进行实时计算。

相关文章
相关标签/搜索