摘要java
咱们在使用spark的一个流程是:利用spark.sql()函数把数据读入到内存造成DataSet[Row](DataFrame)因为Row是新的spark数据集中没法实现自动的编码,须要对这个数据集进行编码,才能利用这些算子进行相关的操做,如何编码是一个问题,在这里就把这几个问题进行总结一下。报的错误:error: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.sql
报这个错误通常就是咱们在使用算子时其返回值的数据类型每每不是spark经过自身的反射能完成的自动编码部分,好比经过map算子,咱们在map算子的函数的返回值类型是Map类型的,就会出现上面的问题,由于Map集合类不在:基本的类型和String,case class和元组的范围以内,spark内部不能经过反射完成自动编码。apache
出现这个问题的缘由api
spark2.0之后的版本采用的是新的分布式数据集DataSet,其中DataFrame是DataSet[Row]的别名形式。而新的数据集采用了不少的优化,其中一个就是利用了Tungsten execution engine的计算引擎,这个计算引擎采用了不少的优化。其中一个就是本身维护了一个内存管理器,从而使计算从java jvm解脱出来了,使得内存的优化获得了很大的提高。同时新的计算引擎,把数据存储在内存中是以二进制的形式存储的,大部分全部的计算都是在二进制数据流上进行的,不须要把二进制数据流反序列化成java对象,而后再把计算的结果序列化成二进制数据流,而是直接在二进制流上进行操做,这样的状况就须要咱们存在一种机制就是java对象到二进制数据流的映射关系,否则咱们不知道二进制流对应的数据对象是几个字节,spark这个过程是经过Encoders来完成的,spark自身经过反射完成了一部分的自动编码过程:基本的类型和String,case class和元组,对于其余的集合类型或者咱们自定义的类,他是没法完成这样的编码的。须要咱们本身定义这样的编码也就是让其拥有一个schema。jvm
解决这个问题方式分布式
方法一:函数
这样就是把其转化为RDD,利用RDD进行操做,可是不建议用这个,相对于RDD,DataSet进行了不少的底层优化,拥有很不错性能性能
val orderInfo1 = spark.sql( """ |SELECT |o.id, |o.user_id |FROM default.api_order o |limit 100 """.stripMargin).rdd.map(myfunction)
方法二:优化
让其自动把DataSet[Row]转化为DataSet[P],若是Row里面有复杂的类型出现的话。编码
case class Orders(id: String, user_id: String) //这个case class要定义在咱们的单例对象的外面 object a { def main(args: Array[String]): Unit ={ import spark.implicits._ val orderInfo1 = spark.sql( """ |SELECT |o.id, |o.user_id |FROM default.api_order o |limit 100 """.stripMargin).as[Orders].map(myfunction) } }
方式三:
自定义一个schema,而后利用RowEncoder进行编码。这只是一个例子,里面的类型其实均可以经过spark的反射自动完成编码过程。
import spark.implicits._ val schema = StructType(StructType(Seq(StructField("id",StringType,true),StructField("user_id",StringType,true)))) val encoders = RowEncoder(schema) val orderInfo1 = spark.sql( """ |SELECT |o.id, |o.user_id |FROM default.api_order o |limit 100 """.stripMargin).map(row => row)(encoders)
方法四:
直接利用scala的模式匹配的策略case Row来进行是能够经过的,缘由是case Row()scala模式匹配的知识,这样能够知道集合Row里面拥有多少个基本的类型,则能够经过scala就能够完成对Row的自动编码,而后能够进行相应的处理。
import spark.implicits._ val orderInfo1 = spark.sql( """ |SELECT |o.id, |o.user_id |FROM default.api_order o |limit 100 """.stripMargin).map{case Row(id: String, user_id: String) => (id,user_id)} 这个获得的schema为: orderInfo1: org.apache.spark.sql.Dataset[(String, String)] = [_1: string, _2: string] 若是换成这样: val orderInfo1 = spark.sql( """ |SELECT |o.id, |o.user_id |FROM default.api_order o |limit 100 """.stripMargin).map{case Row(id: String, user_id: String) => List(id,user_id)} 获得的schema为: orderInfo1: org.apache.spark.sql.Dataset[List[String]] = [value: array<string>] 能够看出:spark是把元祖当作case class一种特殊形式拥有,schame的字段名称为_1,_2这样的特殊case clase