今天主要介绍一下如何将 Spark dataframe 的数据转成 json 数据。用到的是 scala 提供的 json 处理的 api。sql
用过 Spark SQL 应该知道,Spark dataframe 自己有提供一个 api 能够供咱们将数据转成一个 JsonArray,咱们能够在 spark-shell 里头举个栗子来看一下。shell
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().master("master").appName("test").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate(); //提供隐式转换功能,好比将 Rdd 转为 dataframe import spark.implicits._ val df:DataFrame = sc.parallelize(Array(("abc",2),("efg",4))).toDF() df.show() /*-------------show ----------- +---+---+ | _1| _2| +---+---+ |abc| 2| |efg| 4| +---+---+ */ //这里使用 dataframe Api 转换成 jsonArray val jsonStr:String = a.toJSON.collectAsList.toString /*--------------- json String------------- [{"_1":"abc","_2":2}, {"_1":"efg","_2":4}] */
能够发现,咱们能够使用 dataframe 提供的 api 直接将 dataframe 转换成 jsonArray 的形式,但这样子却有些冗余。以上面的例子来讲,不少时候我要的不是这样的形式。数据库
[{"_1":"abc","_2":2}, {"_1":"efg","_2":4}]
而是下面这种形式。apache
[{"abc":2}, {"efg":4}]
这才是咱们一般会使用到的 json 格式。以 dataframe 的 api 转换而成的 json 明显太过冗余。为此,咱们须要借助一些 json 处理的包,本着能懒则懒的原则,直接使用 scala 提供的 json 处理包。编程
import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().master("master").appName("test").config("spark.sql.warehouse.dir", warehouseLocation).enableHiveSupport().getOrCreate(); //提供隐式转换功能,好比将 Rdd 转为 dataframe import spark.implicits._ val df:DataFrame = sc.parallelize(Array(("abc",2),("efg",4))).toDF() df.show() /*-------------show ----------- +---+---+ | _1| _2| +---+---+ |abc| 2| |efg| 4| +---+---+ */ //接下来不同了 val df2Array:Array[Tuple2[String,Int]] = df.collect().map{case org.apache.spark.sql.Row(x:String,y:Int) => (x,y)} val jsonData:Array[JSONObject] = aM.map{ i => new JSONObject(Map(i._1 -> i._2)) } val jsonArray:JSONArray = new JSONArray(jsonData.toList) /*-----------jsonArray------------ [{"abc" : 2}, {"efg" : 4}] */
大概说明一下上述的代码,首先咱们要先将 df 变量进行 collect 操做,将它转换成 Array ,可是要生成 jsonObject 得是 Array[Tuple2[T,T]] 的格式,因此咱们须要再进一步转换成对应格式。这里的 map 是函数式编程里面的 map 。json
而后也是用 map 操做生成 Array[JSONObject],最后再转换成 JSONArray 就能够。api
将数据转换成 json 的格式一般不能太大,通常用在 spark 跑出数据结果后写入到其余数据库的时候会用到,好比 Mysql 。app
以上~~函数式编程
欢迎关注公众号哈尔的数据城堡,里面有数据,代码,以及深度的思考。函数