在Spark中使用Kryo序列化

spark序列化
 对于优化<网络性能>极为重要,将RDD以序列化格式来保存减小内存占用. spark.serializer=org.apache.spark.serializer.JavaSerialization
Spark默认 使用Java自带的ObjectOutputStream 框架来序列化对象,这样任何实现了 java.io.Serializable 接口的对象,都能被序列化。同时,还能够经过扩展 java.io.Externalizable 来控制序列化性能。Java序列化很灵活但性能差速度很慢,同时序列化后占用的字节数也较多。
spark.serializer=org.apache.spark.serializer.KryoSerialization
KryoSerialization速度快,能够配置为任何org.apache.spark.serializer的子类。但Kryo也不支持全部实现了 java.io.Serializable 接口的类型,它须要你在程序中 register 须要序列化的类型,以获得最佳性能。
LZO的支持要求先安装 Hadoop-lzo包(每一个节点), 并放到 Spark本地库中。若是是Debian包安装,在调用spark-submit时加上 --driver-library-path /usr/lib/hadoop/lib/native/ --driver-class-path /usr/lib/hadoop/lib/ 就能够。 下载lzo http://cn.jarfire.org/hadoop.lzo.html
在 SparkConf 初始化的时候调用 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”) 使用 Kryo。这个设置不只控制各个worker节点之间的混洗数据序列化格式,同时还控制RDD存到磁盘上的序列化格式。须要在使用时注册须要序列化的类型,建议在对网络敏感的应用场景下使用Kryo。 若是你的自定义类型须要使用Kryo序列化,能够用 registerKryoClasses 方法先注册:
val conf = new SparkConf.setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)
最后,若是你不注册须要序列化的自定义类型,Kryo也能工做,不过每个对象实例的序列化结果都会包含一份完整的类名,这有点浪费空间。
在Scala中使用New API (Twitter Elephant Bird 包) lzo JsonInputFormat读取 LZO 算法压缩的 JSON 文件:
val input = sc.newAPIHadoopFile(inputFile, classOf[lzoJsonInputFormat], classOf[LongWritable], classOf[MapWritable], conf)
inputFile: 输入路径
接收第一个类:“格式”类,输入格式
接收第二个类:“键”
接收第二个类:“值”
conf:设置一些额外的压缩选项
在Scala中使用老API直接读取 KeyValueTextInputFormat()最简单的Hadoop输入格式 :
val input = sc.HadoopFile[Text, Text, KeyValueTextInputFormat](inputFile).map{ case (x, y) => (x.toString, y.toString) }
html

注:若是读取单个压缩过的输入,作好不要考虑使用Spark的封装(textFile/SequenceFile..),而是使用 newAPIHadoopFile 或者 HadoopFile,并指定正确的压缩解码器。 有些输入格式(如SequenceFile)容许咱们只压缩键值对数据中的值,这在查询时颇有用。其它一些输入格式也有本身的压缩控制,如:Twitter Elephant Bird 包中的许多格式均可以使用LZO算法压缩数据。java

相关文章
相关标签/搜索