##Storm Groupingapp
shuffleGrouping负载均衡
将流分组定义为混排。这种混排分组意味着来自Spout的输入将混排,或随机分发给此Bolt中的任务。shuffle grouping对各个task的tuple分配的比较均匀。学习
fieldsGrouping测试
这种grouping机制保证相同field值的tuple会去同一个task,这对于WordCount来讲很是关键,若是同一个单词不去同一个task,那么统计出来的单词次数就不对了。ui
All grouping线程
广播发送, 对于每个tuple将会复制到每个bolt中处理。日志
Global groupingcode
Stream中的全部的tuple都会发送给同一个bolt任务处理,全部的tuple将会发送给拥有最小task_id的bolt任务处理。orm
None grouping字符串
不关注并行处理负载均衡策略时使用该方式,目前等同于shuffle grouping,另外storm将会把bolt任务和他的上游提供数据的任务安排在同一个线程下。
Direct grouping
由tuple的发射单元直接决定tuple将发射给那个bolt,通常状况下是由接收tuple的bolt决定接收哪一个bolt发射的Tuple。这是一种比较特别的分组方法,用这种分组意味着消息的发送者指定由消息接收者的哪一个task处理这个消息。 只有被声明为Direct Stream的消息流能够声明这种分组方法。并且这种消息tuple必须使用emitDirect方法来发射。消息处理者能够经过TopologyContext来获取处理它的消息的taskid (OutputCollector.emit方法也会返回taskid)
##fieldsGrouping
若是你了解Storm,我想你能明白其中的大多数Grouping。这里的Grouping策略我想着重介绍一下fieldsGrouping,也最难理解的。
fieldsGrouping是按照数据中字段Field的值分组的。下面是个人测试代码:
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("words", new TestWordSpout(), 2); builder.setBolt("exclaim2", new DefaultStringBolt(), 5) .fieldsGrouping("words", new Fields("word"));
测试的例子Spout是Storm自带的例子,Blot代码以下:
public void execute(Tuple tuple) { log.info("rev a message: " + tuple.getString(0)); collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); collector.ack(tuple); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }
Storm自带的例子Spout能随机的返回<code>new String[] {"nathan", "mike", "jackson", "golda", "bertels"};</code>列表中的几个字符串。这也是测试FieldGroup的好例子。
按照我最先作Storm开始前的理解,既然是按照Field分组,那么是全部相同的Field值得数据都会到达一个Blot的。我测试不少次,其结果并非这样,一个Blot会收到多个不一样的值。我没有仔细探究Storm这样分组有什么特别的地方,以致于本身对Storm的学习停滞了很长时间。
Storm能保证全部相同Field值的数据到达的是相同的Blot,可是不保证一个Blot只处理一个值域。
也就是说,全部值是nathan能到达到一个Blot,可是到达同一个Blot的值可能有多个,如"nathan", "mike"的数据都到达。
理解到这点上,fieldsGrouping就算是理解了。
下面是测试日志:
9144 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels 9234 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike 9245 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 9335 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda 9346 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda 9437 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 9447 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike 9537 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda 9548 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 9639 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 9649 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 9740 [Thread-33-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan 9749 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson 9841 [Thread-35-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: bertels 9850 [Thread-26-exclaim2] INFO cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: golda
由上面的日志能够看出,golda这个值的数据,的确归并到一个Blot处理的。线程编号:Thread-26-exclaim2。 其它值也都是相同值都是在一个线程内被处理的。