spark SQL是一个spark结构数据处理模型。不像基本的rdd api,Spark 提供的接口能够给spark提供更多更多关于数据的结构和正在执行的计算的信息。另外,spark sql在性能优化上比以往的有作改善。目前有更多的方式和spark sql交互:sql,dataset api。不管你是用哪一种api/语言,计算时最终使用相同的sql引擎。html
Spark Sql的一个做用是执行sql查询。spark sql可以用来从现有的hive中读取数据。更多关于如何配置使用的问题,请参考Hibe 表章节。当使用其余编程语言运行sql,结果也将返回Dataset/DataFrame。你能经过使用命令行或者jdbc/odbc和sql接口交互。java
一个dataset是分布式数据集合。数据集合是一个spark1.6开始提供的接口,想要把rdd的优点(强相似,强大的lambda函数功能)和spark sql的优化执行的优点结合到一块儿来。dataset可以从jvm对象中建立而后使用transformations(map,flagmap,filter等等)进行转换。mysql
全部spark功能的入口点是SparkSession类,建立一个基本的sparksession,只要使用一句语句就话哦啊sql
SparkSession.builder()
:shell
import org.apache.spark.sql.SparkSession val spark = SparkSession .builder() .appName("Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate() // For implicit conversions like converting RDDs to DataFrames import spark.implicits._
更完整的代码能够在spark安装包数据库
examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scalaapache
路径下查看。编程
在spark2.0中SparkSession提供了内部对Hive功能的支持,包括查询HiveSql,访问Hive UDFs,以及从Hive表读数据的能力。要使用这些特征,你不须要单独安装一个hive。json
经过SparkSession,应用可以由现有的rdd,或者hive表,或者spark数据源建立DataFrames。举个栗子,经过json文件建立api
val df = spark.read.json("examples/src/main/resources/people.json") // Displays the content of the DataFrame to stdout df.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
非强类型/泛型 数据集操做(又叫作DataFrame操做)
DataFrames提供一个专用的语言来操做结构化数据。
如上所述,在Spark2.0中,DataFrames只是包含Rows的数据集。相对“强类型转换”这些被称做弱类型转换。【烂英文有时候翻译起来本身都不知道官方文档想表达啥。】
仍是下面的代码好理解
// This import is needed to use the $-notation import spark.implicits._ // Print the schema in a tree format df.printSchema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // Select only the "name" column df.select("name").show() // +-------+ // | name| // +-------+ // |Michael| // | Andy| // | Justin| // +-------+ // Select everybody, but increment the age by 1 df.select($"name", $"age" + 1).show() // +-------+---------+ // | name|(age + 1)| // +-------+---------+ // |Michael| null| // | Andy| 31| // | Justin| 20| // +-------+---------+ // Select people older than 21 df.filter($"age" > 21).show() // +---+----+ // |age|name| // +---+----+ // | 30|Andy| // +---+----+ // Count people by age df.groupBy("age").count().show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+
更全的操做请移步api文档
除了简单的列引用和表达式,datasets也有丰富的函数库包括字符串操做,时间计算,经常使用的数学操做等。具体请移步DataFrame Function Reference.
SparkSession sql功能使应用能够执行一串sql查询而且返回一个DataFrame。把sql引入了语言。
// Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people") val sqlDF = spark.sql("SELECT * FROM people") sqlDF.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
临时视图的生命周期在Session范围内,随着session的中止而释放。若是你要让全部的session的能够访问,那么你就要搞个全局临时视图了。这个视图绑定在系统保留的global_temp数据库中,访问前咱们必须经过引用gloabal_temp来访问它。 e.g. SELECT * FROM global_temp.view1
.
// Register the DataFrame as a global temporary view df.createGlobalTempView("people") // Global temporary view is tied to a system preserved database `global_temp` spark.sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+ // Global temporary view is cross-session spark.newSession().sql("SELECT * FROM global_temp.people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
datasets和rdds相似,和java序列化相反的是,他们使用特殊的编码方式去序列化对象。
// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit, // you can use custom classes that implement the Product interface case class Person(name: String, age: Long) // Encoders are created for case classes val caseClassDS = Seq(Person("Andy", 32)).toDS() caseClassDS.show() // +----+---+ // |name|age| // +----+---+ // |Andy| 32| // +----+---+ // Encoders for most common types are automatically provided by importing spark.implicits._ val primitiveDS = Seq(1, 2, 3).toDS() primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4) // DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name val path = "examples/src/main/resources/people.json" val peopleDS = spark.read.json(path).as[Person] peopleDS.show() // +----+-------+ // | age| name| // +----+-------+ // |null|Michael| // | 30| Andy| // | 19| Justin| // +----+-------+
两种从rdd对象转换到dataset的方法。第一种是使用反射来反推一个rdd的对象类型。第二种是经过一个现有的rdd和你构建的schema,而且这种方式容许建立泛型,到运行时再来定义。
先建好schema对应的类型,在map过程加进去。show code
// For implicit conversions from RDDs to DataFrames import spark.implicits._ // Create an RDD of Person objects from a text file, convert it to a Dataframe val peopleDF = spark.sparkContext .textFile("examples/src/main/resources/people.txt") .map(_.split(",")) .map(attributes => Person(attributes(0), attributes(1).trim.toInt)) .toDF() // Register the DataFrame as a temporary view peopleDF.createOrReplaceTempView("people") // SQL statements can be run by using the sql methods provided by Spark val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19") // The columns of a row in the result can be accessed by field index teenagersDF.map(teenager => "Name: " + teenager(0)).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // or by field name teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show() // +------------+ // | value| // +------------+ // |Name: Justin| // +------------+ // No pre-defined encoders for Dataset[Map[K,V]], define explicitly implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]] // Primitive types and case classes can be also defined as // implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder() // row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T] teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect() // Array(Map("name" -> "Justin", "age" -> 19))
import org.apache.spark.sql.types._ // Create an RDD val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt") // The schema is encoded in a string val schemaString = "name age" // Generate the schema based on the string of schema val fields = schemaString.split(" ") .map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema = StructType(fields) // Convert records of the RDD (people) to Rows val rowRDD = peopleRDD .map(_.split(",")) .map(attributes => Row(attributes(0), attributes(1).trim)) // Apply the schema to the RDD val peopleDF = spark.createDataFrame(rowRDD, schema) // Creates a temporary view using the DataFrame peopleDF.createOrReplaceTempView("people") // SQL can be run over a temporary view created using DataFrames val results = spark.sql("SELECT name FROM people") // The results of SQL queries are DataFrames and support all the normal RDD operations // The columns of a row in the result can be accessed by field index or by field name results.map(attributes => "Name: " + attributes(0)).show() // +-------------+ // | value| // +-------------+ // |Name: Michael| // | Name: Andy| // | Name: Justin| // +-------------+
内建的dataframe函数提供经常使用的聚合函数,好比count,countDistinct,avg,max,min等等。
一个本地测试例子:
启动shell,加入mysql驱动包 spark-shell --master spark://hadoop-master:7077 --jars /root/mysql-connector-java-5.1.47.jar 从hdfs取文本文件 val userTxt = sc.textFile("hdfs://hadoop-master:9000//12306.txt"); import org.apache.spark.sql.Row 根据----分割数据 val userRdd = userTxt.map(_.split("----")).map(parts=>Row(parts(0),parts(1),parts(2),parts(3),parts(4),parts(5),parts(6))) 表元数据准备 val schemaString="email,acount,username,sfz,password,mobile,email1" import org.apache.spark.sql.types._ val fields = schemaString.split(",").map(fieldName => StructField(fieldName, StringType, nullable = true)) val schema =StructType(fields) rdd数据转df数据 val userDf = spark.createDataFrame(userRdd, schema) 数据写入mysql数据库 userDf.write.format("jdbc").option("url", "jdbc:mysql://192.168.199.204:3306/aaa").option("dbtable", "aaa").option("SaveMode","Overwrite").option("user", "root").option("password", "root").option("driver","com.mysql.jdbc.Driver").save()