storm自定义数据分组

数据流组缓存

设计一个拓扑时,你要作的最重要的事情之一就是定义如何在各组件之间交换数据(数据流是如何被bolts消费的)。一个数据流组指定了每一个bolt会消费哪些数据流,以及如何消费它们。 ide

storm自带数据流组oop

随机数据流组ui

随机流组是最经常使用的数据流组。它只有一个参数(数据源组件),而且数据源会向随机选择的bolt发送元组,保证每一个消费者收到近似数量的元组。this

 builder.setBolt("word-counter", new WordCounter()).shuffleGrouping("word-normalizer");

 域数据流组spa

域数据流组容许你基于元组的一个或多个域控制如何把元组发送给bolts。它保证拥有相同域组合的值集发送给同一个bolt。回到单词计数器的例子,若是你用word域为数据流分组,word-normalizer bolt将只会把相同单词的元组发送给同一个word-counterbolt实例。设计

 builder.setBolt("word-counter", new WordCounter(),2)
           .fieldsGrouping("word-normalizer", new Fields("word"));

所有数据流组code

所有数据流组,为每一个接收数据的实例复制一份元组副本。这种分组方式用于向bolts发送信号。好比,你要刷新缓存,你能够向全部的bolts发送一个刷新缓存信号。在单词计数器的例子里,你可使用一个所有数据流组,添加清除计数器缓存的功能 orm

builder.setBolt("word-counter", new WordCounter(),2)
           .fieldsGroupint("word-normalizer",new Fields("word"))
           .allGrouping("signals-spout","signals");

直接数据流组接口

这是一个特殊的数据流组,数据源能够用它决定哪一个组件接收元组

 builder.setBolt("word-counter", new WordCounter(),2)
           .directGrouping("word-normalizer");

。与前面的例子相似,数据源将根据单词首字母决定由哪一个bolt接收元组。要使用直接数据流组,在WordNormalizer bolt中,使用emitDirect方法代替emit。

public void execute(Tuple input) {
        ...
        for(String word : words){
            if(!word.isEmpty()){
                ...
                collector.emitDirect(getWordCountIndex(word),new Values(word));
            }
        }
        //对元组作出应答
        collector.ack(input);
    }
    public Integer getWordCountIndex(String word) {
        word = word.trim().toUpperCase();
        if(word.isEmpty()){
            return 0;
        }else{
            return word.charAt(0) % numCounterTasks;
        }
    }

在prepare方法中计算任务数

 public void prepare(Map stormConf, TopologyContext context, 
                OutputCollector collector) {
        this.collector = collector;
        this.numCounterTasks = context.getComponentTasks("word-counter");
    }

全局数据流组

全局数据流组把全部数据源建立的元组发送给单一目标实例(即拥有最低ID的任务)。

不分组

这个数据流组至关于随机数据流组。也就是说,使用这个数据流组时,并不关心数据流是如何分组的。

自定义数据流组

storm自定义数据流组和hadoop Partitioner分组很类似,storm自定义分组要实现CustomStreamGrouping接口,接口源码以下:

public   interface   CustomStreamGrouping  extends   Serializable {
 
    void   prepare(WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks);
 
    List<Integer> chooseTasks( int   taskId, List<Object> values);
}

targetTasks就是Storm运行时告诉你,当前有几个目标Task能够选择,每个都给编上了数字编号。而 chooseTasks(int taskId, List values); 就是让你选择,你的这条数据values,是要哪几个目标Task处理?

这是我写的一个自定义分组,老是把数据分到第一个Task:

public   class   MyFirstStreamGrouping  implements   CustomStreamGrouping {
     private   static   Logger log = LoggerFactory.getLogger(MyFirstStreamGrouping. class );
 
     private   List<Integer> tasks;
 
     @Override
     public   void   prepare(WorkerTopologyContext context, GlobalStreamId stream,
         List<Integer> targetTasks) {
         this .tasks = targetTasks;
         log.info(tasks.toString());
     }  
     @Override
     public   List<Integer> chooseTasks( int   taskId, List<Object> values) {
         log.info(values.toString());
         return   Arrays.asList(tasks.get( 0 ));
     }
}

从上面的代码能够看出,该自定义分组会把数据归并到第一个TaskArrays.asList(tasks.get(0));,也就是数据到达后老是被派发到第一组。和Hadoop不一样的是,Storm容许一条数据被多个Task处理,所以返回值是List .就是让你来在提供的 'List targetTasks' Task中选择任意的几个(必须至少是一个)Task来处理数据。

第二个自定义分组,wordcount中使首字母相同的单词交给同一个bolt处理:

public class ModuleGrouping implements CustormStreamGrouping{
        int numTasks = 0;
        @Override
        public List<Integer> chooseTasks(List<Object> values) {
            List<Integer> boltIds = new ArrayList<Integer>();
            if(values.size()>0){
                String str = values.get(0).toString();
                if(str.isEmpty()){
                    boltIds.add(0);
                }else{
                    boltIds.add(str.charAt(0) % numTasks);
                }
            }
            return boltIds;
        }
        @Override
        public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) {
            numTasks = targetTasks.size();
        }
    }

这是一个CustomStreamGrouping的简单实现,在这里咱们采用单词首字母字符的整数值与任务数的余数,决定接收元组的bolt。

builder.setBolt("word-normalizer", new WordNormalizer())
           .customGrouping("word-reader", new ModuleGrouping());
相关文章
相关标签/搜索