Storm 中的 tuple能够包含任何类型的对象。因为Storm 是一个分布式系统,因此在不一样的任务之间传递消息时Storm必须知道怎样序列化、反序列化消息对象。html
Storm 使用 Kryo库对对象进行序列化。Kryo 是一个灵活、快速的序列化库。Storm 默认支持基础类型、string、byte arrays、ArrayList、HashMap、HashSet 以及 Clojure 的集合类型的序列化。若是须要在tuple中使用其余的对象类型,就须要注册一个自定义的序列化器。java
原文和做者一块儿讨论:http://www.cnblogs.com/intsmaze/p/7044042.html程序员
微信:intsmaze微信
避免微信回复重复咨询问题,技术咨询请博客留言。网络
TORM使用Kryo来序列化。要实现自定义序列化器,咱们须要使用Kryo注册新的序列化器。添加自定义序列化器是经过拓扑配置的topology.kryo.register属性完成的。它须要一个注册的列表,每一个注册项能够采起两种形式:架构
1:类名注册,在这种状况下,Storm将使用Kryo的FieldsSerializer来序列化该类。这多是也可能不是该类最好的选择,更多的细节能够查看Kryo文档。分布式
2:实现了com.esotericsoftware.kryo.Serializer接口的类名注册的映射。ide
Storm为拓扑配置里的注册序列化提供了帮助。Config类中有一个名为registerSerialization的方法,能够把注册添加到配置中。性能
public void registerSerialization(Class klass); public void registerSerialization(Class klass, Class<? extends Serializer> serializerClass);
一个拓扑中不一样的任务传递消息时Storm发现了一个没有注册序列化器的类型,它会使用 Java 序列化器来代替,若是这个对象没法被Java序列化器序列化,Storm 就会抛出异常。测试
注意,Java 自身的序列化机制很是耗费资源,并且无论在 CPU 的性能上仍是在序列化对象的大小上都没有优点。强烈建议读者在生产环境中运行topology 的时候注册一个自定义的序列化器。
能够经过将 Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION 配置为 false 的方式来将禁止序列化器回退到Java的序列化机制。
Config.setFallBackOnJavaSerialization(conf,false);这个时候若是storm使用java序列化就会抛出异常告诉开发人员去注册一个kryo序列化。
建立传输的对象。
package cn.intsmaze.serializable.bean; public class Person { private int age; private Studnet studnet; private ArrayList arrayList=new ArrayList(); private LinkedList linkedList=new LinkedList(); public Person() { } public Person(int age,Studnet s) { this.age = age; this.studnet=s; arrayList.add("ArrayList中的"+s.getName()); linkedList.add("linkedList中的"+s.getName()); } @Override public String toString() { return "Person [age=" + age + ", studnet=" + studnet + ", arrayList=" + arrayList + ", linkedList=" + linkedList + "]"; } get(),set()...... } package cn.intsmaze.serializable.bean; public class Studnet { private String name; public Studnet() { } public Studnet(String name) { this.name = name; } @Override public String toString() { return "Studnet [name=" + name + "]"; } get(),set()...... }
spout和bolt的实现,spout每次会建立一个person对象将该对象发送到bolt,bolt类接收到该对象将该对象打印出来。
package cn.intsmaze.serializable; import cn.intsmaze.serializable.bean.Person; import cn.intsmaze.serializable.bean.Studnet; public class SpoutBean extends BaseRichSpout { SpoutOutputCollector collector; public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; } public void nextTuple() { Studnet s=new Studnet("xiaoxi"); collector.emit(new Values(new Person(100,s))); Utils.sleep(500); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("person")); } } package cn.intsmaze.serializable; import cn.intsmaze.serializable.bean.Person; public class BoltBean extends BaseBasicBolt { public void prepare(Map stormConf, TopologyContext context) { super.prepare(stormConf, context); } public void execute(Tuple input, BasicOutputCollector collector) { Person person = (Person)input.getValueByField("person"); System.out.println("接收到spout节点传来的数据:"+person); } public void declareOutputFields(OutputFieldsDeclarer declarer) { } }
使用public void registerSerialization(Class klass);
package cn.intsmaze.serializable; import java.util.LinkedList; import cn.intsmaze.serializable.bean.Person; import cn.intsmaze.serializable.bean.Studnet; public class TopologyBean { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new SpoutBean(), 1); builder.setBolt("bolt", new BoltBean(), 1).shuffleGrouping("spout"); Config conf = new Config(); conf.registerSerialization(Person.class); conf.registerSerialization(Studnet.class); //注释掉后,但Studnet没实现java序列化,则会报错。有两种方法,一种注册该类,一种实现java序列化。 conf.registerSerialization(LinkedList.class); //这里若是注释掉,则会使用java序列化方式,若是咱们取消掉禁止使用java序列化方法,则会提示注册LinkedList类报错。 conf.setNumWorkers(2); // Config.setFallBackOnJavaSerialization(conf, false);//禁止使用java语言本身的序列化 StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } }
第11行,咱们注册person类使用Kryo序列化,person对象除了有基本类型int字段外,还有arraylist,linkedlist类型以及自定义的student类型。arraylist类型是storm默认已经提供了支持。
这里若是咱们不对linkedlist类型和自定义类型student进行注册则该拓扑在运行时则会报没法序列化student类型异常。
这个时候有两种办法解决:
一种就是使student实现java的public class Studnet implements Serializable接口,则该拓扑会成功运行。由于storm若是发现传输的对象若是没有注册为Kryo,则就会使用java的序列化对象,而linkedlist默认已经实现了该接口,因此才会出现前面报student对象没法序列化,而后使得student实现java的序列化接口便可。
第二种方案就是,咱们对student类进行注册conf.registerSerialization(Studnet.class);。
虽然linkedlist不注册,会默认使用java的序列化,可是出于效率的考虑,咱们将其注册为Kryo。
提示:由于有些集合类型,storm没有提供序列化支持,可是实现了java序列化接口,因此若是咱们不加以控制,会使用java序列化而拖累整个系统。因此推荐使用
Config.setFallBackOnJavaSerialization(conf, false);禁止使用java语言本身的序列化来能够在本地模式时及时发现报错信息,将问题尽早解决。
咱们使用kryo序列化,可是有时候咱们并不但愿传输对象的全部字段,而只是传输对象的某些字段,从而进一步提升消息的传递速率,这个时候咱们可使用kryo的自定义序列化机制来指定传输的值。
package cn.intsmaze.serializable.bean; import java.util.ArrayList; import java.util.LinkedList; import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; public class PersonSerializable extends Serializer<Person>{ @Override public Person read(Kryo kryo, Input input, Class<Person> arg2) { System.out.println("序列化"); Person person=new Person(); person.setAge(input.readInt()); person.setArrayList(kryo.readObject(input, ArrayList.class)); person.setLinkedList(kryo.readObject(input, LinkedList.class));//该类型,storm默认不支持,因此要在topology中注册该类型,若是不注册,则会使用java序列化。 person.setStudnet(kryo.readObject(input, Studnet.class));//该类型,storm默认不支持,因此要在topology中注册该类型,若是不注册,且java序列化没有实现,则会报错。 return person; } @Override public void write(Kryo kryo, Output output, Person person) { System.out.println("反序列化"); output.writeInt(person.getAge()); kryo.writeObject(output, person.getArrayList()); kryo.writeObject(output, person.getLinkedList()); kryo.writeObject(output, person.getStudnet()); } }
package cn.intsmaze.serializable; import java.util.LinkedList; import cn.intsmaze.serializable.bean.Person; import cn.intsmaze.serializable.bean.PersonSerializable; import cn.intsmaze.serializable.bean.Studnet; public class TopologyBean { public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new SpoutBean(), 1); builder.setBolt("bolt", new BoltBean(), 1).shuffleGrouping("spout"); Config conf = new Config(); conf.registerSerialization(Studnet.class); conf.registerSerialization(LinkedList.class); conf.registerSerialization(Person.class, PersonSerializable.class); conf.setNumWorkers(2); StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); } }
由于PersonSerializable类中指定了要传输person对象的int,studne,ArrayList,LinkedList 类型。
若是咱们注释掉第12行 conf.registerSerialization(Studnet.class);且Studnet类没有实现java的序列化,则拓扑的任务间传递消息进行序列化时就会报没法序列化该类的错误,感兴趣的同窗能够试试注释掉该行,看看storm会报什么异常。
第13行,咱们必须注册对LinkedList序列化,storm默认支持了对ArrayList类的序列化,但没有提供对LinkedList序列化,须要咱们手动注册,若是不注册,由于LinkedList实现了java的序列化接口,因此会使用java序列化,则不会报错。
强烈建议,在开发中就算注册了kyro序列化方式,也要设置该conf.setFallBackOnJavaSerialization(false)方法来禁止使用java序列化方式,由于实际开发中,核心架构搭建好了,会让团队成员直接在现成架构上编写,他们不须要了解storm的一些机制,可是这也带来问题,一种场景就是,开发人员对传输对象增长了一个LinkedList字段,可是他没有注册序列化类,storm就会对LinkedList使用java序列化,就会拖累系统的性能,因此在架构的时候,经过设置禁止java序列化方法,就能够在测试中及时发现问题所在。
补充:上面的全部一切,在本地运行以及部署到集群时,work数量设置为1时,都不会生效的。由于同一个对象公有一个内存,不会涉及网络传输的,也就不须要序列化和反序列化。
本人intsmaze生产上碰见的问题:storm工程中对传输对象使用了conf.registerSerialization(Person.class, PersonSerializable.class);方式来指定序列化该对象的某些字段。初级程序员在storm工程上开发时,由于业务须要对传输对象增长了一个字段,可是没有在PersonSerializable中序列化和反序列化该对象。恰巧的时,初级工程师本地模式和准生产测试时,topology的work的数量都为1,致使对象在bolt和bolt节点传输时并无走序列化方式,结果测试一切正常,可是上生产后,由于work数量是10个,立马在后一个bolt中报大量的空指针异常,形成很严重的生产问题。