Building a SparkSQL Program in IDEA


                                     Author: Yin Zhengjie

Copyright notice: This is an original work. Reproduction without permission is prohibited and will be pursued as a legal matter.

 

 

 

1. Creating a DataFrame

Contents of pom.xml (add the following dependency):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

Example code:
package com.yinzhengjie.bigdata.spark.sql

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.SparkConf

object SparkSQLDemo {
  def main(args: Array[String]): Unit = {
    //Create the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo")

    //Create the SparkSQL environment object, i.e., the SparkSession
    val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    //Read the JSON file and build a DataFrame
    val frame:DataFrame = spark.read.json("E:\\yinzhengjie\\bigdata\\input\\json\\user.json")

    //Display the data
    frame.show()

    //Release resources
    spark.close()
  }
}
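For reference, spark.read.json expects one JSON object per line (the JSON Lines format) and infers the schema from the keys it finds. The snippet below is a minimal sketch, meant to go inside the main method above, for inspecting what was inferred; it reuses the frame variable from the example, and the "name" column is an assumption about the file's contents, not something confirmed by the article.

    //Minimal sketch (assumes the frame built above and that user.json contains a "name" field)
    frame.printSchema()            //Print the column names and types inferred from the JSON keys
    frame.select("name").show()    //Untyped, DataFrame-style column access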

 

2. Accessing Data with SQL Syntax

package com.yinzhengjie.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSQLDemo2 {
  def main(args: Array[String]): Unit = {
    //Create the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo2")

    //Create the SparkSQL environment object, i.e., the SparkSession
    val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    //Read the JSON file and build a DataFrame
    val frame:DataFrame = spark.read.json("E:\\yinzhengjie\\bigdata\\input\\json\\user.json")

    //Create a temporary view
    frame.createTempView("user")

    //Display the data
//    frame.show()
    spark.sql("select * from user").show()  //Access the data using SQL syntax

    //Release resources
    spark.close()
  }
}
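Note that a view registered with createTempView is scoped to the SparkSession that created it. If a view needs to be visible across sessions of the same application, Spark also provides global temporary views, which live in the reserved global_temp database. A minimal sketch, assuming it is added to the main method above after the DataFrame is built (the view name user_global is made up for illustration):

    //Register a global temporary view (visible to every SparkSession in this application)
    frame.createGlobalTempView("user_global")
    spark.sql("select * from global_temp.user_global").show()

    //Even a brand-new session can query it, unlike a session-scoped temporary view
    spark.newSession().sql("select * from global_temp.user_global").show()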

 

3. Converting Between RDD, DataFrame, and DataSet

package com.yinzhengjie.bigdata.spark.sql

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
  *   Define a case class
  */
case class User(id:Int,name:String,age:Int)

object SparkSQLDemo3 {
  def main(args: Array[String]): Unit = {
    //Create the Spark configuration
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQLDemo3")

    //Create the SparkSQL environment object, i.e., the SparkSession
    val spark:SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    /**
      *   Note:
      *     Before the conversions below, the implicit conversion rules must be imported. The "spark"
      *     here is not a package name but the name of the SparkSession object defined above.
      */
    import spark.implicits._

    //Create an RDD
    val listRDD:RDD[(Int,String,Int)] = spark.sparkContext.parallelize(List((1,"YinZhengjie",18),(2,"Jason Yin",28),(3,"Danny",27)))

    //Convert the RDD to a DataFrame
    val df:DataFrame = listRDD.toDF("Id","Name","Age")

    //Convert the DataFrame to a DataSet
    val ds:Dataset[User] = df.as[User]

    //Convert the DataSet back to a DataFrame
    val df1:DataFrame = ds.toDF()

    //Convert the DataFrame to an RDD
    val rdd1:RDD[Row] = df1.rdd


    //Iterate over the RDD; fields of each Row can be accessed by index (index 1 is the Name column here)
    rdd1.foreach(row =>{
      println(row.getString(1))
    })

    //Manually add a type to listRDD by mapping each tuple to the User case class
    val userRDD:RDD[User] = listRDD.map {
      case (id, name, age) => {
        User(id, name, age)
      }
    }
    //Convert the RDD directly to a DataSet
    val ds2:Dataset[User] = userRDD.toDS()

    //Convert the DataSet directly back to an RDD
    val rdd2:RDD[User] = ds2.rdd

    //Iterate over rdd2
    rdd2.foreach(println)

    //Release resources
    spark.close()
  }
}
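Once the data is available as a Dataset[User], transformations can work directly against the case class fields instead of positional Row access, and the field names and types are checked at compile time. A minimal sketch, assuming it is added to the main method above after ds2 is created (the age threshold is arbitrary):

    //Typed operations on Dataset[User]: filter and map use the case class fields directly
    val adults: Dataset[String] = ds2.filter(user => user.age > 20).map(user => user.name)
    adults.show()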