上一篇【 storm开发环境搭建 】 博文链接:https://my.oschina.net/u/2342969/blog/878765java
本篇会深刻理解Streams,欢迎同志(此同志非彼同志)们经过私信/评论等方式共同窗习了解.git
Streams是storm中一个核心的概念,它是在分布式并行处理和建立的无限序列元组,Streams经过给流元组中字段命名来定义,默认状况下,元组能够包含整型,长整型,短整型,字节,字符串,布尔型,双精度浮点型,单精度浮点型,字节数组,也能够自定义序列化类型。github
下面共同窗习一下 Tuple(元组)、OutputFieldsDeclarer、 元组中动态类型以及序列器apache
Tuple是storm中主要的数据结构,是storm中使用的基本单元、元组。元组是一个值列表,其中,每一个值能够是任意类型。动态类型的元组不须要被定义,元组有相似 getInteger 和getString的帮助方法,无须手动转换结果类型。数组
storm须要知道如何序列化全部的值,默认状况下,storm知道如何序列化简单类型,好比字符串、字节数组,若是想使用复杂的对象,则须要注册实现一个该类型的序列器。安全
在storm中tuple接口集成了Iuple接口 ,实现类为TupleImpl。数据结构
tuple的数据结构相似于map的键值对,其中键定义在OutputFieldsDeclarer方法的Fields对象中。分布式
经过如下例子,能够帮助你们更好的理解:ide
//例2-2 src/main/java/bolts/WordNormalizer.java package bolts; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import java.util.ArrayList; import java.util.List; import java.util.Map; public class WordNormalizer implements IRichBolt { private OutputCollector collector; public void cleanup(){} /** * *bolt*从单词文件接收到文本行,并标准化它。 * 文本行会所有转化成小写,并切分它,从中获得全部单词。 */ public void execute(Tuple input){ String sentence = input.getString(0); String[] words = sentence.split(" "); for(String word : words){ word = word.trim(); if(!word.isEmpty()){ word=word.toLowerCase(); //发布这个单词 List a = new ArrayList(); a.add(input); collector.emit(a,new Values(word)); } } //对元组作出应答 collector.ack(input); } public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector=collector; } /** * 这个*bolt*只会发布“word”域 */ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } @Override public Map<String, Object> getComponentConfiguration() { return null; } }
建立了发送一个字段(“word”)的Bolt,此时tuple的键为“word”,值为execute方法中发送的Values对象。oop
本次介绍的storm0.6.0(含)及后续版本中如何使用序列器,storm在0.6.0以前使用不一样的序列器,这里就不介绍老版本的。
tuple能够由任意类型组合而成,由于storm是分布式的,因此它须要知道在task间如何序列化和反序列化数据的。
storm使用Kryo进行序列化,Kryo是java开发中一个快速灵活序列器。默认状况下,storm能够序列化基础类型,好比字符串,字节,数组,ArrayList, HashMap, HashSet和 Clojure 集合类型,若是须要使用其余类型,须要自定义序列器。
在元组中没有对应类型的字段。在字段中放入对象和storm动态序列化数据,获得序列化接口前,咱们了解一下为何storm元组是动态类型。
增长静态类型会大大增长storm API的复杂性, Hadoop 中,使用静态类型的键和值,在使用是须要大量的注释,对于hadoop API使用以及类型安全是一个不值得的负担,动态类型使用起来会很简单。
此外,storm 元组没有一个合理的方式使用静态类型,假如一个bolt订阅了多个流,那些流中元组会有不一样类型传输在字段中。当一个bolt在execute方法接收元组,能够接收任何流的元组,就会有不少类型的元组。这样在一个bolt中,就须要为每一个类型的tuple生命不一样的方法订阅,显然,storm选择了简单方式,使用动态类型。
最后,另外使用动态类型的理由是storm能够直接被 Clojure 和 JRuby 这类动态类型的语言使用。
综上所述,storm 使用Kryo 做为序列器。为了实现自定义序列器,就须要用Kryo注册一个新的序列器,
在Kryo的Github主页: https://github.com/EsotericSoftware/kryo,有更加详细的介绍,这里仅作一下简单介绍。
增长自定义序列器,须要在拓扑配置中添加“ topology.kryo.register ”属性,它能够配置一组序列器列表,每一个序列器能够选择一下两种方式之一:
例子以下:
topology.kryo.register: - com.mycompany.CustomType1 - com.mycompany.CustomType2: com.mycompany.serializer.CustomType2Serializer - com.mycompany.CustomType3
“com.mycompany.CustomType1“ 和“com.mycompany.CustomType3“ 使用“FieldsSerializer”序列化,反之,“com.mycompany.CustomType2“会使用”com.mycompany.serializer.CustomType2Serializer“ 序列化。
storm提供了在拓扑配置中注册序列器的助手,Config类调用registerSerialization方法能够将一个序列器放入配置中。其中有一个高级设置“Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS”,若是把它设置为true,storm将会忽略在类路径无有效代码的序列器,不然,storm找不到序列器,将会排除异常。当在集群中运行了不少使用了不一样序列器的拓扑,想经过“storm.yaml”文件为全部拓扑配置好序列器,它就很是有用的。
当storm遇到一个没有序列器注册的类型,它可能会使用java序列器,若是此类型也没法被java序列器序列化,storm就会报出一个错误。
须要注意的是,不管在CPU消耗方面仍是序列化对象的大小, java序列器都是很是耗费资源的。故在生产上使用拓扑的话,强烈建议使用自定义序列器。java序列器在那里,是为了容易设计新的原型。
经过设置“Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION”项为false,能够关闭java序列器。
Storm 0.7.0 能够设置特殊组件配置,固然,若是一个组件定义一个序列化,这个序列器须要对其余bolt可用,不然其余bolt将没法接收那个组件的消息。
一个拓扑被提交,一组序列器会被拓扑选择为全部组件发送消息使用,这是经过特殊组件序列化注册信息与普通组件序列化注册信息合并实现。当为同一个类注册了多个序列化时,序列器会任意选择一个。
若是两个特定组件序列器有冲突,则会强制一个特定的类作序列器,只需在拓扑特定配置定义想要的序列器,拓扑特定配置优先于序列器注册的特定组件。