Spark PySpark数据类型的转换原理—Writable Converter

Spark目前支持三种开发语言:Scala、Java、Python,目前咱们大量使用Python来开发Spark App(Spark 1.2开始支持使用Python开发Spark Streaming App,咱们也准备尝试使用Python开发Spark Streaming App),在这期间关于数据类型的问题曾经困扰咱们很长时间,故在此记录一下心路历程。
 
Spark是使用Scala语言开发的,Hadoop是使用Java语言开发的,Spark兼容Hadoop Writable,而咱们使用Python语言开发Spark (Streaming) App,Spark Programming Guides(Spark 1.5.1)其中有一段文字说明了它们相互之间数据类型转换的关系:
 
 
也说是说,咱们须要处理两个方向的转换:
 
(1)Writable => Java Type => Python Type;
(2)Python Type => Java Type => Writable;
 
其中Java Type与Python Type之间数据类型的转换依赖开源组件Pyrolite,相应的数据类型转换以下:
 
(1)Python Type => Java Type;
 
 
(2)Java Type => Python Type;
 
 
也就是说,Pyrolite已经为Java Type与Python Type之间的数据类型转换创建了“标准”,咱们仅仅须要处理Writable与Java Type之间的数据转换就能够了。
 
从上图“Writable Support”中能够看出PySpark已经为咱们解决了经常使用的数据类型转换问题,但能够理解为“基本”数据类型,遇到复杂的状况,仍是须要咱们特殊处理,PySpark已经为咱们考虑到了这种业务场景,为咱们提供接口Converter(org.apache.spark.api.python.Converter),使得咱们能够根据本身的须要扩展数据类型转换机制:
 
 
接口Converter仅仅只有一个方法convert,其中T表示源数据类型,U表示目标数据类型,参数obj表示源数据值,返回值表示目标数据值。
 
Spark Programming Guides(Spark 1.5.1)也为咱们举例说明了一个须要自定义Converter的场景:
 
 
ArrayWritable是Hadoop Writable的一种,由于Array涉及到元素数据类型的问题,所以使用时须要实现相应的子类,如元素数据类型为整型:
 
 
从上面的描述可知,PySpark使用ArrayWritable时涉及到以下两个方向的数据类型转换:
 
(1)Tuple => Object[] => ArrayWritable;
(2)ArrayWritable => Object[] => Tuple;
 
咱们以IntArrayWritable为例说明如何自定义扩展Converter,同理也须要处理两个方向的数据类型转换:Tuple => Object[] => ArrayWritable、ArrayWritable => Object[] => Tuple。
 
(1)Tuple => Object[] => IntArrayWritable;
 
假设咱们有一个list,list的元素类型为tuple,而tuple的元素类型为int,咱们须要将这个list中的全部数据以SequenceFile的形式保存至HDFS。对于list中的每个元素tuple,Pyrolite能够帮助咱们完成Tuple => Object[]的转换,而Object[] => IntArrayWritable则须要咱们自定义Converter实现。
 
 
PySpark中使用这个Converter写入数据:
 
 
注意:SequenceFile的数据结构为<key, value>,为了简单起见,key指定为com.sina.dip.spark.converter.IntArrayWritable,value指定为org.apache.hadoop.io.NullWritable(即空值)。
 
运行上述程序时,由于有使用到咱们自定义的类,所以须要将com.sina.dip.spark.converter.IntArrayWritable、com.sina.dip.spark.converter.ObjectArrayToIntArrayWritableConverter编译打包为独立的Jar:converter.jar,并经过参数指定,以下:
 
/usr/lib/spark-1.5.1-bin-2.5.0-cdh5.3.2/bin/spark-submit --jars converter.jar 1.5.1/examples/app/spark_app_save_data_to_seqfile.py
 
(2)IntArrayWritable => Object[] => Tuple;
 
咱们须要将(1)中写入SequenceFile的Key(IntArrayWritable)还原为list,其中list的元素类型为tuple,tuple的元素类型为int,IntArrayWritable => Object[]也须要用到咱们自定义的Converter(Object[] => Tuple由Pyrolite负责):
 
 
PySpark使用这个Converter读取数据:
 
 
同(1),咱们须要将com.sina.dip.spark.converter.IntArrayWritable、com.sina.dip.spark.converter.IntArrayWritableToObjectArrayConverter编译打包为独立的Jar:converter.jar,并经过参数指定,以下:
 
/usr/lib/spark-1.5.1-bin-2.5.0-cdh5.3.2/bin/spark-submit --jars converter.jar 1.5.1/examples/app/spark_app_read_data_from_seqfile.py
 
输出结果:
 
 
能够看出,经过自定义扩展的Converter:com.sina.dip.spark.converter.ObjectArrayToIntArrayWritableConverter、com.sina.dip.spark.converter.IntArrayWritableToObjectArrayConverter,咱们实现了IntArrayWritable(com.sina.dip.spark.converter.IntArrayWritable)与Tuple(Python)之间的转换。
相关文章
相关标签/搜索