The test code is as follows:
```scala
package cn.xpleaf.bigdata.spark.scala.sql.p1

import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

/**
  * Hands-on examples of loading and saving data with Spark SQL
  */
object _03SparkSQLLoadAndSaveOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf().setMaster("local[2]").setAppName(_01SparkSQLOps.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
//        readOps(sqlContext)
        writeOps(sqlContext)
        sc.stop()
    }

    /**
      * Watch out for this exception when writing results to a directory:
      *     org.apache.spark.sql.AnalysisException: path file:/D:/data/spark/sql/people-1.json already exists
      * If you still want to reuse that directory, set an explicit SaveMode:
      *     ErrorIfExists: the default; throws an exception if the output already exists
      *     Append:        appends to the existing output
      *     Ignore:        does nothing, as if the write were never executed
      *     Overwrite:     overwrites the existing output
      */
    def writeOps(sqlContext: SQLContext): Unit = {
        val df = sqlContext.read.json("D:/data/spark/sql/people.json")
        df.registerTempTable("people")
        val retDF = sqlContext.sql("select * from people where age > 20")
//        retDF.show()
        // Save the result to the file system
//        retDF.coalesce(1).write.mode(SaveMode.Overwrite).json("D:/data/spark/sql/people-1.json")
        // Save the result to a database
        val url = "jdbc:mysql://localhost:3306/test"
        val table = "people1"     // a new table will be created
        val properties = new Properties()
        properties.put("user", "root")
        properties.put("password", "root")
        retDF.coalesce(1).write.jdbc(url, table, properties)
    }

    /*
        Reading data with Spark SQL
        // java.lang.RuntimeException: file:/D:/data/spark/sql/people.json is not a Parquet file
        The default format for sqlContext.read.load is parquet (parquet.apache.org).
        To load other file formats, specify the format explicitly, e.g. .format("json").
     */
    def readOps(sqlContext: SQLContext): Unit = {
//        val df = sqlContext.read.load("D:/data/spark/sql/users.parquet")
//        val df = sqlContext.read.format("json").load("D:/data/spark/sql/people.json")
//        val df = sqlContext.read.json("D:/data/spark/sql/people.json")
        val url = "jdbc:mysql://localhost:3306/test"
        val table = "people"
        val properties = new Properties()
        properties.put("user", "root")
        properties.put("password", "root")
        val df = sqlContext.read.jdbc(url, table, properties)
        df.show()
    }
}
```
When the read operation is executed, the output is as follows:
```
+---+----+---+------+
| id|name|age|height|
+---+----+---+------+
|  1| 小甜甜| 18| 168.0|
|  2| 小丹丹| 19| 167.0|
|  3|  大神| 25| 181.0|
|  4|  团长| 38| 158.0|
|  5|  记者| 22| 169.0|
+---+----+---+------+
```
When the write operation is executed:
1. If saving to a JSON file: note the different save modes, and that the output path is a directory in an HDFS-compatible layout.
2. If saving via JDBC: a table with the DataFrame's columns is created in the database, and the table must not already exist.
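As a reference, here is a minimal sketch of setting the SaveMode explicitly for both kinds of output. The local paths, MySQL URL, credentials, and table name are the same assumptions as above; whether Append can reuse an existing JDBC table depends on the Spark version, so treat this as a sketch rather than guaranteed behavior.

```scala
import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

object SaveModeSketch {
    def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("SaveModeSketch"))
        val sqlContext = new SQLContext(sc)
        val df = sqlContext.read.json("D:/data/spark/sql/people.json")

        // JSON output: the path is a directory; Overwrite replaces it, Append adds new part files,
        // Ignore silently skips the write, ErrorIfExists (the default) throws if the path exists
        df.coalesce(1).write.mode(SaveMode.Overwrite).json("D:/data/spark/sql/people-out")

        // JDBC output: with Append the rows are inserted into the table
        // instead of failing when the table already exists
        val properties = new Properties()
        properties.put("user", "root")
        properties.put("password", "root")
        df.write.mode(SaveMode.Append).jdbc("jdbc:mysql://localhost:3306/test", "people1", properties)

        sc.stop()
    }
}
```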
Hive needs to be started before performing the operations below.
The test code is as follows:
```scala
package cn.xpleaf.bigdata.spark.scala.sql.p2

import cn.xpleaf.bigdata.spark.scala.sql.p1._01SparkSQLOps
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

/**
  * Use a HiveContext to operate on data in Hive tables.
  *
  * Data sources:
  *
  * teacher_info.txt
  *     name(String)    height(double)
  *     zhangsan,175
  *     lisi,180
  *     wangwu,175
  *     zhaoliu,195
  *     zhouqi,165
  *     weiba,185
  *
  *     create table teacher_info(
  *         name string,
  *         height double
  *     ) row format delimited
  *     fields terminated by ',';
  *
  * teacher_basic.txt
  *     name(String)    age(int)    married(boolean)    children(int)
  *     zhangsan,23,false,0
  *     lisi,24,false,0
  *     wangwu,25,false,0
  *     zhaoliu,26,true,1
  *     zhouqi,27,true,2
  *     weiba,28,true,3
  *
  *     create table teacher_basic(
  *         name string,
  *         age int,
  *         married boolean,
  *         children int
  *     ) row format delimited
  *     fields terminated by ',';
  *
  * Requirements:
  *     1. Create the corresponding tables in Hive through Spark SQL and load the data into them.
  *     2. Run a Spark SQL job that joins teacher_info and teacher_basic and stores the result in a table named teacher.
  *
  * When running Hive operations on the cluster, the following configuration is required:
  *     1. Copy hive-site.xml into spark/conf and the MySQL connector jar into spark/lib.
  *     2. Add the following line to $SPARK_HOME/conf/spark-env.sh:
  *        export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar
  */
object _01HiveContextOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
//            .setMaster("local[2]")
            .setAppName(_01SparkSQLOps.getClass.getSimpleName)
        val sc = new SparkContext(conf)
        val hiveContext = new HiveContext(sc)

        // Step 1: create the teacher_info and teacher_basic tables
        hiveContext.sql("CREATE TABLE teacher_info(" +
            "name string, " +
            "height double) " +
            "ROW FORMAT DELIMITED " +
            "FIELDS TERMINATED BY ','")

        hiveContext.sql("CREATE TABLE teacher_basic(" +
            "name string, " +
            "age int, " +
            "married boolean, " +
            "children int) " +
            "ROW FORMAT DELIMITED " +
            "FIELDS TERMINATED BY ','")

        // Load the data into the tables
        hiveContext.sql("LOAD DATA LOCAL INPATH '/home/uplooking/data/hive/sql/teacher_info.txt' INTO TABLE teacher_info")
        hiveContext.sql("LOAD DATA LOCAL INPATH '/home/uplooking/data/hive/sql/teacher_basic.txt' INTO TABLE teacher_basic")

        // Step 2: join the two tables
        val joinDF = hiveContext.sql("SELECT " +
            "b.name, " +
            "b.age, " +
            "if(b.married, '已婚', '未婚') as married, " +
            "b.children, " +
            "i.height " +
            "FROM teacher_info i " +
            "INNER JOIN teacher_basic b ON i.name = b.name")

        joinDF.collect().foreach(println)

        joinDF.write.saveAsTable("teacher")

        sc.stop()
    }
}
```
Package the job, upload it to the cluster, and then configure Spark for Hive access. When running Hive operations on the cluster, the following configuration is required:

1. Copy hive-site.xml into the spark/conf directory and the MySQL connector jar into the spark/lib directory.
2. Add the following line to $SPARK_HOME/conf/spark-env.sh:
   export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/mysql-connector-java-5.1.39.jar
The spark-submit script used to submit the job is as follows:
```shell
[uplooking@uplooking01 spark]$ cat spark-submit-standalone.sh
#export HADOOP_CONF_DIR=/home/uplooking/app/hadoop/etc/hadoop

/home/uplooking/app/spark/bin/spark-submit \
--class $2 \
--master spark://uplooking02:7077 \
--executor-memory 1G \
--num-executors 1 \
$1 \
```
Run the following command:
```shell
./spark-submit-standalone.sh spark-hive.jar cn.xpleaf.bigdata.spark.scala.sql.p2._01HiveContextOps
```
The expected output can be seen in the job's execution results, and it can also be verified directly in Hive:
```
hive> show tables;
OK
hpeople
people
t1
teacher
teacher_basic
teacher_info
Time taken: 0.03 seconds, Fetched: 6 row(s)
hive> select * from teacher;
OK
zhangsan    23    未婚    0    175.0
lisi        24    未婚    0    180.0
wangwu      25    未婚    0    175.0
zhaoliu     26    已婚    1    195.0
zhouqi      27    已婚    2    165.0
weiba       28    已婚    3    185.0
Time taken: 0.369 seconds, Fetched: 6 row(s)
```
Make sure that the Elasticsearch environment has been set up first.
The test code is as follows:
```scala
package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.spark.sql._
import org.elasticsearch.spark._

/**
  * Integration of Spark and Elasticsearch.
  * Add the Maven dependency for Spark and ES:
  *     elasticsearch-hadoop
  *     2.3.0
  * Load account.json into the ES index/type spark/account.
  * See the official documentation: https://www.elastic.co/guide/en/elasticsearch/hadoop/2.3/spark.html
  */
object _02SparkElasticSearchOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_02SparkElasticSearchOps.getClass().getSimpleName)
            .setMaster("local[2]")
        /**
          * Configuration for the Spark-ES integration
          */
        conf.set("es.index.auto.create", "true")
        conf.set("es.nodes", "uplooking01")
        conf.set("es.port", "9200")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
//        write2ES(sqlContext)
        readFromES(sc)
        sc.stop()
    }

    /**
      * Read data from ES
      * (using the SparkContext)
      */
    def readFromES(sc: SparkContext): Unit = {
        val resources = "spark/account"   // index/type
        val jsonRDD = sc.esJsonRDD(resources)
        jsonRDD.foreach(println)
    }

    /**
      * Write data to ES
      * (using the SQLContext)
      */
    def write2ES(sqlContext: SQLContext): Unit = {
        val jsonDF = sqlContext.read.json("D:/data/spark/sql/account.json")
        val resources = "spark/account"   // index/type
        jsonDF.saveToEs(resources)
    }
}
```
Spark SQL's built-in functions can be used to analyze data. Unlike the rest of the Spark SQL API, these built-in functions operate on DataFrame columns and return Column objects. Since a DataFrame is by definition "a distributed collection of data organized into named columns", this lays a solid foundation for complex analysis and is very convenient: built-in functions can be called at any point while operating on a DataFrame to implement the required processing, which removes a lot of unnecessary work (the code maps closely onto the actual model) and lets us focus on the analysis itself, a real productivity gain for engineers. Starting with Spark 1.5.x a large number of built-in functions are provided, including max, mean, min, sum, avg, explode, size, sort_array, day, to_date, abs, acos, asin, and atan.
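To illustrate calling the built-in functions on Column objects rather than through SQL strings, here is a minimal sketch; it assumes the same people.json used elsewhere in this post and reproduces the per-age aggregation shown later with the DataFrame API.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._

object BuiltinFunctionSketch {
    def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("BuiltinFunctionSketch"))
        val sqlContext = new SQLContext(sc)
        val df = sqlContext.read.json("D:/data/spark/sql/people.json")

        // Each built-in function returns a Column, so they compose directly on the DataFrame
        df.groupBy(col("age"))
          .agg(max("age").as("max_age"),
               min("age").as("min_age"),
               avg("age").as("avg_age"),
               count("age").as("count"))
          .orderBy(col("age").desc)
          .show()

        sc.stop()
    }
}
```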
Broadly speaking, the built-in functions cover the following basic categories:
1. Aggregate functions, e.g. countDistinct, sumDistinct;
2. Collection functions, e.g. sort_array, explode;
3. Date and time functions, e.g. hour, quarter, next_day;
4. Math functions, e.g. asin, atan, sqrt, tan, round;
5. Window functions, e.g. rowNumber;
6. String functions, e.g. concat, format_number, regexp_extract;
7. Other functions, e.g. isNaN, sha, randn, callUDF.

The following concepts come from Hive, but they clearly exist in Spark SQL as well:

UDF (User Defined Function): one input, one output.
    a ---> A, e.g. strlen("adbad") = 5

UDAF (User Defined Aggregation Function): multiple inputs, one output.
    sum(a, b, c, d) ----> a single aggregated result

UDTF (User Defined Table Function): multiple inputs, multiple outputs.
    "hello you", "hello me" ----> split(" ") ----> ["hello", "you"] ----> separate rows "hello" and "you" (a row/column transformation)
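Spark SQL also supports UDAFs directly (from Spark 1.5 onward) through UserDefinedAggregateFunction. The class below is a minimal, hypothetical sketch that re-implements an average over a double column; it is not code from the original post, and the demo table and data are made up.

```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

/** A hypothetical UDAF: average of a double column, kept as (sum, count) in the buffer. */
class MyAvg extends UserDefinedAggregateFunction {
    def inputSchema: StructType = StructType(StructField("value", DoubleType) :: Nil)
    def bufferSchema: StructType = StructType(StructField("sum", DoubleType) :: StructField("cnt", LongType) :: Nil)
    def dataType: DataType = DoubleType
    def deterministic: Boolean = true
    def initialize(buffer: MutableAggregationBuffer): Unit = { buffer(0) = 0.0; buffer(1) = 0L }
    def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
        if (!input.isNullAt(0)) {
            buffer(0) = buffer.getDouble(0) + input.getDouble(0)
            buffer(1) = buffer.getLong(1) + 1L
        }
    }
    def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
        buffer1(0) = buffer1.getDouble(0) + buffer2.getDouble(0)
        buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
    }
    def evaluate(buffer: Row): Double =
        if (buffer.getLong(1) == 0L) 0.0 else buffer.getDouble(0) / buffer.getLong(1)
}

object MyAvgUdafSketch {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("MyAvgUdafSketch"))
        val sqlContext = new SQLContext(sc)

        // Register the UDAF and use it like any built-in aggregate
        sqlContext.udf.register("myAvg", new MyAvg)

        // Hypothetical (name, height) data
        val rowRDD = sc.parallelize(List(("zhangsan", 175.0), ("lisi", 180.0), ("wangwu", 175.0)))
            .map { case (name, height) => Row(name, height) }
        val schema = StructType(List(
            StructField("name", StringType, false),
            StructField("height", DoubleType, false)
        ))
        sqlContext.createDataFrame(rowRDD, schema).registerTempTable("teacher_info_demo")

        sqlContext.sql("select myAvg(height) as avg_height from teacher_info_demo").show()

        sc.stop()
    }
}
```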
A basic example using the built-in aggregate functions is as follows:
```scala
package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

/**
  * Spark SQL built-in function operations
  */
object _03SparkSQLFunctionOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_03SparkSQLFunctionOps.getClass().getSimpleName)
            .setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        val pdf = sqlContext.read.json("D:/data/spark/sql/people.json")
        pdf.show()
        pdf.registerTempTable("people")

        // Count the number of people
        sqlContext.sql("select count(1) from people").show()
        // Max, min, average and count of ages, grouped by age
        sqlContext.sql("select age, " +
            "max(age) as max_age, " +
            "min(age) as min_age, " +
            "avg(age) as avg_age, " +
            "count(age) as count " +
            "from people group by age order by age desc").show()

        sc.stop()
    }
}
```
The output is as follows:
```
+---+------+-------+
|age|height|   name|
+---+------+-------+
| 10| 168.8|Michael|
| 30| 168.8|   Andy|
| 19| 169.8| Justin|
| 32| 188.8|   Jack|
| 10| 158.8|   John|
| 19| 179.8|   Domu|
| 13| 179.8|     袁帅|
| 30| 175.8|     殷杰|
| 19| 179.9|     孙瑞|
+---+------+-------+

18/05/09 17:53:23 INFO FileInputFormat: Total input paths to process : 1
+---+
|_c0|
+---+
|  9|
+---+

18/05/09 17:53:24 INFO FileInputFormat: Total input paths to process : 1
+---+-------+-------+-------+-----+
|age|max_age|min_age|avg_age|count|
+---+-------+-------+-------+-----+
| 32|     32|     32|   32.0|    1|
| 30|     30|     30|   30.0|    2|
| 19|     19|     19|   19.0|    3|
| 13|     13|     13|   13.0|    1|
| 10|     10|     10|   10.0|    2|
+---+-------+-------+-------+-----+
```
1. Starting with Spark 1.5.x, window functions are available in Spark SQL and on DataFrames. The classic example is row_number(), which lets us implement group-wise top-N logic.
2. As a case study, we take a top-N using Spark's window functions. You may remember that we computed a top-N much earlier on, and it was quite cumbersome at the time; with Spark SQL it is now very straightforward (a minimal sketch using row_number() is given at the end of this part).
The test code is as follows:
```scala
package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Spark SQL user-defined function (UDF) operations
  */
object _04SparkSQLFunctionOps {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_04SparkSQLFunctionOps.getClass().getSimpleName)
            .setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        /**
          * UDF operations as in Hive (Spark SQL is used here the same way Hive is, since both are interactive query engines):
          * 1. Write an ordinary function
          * 2. Register it (with the SQLContext)
          * 3. Use it directly in SQL
          *
          * Example: a UDF that returns the length of a string
          */

        // 1. Write an ordinary function
        def strLen(str: String): Int = str.length

        // 2. Register it (with the SQLContext)
        sqlContext.udf.register[Int, String]("myStrLen", strLen)

        val list = List("Hello you", "Hello he", "Hello me")

        // Convert the RDD to a DataFrame
        val rowRDD = sqlContext.sparkContext.parallelize(list).flatMap(_.split(" ")).map(word => {
            Row(word)
        })
        val scheme = StructType(List(
            StructField("word", DataTypes.StringType, false)
        ))
        val df = sqlContext.createDataFrame(rowRDD, scheme)

        df.registerTempTable("test")
        // 3. Use it directly in SQL
        sqlContext.sql("select word, myStrLen(word) from test").show()

        sc.stop()
    }
}
```
The output is as follows:
```
+-----+---+
| word|_c1|
+-----+---+
|Hello|  5|
|  you|  3|
|Hello|  5|
|   he|  2|
|Hello|  5|
|   me|  2|
+-----+---+
```
The test code is as follows:
```scala
package cn.xpleaf.bigdata.spark.scala.sql.p2

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}

/**
  * Two important pieces here:
  * 1. Word count implemented with Spark SQL
  * 2. Window function usage
  */
object _05SparkSQLFunctionOps2 {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val conf = new SparkConf()
            .setAppName(_05SparkSQLFunctionOps2.getClass().getSimpleName)
            .setMaster("local[2]")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)

        val list = List("Hello you", "Hello he", "Hello me")

        // Convert the RDD to a DataFrame
        val rowRDD = sqlContext.sparkContext.parallelize(list).map(line => {
            Row(line)
        })
        val scheme = StructType(List(
            StructField("line", DataTypes.StringType, false)
        ))
        val df = sqlContext.createDataFrame(rowRDD, scheme)

        df.registerTempTable("test")
        df.show()

        // Run the word count
        val sql = "select t.word, count(1) as count " +
                  "from " +
                  "(select " +
                  "explode(split(line, ' ')) as word " +
                  "from test) as t " +
                  "group by t.word order by count desc"
        sqlContext.sql(sql).show()

        sc.stop()
    }
}
```
The output is as follows:
```
+---------+
|     line|
+---------+
|Hello you|
| Hello he|
| Hello me|
+---------+

+-----+-----+
| word|count|
+-----+-----+
|Hello|    3|
|   me|    1|
|   he|    1|
|  you|    1|
+-----+-----+
```
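The word count above exercises explode; the row_number() group top-N case mentioned at the start of this part is not shown in the original code, so here is a minimal sketch with made-up (subject, score) data. In Spark 1.x, window functions in SQL require a HiveContext.

```scala
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

object RowNumberTopNSketch {
    def main(args: Array[String]): Unit = {
        Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
        val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("RowNumberTopNSketch"))
        val hiveContext = new HiveContext(sc)

        // Hypothetical (subject, score) records
        val rowRDD = sc.parallelize(List(
            ("math", 90), ("math", 85), ("math", 70), ("math", 95),
            ("english", 88), ("english", 92), ("english", 75)
        )).map { case (subject, score) => Row(subject, score) }

        val schema = StructType(List(
            StructField("subject", DataTypes.StringType, false),
            StructField("score", DataTypes.IntegerType, false)
        ))
        hiveContext.createDataFrame(rowRDD, schema).registerTempTable("scores")

        // Take the top 2 scores in each subject
        hiveContext.sql(
            "SELECT subject, score FROM (" +
            "  SELECT subject, score, " +
            "         row_number() OVER (PARTITION BY subject ORDER BY score DESC) AS rank " +
            "  FROM scores) t " +
            "WHERE t.rank <= 2").show()

        sc.stop()
    }
}
```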
Original post: http://blog.51cto.com/xpleaf/2114584