Twitter Storm Stream Grouping编写自定义分组实现

##自定义Grouping测试ide

Storm是支持自定义分组的,本篇文章就是探究Storm如何编写一个自定义分组器,以及对Storm分组器如何分组数据的理解。oop

这是我写的一个自定义分组,老是把数据分到第一个Task:测试

public class MyFirstStreamGrouping implements CustomStreamGrouping {
    private static Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping.class);

    private List<Integer> tasks;

    @Override
    public void prepare(WorkerTopologyContext context, GlobalStreamId stream,
		List<Integer> targetTasks) {
	    this.tasks = targetTasks;
	    log.info(tasks.toString());
    }	
    @Override
    public List<Integer> chooseTasks(int taskId, List<Object> values) {
	    log.info(values.toString());
	    return Arrays.asList(tasks.get(0));
    }
}

从上面的代码能够看出,该自定义分组会把数据归并到第一个Task<code>Arrays.asList(tasks.get(0));</code>,也就是数据到达后老是被派发到第一组。ui

测试代码:this

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("words", new TestWordSpout(), 2); 
//自定义分组,
builder.setBolt("exclaim1", new DefaultStringBolt(), 3)
	    .customGrouping("words", new MyFirstStreamGrouping());

和以前的测试用例同样,Spout老是发送<code>new String[] {“nathan”, “mike”, “jackson”, “golda”, “bertels”}</code>列表的字符串。咱们运行验证一下:线程

11878 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
11943 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [nathan]
11944 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: nathan
11979 [Thread-29-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
11980 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike
12045 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12045 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12080 [Thread-29-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [jackson]
12081 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: jackson
12145 [Thread-41-words] INFO  cn.pointways.dstorm.grouping.MyFirstStreamGrouping - [mike]
12146 [Thread-25-exclaim1] INFO  cn.pointways.dstorm.bolt.DefaultStringBolt - rev a message: mike

从这个运行日志咱们能够看出,数据老是派发到一个Blot:Thread-25-exclaim1。由于我时本地测试,Thread-25-exclaim1是线程名。而派发的线程是数据多个线程的。所以该测试符合预期,即老是发送到一个Task,而且这个Task也是第一个。日志

##理解自定义分组实现code

本身实现一个自定义分组难吗?其实若是你理解了Hadoop的Partitioner,Storm的CustomStreamGrouping和它也是同样的道理。orm

Hadoop MapReduce的Map完成后会把Map的中间结果写入磁盘,在写磁盘前,线程首先根据数据最终要传送到的Reducer把数据划分红相应的分区,而后不一样的分区进入不一样的Reduce。咱们先来看看Hadoop是怎样把数据怎样分组的,这是Partitioner惟一一个方法:字符串

public class Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return 0;
    }
}

上面的代码中:Map输出的数据都会通过getPartition()方法,用来肯定下一步的分组。numReduceTasks是一个Job的Reduce数量,而返回值就是肯定该条数据进入哪一个Reduce。返回值必须大于等于0,小于numReduceTasks,不然就会报错。返回0就意味着这条数据进入第一个Reduce。对于随机分组来讲,这个方法能够这么实现:

public int getPartition(K key, V value, int numReduceTasks) {
    return hash(key) % numReduceTasks;
}

其实Hadoop 默认的Hash分组策略也正是这么实现的。这样好处是,数据在整个集群基本上是负载平衡的。

搞通了Hadoop的Partitioner,咱们来看看Storm的CustomStreamGrouping。

这是CustomStreamGrouping类的源码:

public interface CustomStreamGrouping extends Serializable {

   void prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);

   List<Integer> chooseTasks(int taskId, List<Object> values); 
}

如出一辙的道理,targetTasks就是Storm运行时告诉你,当前有几个目标Task能够选择,每个都给编上了数字编号。而 <code> chooseTasks(int taskId, List<Object> values); </code> 就是让你选择,你的这条数据values,是要哪几个目标Task处理?

如上文文章开头的自定义分组器实现的代码,我选择的老是让第一个Task来处理数据,<code> return Arrays.asList(tasks.get(0)); </code> 。和Hadoop不一样的是,Storm容许一条数据被多个Task处理,所以返回值是List<Integer>.就是让你来在提供的 'List<Integer> targetTasks' Task中选择任意的几个(必须至少是一个)Task来处理数据。

由此,Storm的自定义分组策略也就不那么麻烦了吧?

相关文章
相关标签/搜索