```java
public static class PerActorTweetsFilter extends BaseFilter {
    String actor;

    public PerActorTweetsFilter(String actor) {
        this.actor = actor;
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        return tuple.getString(0).equals(actor);
    }
}
```
Topology:

```java
topology.newStream("spout", spout)
    .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
    .each(new Fields("actor", "text"), new Utils.PrintFilter());
```
As the example above shows, the each() method takes several parameters. The first acts as a Field Selector: a tuple may carry many fields, and by naming fields here we hide the rest so the operation receives only the specified ones (the other fields are still present in the tuple).
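The field-selection behavior can be sketched in plain Java. This is a toy model of the semantics only, not the actual Trident API; the class and method names below are made up for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of Trident field selection: the operation sees only the
// selected fields, but the full tuple keeps flowing downstream unchanged.
public class FieldSelectorSketch {
    // Project a tuple (field name -> value) down to the selected fields.
    static List<Object> select(Map<String, Object> tuple, List<String> fields) {
        List<Object> view = new ArrayList<>();
        for (String f : fields) {
            view.add(tuple.get(f));
        }
        return view;
    }

    public static void main(String[] args) {
        Map<String, Object> tuple = new LinkedHashMap<>();
        tuple.put("actor", "dave");
        tuple.put("text", "hello");
        tuple.put("location", "France");

        // The filter sees only the selected fields [dave, hello] ...
        System.out.println(select(tuple, Arrays.asList("actor", "text")));
        // ... but "location" is still present in the tuple itself.
        System.out.println(tuple.containsKey("location")); // true
    }
}
```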
```java
public static class UppercaseFunction extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
        collector.emit(new Values(tuple.getString(0).toUpperCase()));
    }
}
```
Topology:

```java
topology.newStream("spout", spout)
    .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave"))
    .each(new Fields("text", "actor"), new UppercaseFunction(), new Fields("uppercased_text"))
    .each(new Fields("actor", "text", "uppercased_text"), new Utils.PrintFilter());
```
First, the input to UppercaseFunction is Fields("text", "actor"), and its job is to convert the "text" field to upper case. Second, unlike a Filter it declares an extra output field: after a tuple passes through the Function, the values it emits are appended to the end of the tuple. In this example the resulting tuple gains an "uppercased_text" field, placed last.

```java
public static class PerActorTweetsFilter extends BaseFilter {
    private int partitionIndex;
    private String actor;

    public PerActorTweetsFilter(String actor) {
        this.actor = actor;
    }

    @Override
    public void prepare(Map conf, TridentOperationContext context) {
        this.partitionIndex = context.getPartitionIndex();
    }

    @Override
    public boolean isKeep(TridentTuple tuple) {
        boolean filter = tuple.getString(0).equals(actor);
        if (filter) {
            System.err.println("I am partition [" + partitionIndex
                    + "] and I have kept a tweet by: " + actor);
        }
        return filter;
    }
}
```
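The append behavior of a Function's output fields can be sketched in plain Java. This is a toy model, not the Trident API; the names are made up for illustration:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of how a Function's emitted values are appended to the input
// tuple: the result keeps all input fields and gains the new field at the end.
public class FunctionAppendSketch {
    // Simulate an "uppercase" function that reads the field at textIndex
    // and appends the uppercased value as a new trailing field.
    static List<Object> applyUppercase(List<Object> tuple, int textIndex) {
        List<Object> out = new ArrayList<>(tuple);                  // input fields kept
        out.add(((String) tuple.get(textIndex)).toUpperCase());     // output appended
        return out;
    }

    public static void main(String[] args) {
        List<Object> in = Arrays.asList("dave", "content to tweet");
        System.out.println(applyUppercase(in, 1));
        // -> [dave, content to tweet, CONTENT TO TWEET]
    }
}
```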
Topology:

```java
topology.newStream("spout", spout)
    .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")).parallelismHint(5)
    .each(new Fields("actor", "text"), new Utils.PrintFilter());
```
If we set the number of threads running the Filter to 5, what will the final output look like? Here is our test result:

```
I am partition [4] and I have kept a tweet by: dave
```

```java
topology.newStream("spout", spout).parallelismHint(2)
    .shuffle()
    .each(new Fields("actor", "text"), new PerActorTweetsFilter("dave")).parallelismHint(5)
    .each(new Fields("actor", "text"), new Utils.PrintFilter());
```
2.2.2 partitionBy() and repartitioning operations
Notice that the example above uses shuffle(), which is a repartitioning operation. What is a repartitioning operation? It defines how tuples are routed to the next processing layer; of course, different layers may run at different parallelism. shuffle() routes each tuple to a random thread of the next layer, whereas partitionBy() routes by the specified fields using consistent hashing. In other words, with partitionBy(), tuples carrying the same value in those fields are always routed to the same thread.

```java
public static class LocationAggregator extends BaseAggregator<Map<String, Integer>> {
    @Override
    public Map<String, Integer> init(Object batchId, TridentCollector collector) {
        return new HashMap<String, Integer>();
    }

    @Override
    public void aggregate(Map<String, Integer> val, TridentTuple tuple, TridentCollector collector) {
        String location = tuple.getString(0);
        val.put(location, MapUtils.getInteger(val, location, 0) + 1);
    }

    @Override
    public void complete(Map<String, Integer> val, TridentCollector collector) {
        collector.emit(new Values(val));
    }
}
```
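The routing guarantee of partitionBy() can be sketched in plain Java. This is a toy hash-mod assignment, not Storm's real grouping code, but it shows the property the text describes: the same field value always lands on the same partition.

```java
// Toy illustration of partitionBy()-style routing: hash the chosen key
// and take it modulo the number of downstream partitions.
public class PartitionBySketch {
    // Storm's actual implementation differs, but the
    // "same key -> same partition" guarantee is the point.
    static int partitionFor(Object key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int n = 5;
        // Two tuples with the same "actor" value always agree on a partition,
        // while shuffle() would pick a random partition each time.
        System.out.println(partitionFor("dave", n) == partitionFor("dave", n)); // true
    }
}
```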
Topology:

```java
topology.newStream("spout", spout)
    .aggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts"))
    .each(new Fields("location_counts"), new Utils.PrintFilter());
```
This aggregator is simple: it counts the locations in each batch. Through this example we can see the Aggregator interface:

- init(): called when the aggregator first receives a new batch
- aggregate(): called once for each tuple in the batch
- complete(): called after all tuples of the batch have been processed

```java
topology.newStream("spout", spout)
    .partitionBy(new Fields("location"))
    .partitionAggregate(new Fields("location"), new LocationAggregator(), new Fields("location_counts")).parallelismHint(3)
    .each(new Fields("location_counts"), new Utils.PrintFilter());
```
Let's walk through this. First, partitionBy() redirects tuples to the next processing layer by their location field, so tuples with the same location are guaranteed to be handled by the same thread. Second, partitionAggregate() is applied; note that unlike aggregate() it is not a repartitioning operation, it merely aggregates each batch within the current partition. Since we repartitioned by location, the test data contains 4 locations in total, and there are 3 partitions, we can predict that one partition will end up processing the batches of two locations. The final test result:
```
[{France=10, Spain=5}]
```

```java
topology.newStream("spout", spout)
    .groupBy(new Fields("location"))
    .aggregate(new Fields("location"), new Count(), new Fields("count"))
    .each(new Fields("location", "count"), new Utils.PrintFilter());
```
Let's first look at the output:
```
[France, 25]
```
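The groupBy + Count pipeline above can be mimicked in plain Java. This is a toy model of the semantics, not the Trident API, and the sample batch data below is made up for illustration:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of groupBy(location) + aggregate(Count): tuples are grouped
// by their "location" value and each group is counted independently.
public class GroupByCountSketch {
    static Map<String, Integer> countByLocation(List<String> locations) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String loc : locations) {
            counts.merge(loc, 1, Integer::sum); // increment this group's count
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical batch of location fields.
        List<String> batch = Arrays.asList("France", "Spain", "France", "USA");
        // Each (location, count) pair would be printed by Utils.PrintFilter.
        System.out.println(countByLocation(batch)); // {France=2, Spain=1, USA=1}
    }
}
```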