IDEA建立SparkSQL程序sql
做者:尹正杰apache
版权声明:原创做品,谢绝转载!不然将追究法律责任。json
一.建立DataFrameide
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.1</version> </dependency>
package com.yinzhengjie.bigdata.spark.sql import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.SparkConf object SparkSQLDemo { def main(args: Array[String]): Unit = { //建立spark配置信息 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo") //建立SparkSQL的环境对象,即建立SparkSession val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() //读取json文件,构建DataFrame对象 val frame:DataFrame = spark.read.json("E:\\yinzhengjie\\bigdata\\input\\json\\user.json") //展现数据 frame.show() //释放资源 spark.close() } }
二.采用SQL的语法访问数据ui
package com.yinzhengjie.bigdata.spark.sql import org.apache.spark.SparkConf import org.apache.spark.sql.{DataFrame, SparkSession} object SparkSQLDemo2 { def main(args: Array[String]): Unit = { //建立spark配置信息 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo2") //建立SparkSQL的环境对象,即建立SparkSession val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() //读取json文件,构建DataFrame对象 val frame:DataFrame = spark.read.json("E:\\yinzhengjie\\bigdata\\input\\json\\user.json") //建立一张临时视图 frame.createTempView("user") //展现数据 // frame.show() spark.sql("select * from user").show() //采用SQL的语法访问数据 //释放资源 spark.close() } }
三.RDD,DataFrame和DataSet相互转换案例spa
package com.yinzhengjie.bigdata.spark.sql import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} /** * 定义样例类 */ case class User(id:Int,name:String,age:Int) object SparkSQLDemo3 { def main(args: Array[String]): Unit = { //建立spark配置信息 val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo3") //建立SparkSQL的环境对象,即建立SparkSession val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate() /** * 舒适提示: * 进行转换以前,须要引入隐式转换规则,这里的spark不是包名的含义,而是SparkSession对象的名字哟~ */ import spark.implicits._ //建立RDD val listRDD:RDD[(Int,String,Int)] = spark.sparkContext.parallelize(List((1,"YinZhengjie",18),(2,"Jason Yin",28),(3,"Danny",27))) //转换为DataFrame val df:DataFrame = listRDD.toDF("Id","Name","Age") //将DataFrame转换为DataSet val ds:Dataset[User] = df.as[User] //将DataSet转换为DataFrame val df1:DataFrame = ds.toDF() //将DataFrame转换为RDD val rdd1:RDD[Row] = df1.rdd //遍历RDD,获取数据时,能够经过索引访问数据 rdd1.foreach(row =>{ println(row.getString(1)) }) //为listRDD手动添加类型 val userRDD:RDD[User] = listRDD.map { case (id, name, age) => { User(id, name, age) } } //将RDD直接转换为DataSet val ds2:Dataset[User] = userRDD.toDS() //将DataSet直接转换为RDD val rdd2:RDD[User] = ds2.rdd //遍历rdd2 rdd2.foreach(println) //释放资源 spark.close() } }