spark SQL (五)数据源 Data Source----json hive jdbc等数据的的读取与加载

1,JSON数据集java

 

       Spark SQL能够自动推断JSON数据集的模式,并将其做为一个Dataset[Row]这个转换能够SparkSession.read.json()在一个Dataset[String]或者一个JSON文件上完成。mysql

     请注意,做为json文件提供的文件不是典型的JSON文件。每行必须包含一个单独的,独立的有效JSON对象。有关更多信息,请参阅 JSON行文本格式,也称为换行符分隔的JSONsql

      对于常规的多行JSON文件,请将该multiLine选项设置true。例以下面的例子:shell

private def runJsonDatasetExample(spark: SparkSession): Unit = {

    import spark.implicits._

    //建立数据集时,经过导入这些
    //元素能够支持原始类型(Int,String等)和Product类型(case类)编码器。import  spark.implicits._

    // JSON数据集是经过路径指向的。
    // 路径能够是单个文本文件,也能够是存放文本文件的目录

    val path = "examples/src/main/resources/people.json"
    val peopleDF = spark.read.json(path)

    //推断的模式可使用printSchema()方法
    peopleDF.printSchema()
    // root
    //  |-- age: long (nullable = true)
    //  |-- name: string (nullable = true)

    //使用DataFrame
    peopleDF.createOrReplaceTempView("people")

    // SQL语句可使用spark
    val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
    teenagerNamesDF.show()
    // +------+
    // |  name|
    // +------+
    // |Justin|
    // +------+

    //或者,也能够为表示的JSON数据集建立一个DataFrame
    //数据集[String]每一个字符串
    val otherPeopleDataset = spark.createDataset(
      """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)
    val otherPeople = spark.read.json(otherPeopleDataset)
    otherPeople.show()
    // +---------------+----+
    // |        address|name|
    // +---------------+----+
    // |[Columbus,Ohio]| Yin|
    // +---------------+----+
  }

2,Hive 表数据库

       1) Spark SQL也支持读写存储在Apache Hive中的数据。可是,因为Hive具备大量依赖项,所以这些依赖项不包含在默认的Spark分发中。若是能够在类路径上找到Hive依赖关系,则Spark将自动加载它们。请注意,这些Hive依赖项也必须存在于全部工做节点上,由于它们须要访问Hive序列化和反序列化库(SerDes)才能访问存储在Hive中的数据。
       配置Hive是经过放置你的hive-site.xml,core-site.xml(用于安全配置)和hdfs-site.xml(用于HDFS配置)文件来完成的conf/。
       使用Hive时,必须SparkSession使用Hive支持进行实例化,包括链接到持续的Hive Metastore,支持Hive serdes和Hive用户定义的函数。没有现有Hive部署的用户仍然能够启用Hive支持。当未配置时hive-site.xml,上下文metastore_db在当前目录中自动建立,并建立一个目录spark.sql.warehouse.dir,该目录默认为 spark-warehouseSpark应用程序启动的当前目录中的目录。请注意,自Spark 2.0.0以来,该hive.metastore.warehouse.dir属性hive-site.xml已被弃用。而是使用spark.sql.warehouse.dir指定仓库中数据库的默认位置。您可能须要向启动Spark应用程序的用户授予写权限。
apache

import java.io.File

import org.apache.spark.sql.Row
import org.apache.spark.sql.SparkSession

object SparkHiveExample {

  case class Record(key: Int, value: String)

  def main(args: Array[String]) {
    // warehouseLocation指向托管数据库的默认位置,表
    val warehouseLocation = new File("spark-warehouse").getAbsolutePath

    val spark = SparkSession
      .builder()
      .appName("Spark Hive Example")
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._
    import spark.sql

    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
    sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

    //查询以HiveQL
    sql("SELECT * FROM src").show()
    // +---+-------+
    // |key|  value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...

    //聚合查询也被支持。
    sql("SELECT COUNT(*) FROM src").show()
    // +--------+
    // |count(1)|
    // +--------+
    // |    500 |
    // +--------+

    // SQL查询的结果自己就是DataFrame,而且支持全部正常的函数。
    val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 ORDER BY key")

    // DataFrames中的项目类型为Row,它容许您经过序号访问每一个列。
    val stringsDS = sqlDF.map {
      case Row(key: Int, value: String) => s"Key: $key, Value: $value"
    }
    stringsDS.show()
    // +--------------------+
    // |               value|
    // +--------------------+
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // ...

    //您也可使用DataFrame在SparkSession中建立临时视图。
    val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
    recordsDF.createOrReplaceTempView("records")

    //查询而后能够将DataFrame数据与存储在Hive中的数据结合起来。
    sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
    // +---+------+---+------+
    // |key| value|key| value|
    // +---+------+---+------+
    // |  2| val_2|  2| val_2|
    // |  4| val_4|  4| val_4|
    // |  5| val_5|  5| val_5|
    // ...
    spark.stop()
  }
}

  2) 指定Hive表格的存储格式
      当你建立一个Hive表时,你须要定义这个表应该如何从/向文件系统读/写数据,即“输入格式”和“输出格式”。您还须要定义这个表应该如何将数据反序列化为行,或者将行序列化为数据,即“serde”。如下选项可用于指定存储格式(“serde”,“输入格式”,“输出格式”),例如CREATE TABLE src(id int) USING hive OPTIONS(fileFormat 'parquet')。默认状况下,咱们将以纯文本形式读取表格文件。请注意,Hive存储处理程序在建立表时不受支持,您可使用Hive端的存储处理程序建立一个表,而后使用Spark SQL来读取它。 json

属性名称 含义
fileFormat fileFormat是一种存储格式规范包,包括“serde”,“输入格式”和“输出格式”。目前咱们支持6个fileFormats:'sequencefile','rcfile','orc','parquet','textfile'和'avro'。
inputFormat, outputFormat 这两个选项将相应的`InputFormat`和`OutputFormat`类的名字指定为一个字符串,例如`org.apache.hadoop.hive.ql.io.orc.OrcInputFormat`。这两个选项必须成对出现,若是您
已经指定`fileFormat`选项,则不能指定它们。
serde 这个选项指定一个serde类的名字。当指定`fileFormat`选项时,若是给定的`fileFormat`
已经包含了serde的信息,就不要指定这个选项。目前“sequencefile”,“textfile”和“rcfile”不包
括serde信息,你可使用这个选项和这3个fileFormats。
fieldDelim, escapeDelim, collectionDelim, mapkeyDelim, lineDelim 这些选项只能与“textfile”fileFormat一块儿使用。他们定义了如何将分隔文件读入行。

定义的全部其余属性OPTIONS将被视为Hive serde属性。安全

  3) 与不一样版本的Hive Metastore交互服务器

        Spark SQL的Hive支持最重要的部分之一是与Hive Metastore进行交互,这使Spark SQL能够访问Hive表的元数据。从Spark 1.4.0开始,使用下面描述的配置,可使用Spark SQL的单个二进制版本查询不一样版本的Hive metastore。请注意,独立于用于与Metastore对话的Hive版本,内部Spark SQL将针对Hive 1.2.1进行编译,并将这些类用于内部执行(serdes,UDF,UDAF等)。
如下选项可用于配置用于检索元数据的Hive版本:并发

属性名称 默认 含义
spark.sql.hive.metastore.version       1.2.1 hive  Metastore版本。可用的选项是0.12.0经过1.2.1。
spark.sql.hive.metastore.jars        builtin 应该用来实例化HiveMetastoreClient的罐子的位置。该属性能够是如下三个选项之一:
1,builtin
-Phive启用 时,使用与Spark程序集捆绑在一块儿的Hive 1.2.1 。选择此选项时,spark.sql.hive.metastore.version
必须是1.2.1或者没有定义。
2,maven
使用从Maven存储库下载的指定版本的Hive jar。一般不建议将此配置用于生产部署。
3,JVM标准格式的类路径。这个类路径必须包含全部Hive及其依赖项,包括正确版本的Hadoop。
这些瓶子只须要在驱动程序中出现,可是若是您正在以纱线群集模式运行,则必须确保它们与您的应用程序一块儿打包。
spark.sql.hive.metastore.sharedPrefixes com.mysql.jdbc,
org.postgresql,
com.microsoft.sqlserver,
oracle.jdbc
应该使用在Spark SQL和特定版本的Hive之间共享的类加载程序加载的类前缀的逗号分隔列表。
应该共享的类的示例是须要与Metastore对话的JDBC驱动程序。其余须要共享的类是那些与已
经共享的类进行交互的类。例如,由log4j使用的自定义appender。
spark.sql.hive.metastore.barrierPrefixes (empty) 一个以逗号分隔的类前缀列表,应该针对Spark SQL正在与之进行通讯的每一个Hive版本显式
从新加载。例如,Hive UDF声明在一般会被共享(即org.apache.spark.*)的前缀中。

2,JDBC到其余数据库

       1) Spark SQL还包含一个可使用JDBC从其余数据库读取数据的数据源。这个功能应该比使用JdbcRDD更受欢迎。这是由于结果做为DataFrame返回,而且能够轻松地在Spark SQL中处理或者与其余数据源结合使用。JDBC数据源也更容易从Java或Python使用,由于它不须要用户提供ClassTag。(请注意,这与Spark SQL JDBC服务器不一样,后者容许其余应用程序使用Spark SQL运行查询)。
       要开始,您将须要在Spark类路径中为您的特定数据库包含JDBC驱动程序。例如,要从Spark Shell链接到postgres,您能够运行如下命令:

bin/spark-shell --driver-class-path postgresql-9.4.1207.jar --jars postgresql-9.4.1207.jar

       远程数据库中的表可使用Data Sources API做为DataFrame或Spark SQL临时视图加载。用户能够在数据源选项中指定JDBC链接属性。 user和password一般用于登陆到数据源提供为链接属性。除了链接属性以外,Spark还支持如下不区分大小写的选项:

属性名称 含义
url 要链接到的JDBC URL。源特定的链接属性能够在URL中指定。例如,jdbc:postgresql://localhost/test?user=fred&password=secret
dbtable 应该读取的JDBC表。请注意,FROM可使用在SQL查询的子句中有效的任何内容。例如,而不是一个完整的表,你也能够在括号中使用子查询。
driver 用于链接到此URL的JDBC驱动程序的类名。
partitionColumn, lowerBound, upperBound 若是指定了这些选项,则必须指定这些选项。另外, numPartitions必须指定。他们描述了如何从多个工做人员平行读取时对表格进行分区。 partitionColumn必须是相关表格中的数字列。请注意,lowerBound和upperBound只是用来决定分区步幅,而不是在表中过滤行。因此表中的全部行都将被分区并返回。这个选项只适用于阅读。
numPartitions 表格读取和写入中可用于并行的分区的最大数目。这也决定了并发JDBC链接的最大数量。若是要写入的分区数量超过此限制,则coalesce(numPartitions)在写入以前调用它,将其减小到此限制。
fetchsize JDBC提取大小,它决定每次往返取多少行。这能够帮助默认为低读取大小的JDBC驱动程序(例如,具备10行的Oracle)执行性能。这个选项只适用于阅读。
batchsize JDBC批量大小,用于肯定每次往返要插入多少行。这能够帮助JDBC驱动程序的性能。这个选项只适用于写做。它默认为1000。
isolationLevel 事务隔离级别,适用于当前链接。它能够是一个NONE,READ_COMMITTED,READ_UNCOMMITTED,REPEATABLE_READ,或SERIALIZABLE,对应于由JDBC的链接对象定义,缺省值为标准事务隔离级别READ_UNCOMMITTED。这个选项只适用于写做。请参阅中的文档java.sql.Connection。
truncate 这是一个JDBC编写器相关的选项。当SaveMode.Overwrite启用时,此选项会致使Spark截断现有的表,而不是删除并从新建立它。这能够更高效,并防止表元数据(例如,索引)被删除。可是,在某些状况下,例如新数据具备不一样的模式时,它将不起做用。它默认为false。这个选项只适用于写做。
createTableOptions 这是一个JDBC编写器相关的选项。若是指定,则此选项容许在建立表(例如CREATE TABLE t (name string) ENGINE=InnoDB.)时设置数据库特定的表和分区选项。这个选项只适用于写做。
createTableColumnTypes 建立表时使用的数据库列数据类型,而不是默认值。数据类型信息应该使用与CREATE TABLE列语法相同的格式来指定(例如:"name CHAR(64), comments VARCHAR(1024)")。指定的类型应该是有效的spark sql数据类型,该选项仅适用于写入。


  例子以下:

private def runJdbcDatasetExample(spark: SparkSession): Unit = {
    //注意:能够经过load / save或jdbc方法来实现JDBC加载和保存
    //从JDBC源加载数据
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .load()

    val connectionProperties = new Properties()
    connectionProperties.put("user", "username")
    connectionProperties.put("password", "password")
    val jdbcDF2 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    //将数据保存到JDBC源
    connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
    val jdbcDF3 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

    //在写入
    jdbcDF.write
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .save()

    jdbcDF2.write
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    
    jdbcDF.write
      .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

  }

 

  2) 可能遇到的异常和故障排除 

       JDBC驱动程序类必须对客户端会话和全部执行者上的原始类加载器可见。这是由于Java的DriverManager类执行了一个安全检查,致使它忽略了当打开一个链接时,原始类加载器不可见的全部驱动程序。一个方便的方法是修改全部工做节点上的compute_classpath.sh以包含驱动程序JAR。        某些数据库(如H2)将全部名称转换为大写。您须要使用大写字母来引用Spark SQL中的这些名称。

相关文章
相关标签/搜索