spark连数据库

DataFrame提供了一条联结全部主流数据源并自动转化为可并行处理格式的渠道,经过它Spark能取悦大数据生态链上的全部玩家,不管是善用R的数据科学家,惯用SQL的商业分析师,仍是在乎效率和实时性的统计工程师。php

  以一个常见的场景 -- 日志解析为例,有时咱们须要用到一些额外的结构化数据(好比作IP和地址的映射),一般这样的数据会存在MySQL,而访问的方式有两种:一是每一个worker远程去检索数据库,弊端是耗费额外的网络I/O资源;二是使用JdbcRDD的API转化为RDD格式,而后编写繁复的函数去实现检索,显然要写更多的代码。而如今Spark一行代码就能实现从MySQL到DataFrame的转化,而且支持SQL查询。java

在上一篇已经对文本格式进行测试,如今对hive hbase mysql oracle 以及临时表之间join查询作测试mysql

 

 1.访问mysqlsql

 除了JSON以外,DataFrame如今已经能支持MySQL、Hive、HDFS、PostgreSQL等外部数据源,而对关系数据库的读取,是经过jdbc实现的。shell

1
2
3
4
5
6
7
8
9
10
11
12
bin/spark-shell --driver- class -path ./lib/mysql-connector-java- 5.1 . 24 -bin.jar
  val sc = new org.apache.spark.SparkContext
  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
  val jdbcDF = sqlContext.load( "jdbc" , Map( "url" -> "jdbc:mysql://192.168.0.110:3306/hidata?user=root&password=123456" , "dbtable" -> "loadinfo" ))
 
 
  bin/spark-sql --driver- class -path ./lib/mysql-connector-java- 5.1 . 24 -bin.jar
spark-sql> create temporary table jdbcmysql using  org.apache.spark.sql.jdbc options(url "jdbc:mysql://192.168.0.110:3306/hidata?user=root&password=123456" ,dbtable "loadinfo" )
spark-sql>select * from jdbcmysql;
//注意src是hive原本就存在的表,在spark sql中不用创建临时表,直接能够进行操做
//实现hive和mysql中表的联合查询
select * from src join jdbcmysql on (src.key = jdbcmysql.id);

2.访问Oracle数据库

同理,但注意链接的URL不同,也是试了很久apache

1
2
bin/spark-shell --driver- class -path ./lib/ojdbc 6 .jar
val jdbcDF = sqlContext.load( "jdbc" , Map( "url" -> "jdbc:oracle:thin:kang/123456@192.168.0.110:1521:orcl" , "dbtable" -> "TEST" ))

Spark十八般武艺又能够派上用场了。json

错误的URL:网络

1
2
val jdbcDF = sqlContext.load( "jdbc" , Map( "url" -> "jdbc:oracle:thin:@192.168.0.110:1521:orcl&user=kang&password=123456" , "dbtable" -> "TEST" ))
val jdbcDF = sqlContext.load( "jdbc" , Map( "url" -> "jdbc:oracle:thin:@192.168.0.110:1521/orcl&user=kang&password=123456" , "dbtable" -> "TEST" ))

报错类型:看起来最像的解决办法,留着之后用oracle

java.sql.SQLException: Io : NL Exception was generated错误解决(jdbc数据源问题)

解决Oracle ORA-12505, TNS:listener does not currently know of SID given in connect  

第一种方式,会告知没法识别SID,其实在链接时将orcl&user=kang&password=123456都当作其SID,其实就接近了。通常平时用jdbc链接数据库,url user password都分开,学习一下这种方式^^

Oracle的JDBC url三种方式:

1
2
3
4
5
6
1 .普通SID方式
jdbc : oracle : thin : username/password @ x.x.x. 1 : 1521 : SID
2 .普通ServerName方式
jdbc : oracle : thin : username/password @ //x.x.x.1:1522/ABCD
3 .RAC方式
jdbc : oracle : thin :@ (DESCRIPTION = (ADDRESS _ LIST = (ADDRESS = (PROTOCOL = TCP)(HOST = x.x.x. 1 )(PORT = 1521 ))(ADDRESS = (PROTOCOL = TCP)(HOST = x.x.x. 2 )(PORT = 1521 )))(LOAD _ BALANCE = yes)(CONNECT _ DATA = (SERVER = DEDICATED)(SERVICE _ NAME = xxrac)))

具体参看这里

 

3.访问hive  

hive和spark sql的关系,参见

其实spark sql从一开始就支持hive。Spark提供了一个HiveContext的上下文,实际上是SQLContext的一个子类,但从做用上来讲,sqlContext也支持Hive数据源。只要在部署Spark的时候加入Hive选项,并把已有的hive-site.xml文件挪到$SPARK_HOME/conf路径下,咱们就能够直接用Spark查询包含已有元数据的Hive表了。

1.Spark-sql方式

spark-sql是Spark bin目录下的一个可执行脚本,它的目的是经过这个脚本执行Hive的命令,即原来经过

hive>输入的指令能够经过spark-sql>输入的指令来完成。

spark-sql可使用内置的Hive metadata-store,也可使用已经独立安装的Hive的metadata store

配置步骤:

1. 将Hive的conf目录的hive-site.xml拷贝到Spark的conf目录

2. 将hive-site.xml中关于时间的配置的时间单位,好比ms,s所有删除掉

错误信息:Exception in thread "main" java.lang.RuntimeException: java.lang.NumberFormatException: For input string: "5s" 一直觉得是输入格式的问题。。

3. 将mysql jdbc的驱动添加到Spark的Classpath上

1
export SPARK _ CLASSPATH = $SPARK _ CLASSPATH : /home/hadoop/software/spark- 1.2 . 0 -bin-hadoop 2.4 /lib/mysql-connector-java- 5.1 . 34 .jar
1
2
3
[hadoop @ hadoop bin]$ ./spark-sql 
Spark assembly has been built with Hive, including Datanucleus jars on classpath 
SET spark.sql.hive.version = 0.13 . 1

提示编译的时候要带2个参数

从新编译:./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -DskipTests -Dhadoop.version=2.4.1 -Phive -Phive-thriftserver

在Spark-default中已经指定

 

建立表

1
2
3
spark-sql> create table word 6 (id int,word string) row format delimited fields terminated by ',' stored as textfile ;  
OK 
Time taken : 10.852 seconds 

 导入数据

1
2
3
4
5
6
7
spark-sql> load data local inpath '/home/hadoop/word.txt' into table word 6
Copying data from file : /home/hadoop/word.txt 
Copying file : file : /home/hadoop/word.txt 
Loading data to table default.word 6 
Table default.word 6 stats : [numFiles = 1 , numRows = 0 , totalSize = 31 , rawDataSize = 0
OK 
Time taken : 2.307 seconds

 与其余数据源联合查询

1
select * from src join jdbcmysql on (src.key = jdbcmysql.id);

2.Spark-shell方式 

1
sqlContext.sql( "select count(*) from hive_people" ).show()

  

4.将dataframe数据写入Hive分区表

DataFrame将数据写入hive中时,默认的是hive默认数据库,insertInto没有指定数据库的参数,使用下面方式将数据写入hive表或者hive表的分区中。

一、将DataFrame数据写入到Hive表中

从DataFrame类中能够看到与hive表有关的写入Api有如下几个:

1
2
3
4
registerTempTable(tableName : String) : Unit,
insertInto(tableName : String) : Unit
insertInto(tableName : String, overwrite : Boolean) : Unit
saveAsTable(tableName : String, source : String, mode : [size = 13.3333320617676 px]SaveMode, options : Map[String, String]) : Unit

还有不少重载函数,不一一列举

registerTempTable函数是建立spark临时表

insertInto函数是向表中写入数据,能够看出此函数不能指定数据库和分区等信息,不能够直接进行写入。

向hive数据仓库写入数据必须指定数据库,hive数据表创建能够在hive上创建,或者使用hiveContext.sql(“create table ....")

 

下面语句是向指定数据库数据表中写入数据:

1
2
3
4
5
6
7
case class Person(name : String,col 1 : Int,col 2 : String) 
val sc = new org.apache.spark.SparkContext    
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
import hiveContext.implicits. _ 
hiveContext.sql( "use DataBaseName"
val data = sc.textFile( "path" ).map(x = >x.split( "\\s+" )).map(x = >Person(x( 0 ),x( 1 ).toInt,x( 2 ))) < br > data.toDF()
insertInto( "tableName" )

建立一个case类将RDD中数据类型转为case类型,而后经过toDF转换为DataFrame,调用insertInto函数时,首先指定数据库,使用的是hiveContext.sql("use DataBaseName")语句,就能够将DataFrame数据写入hive数据表中了

 

二、将DataFrame数据写入hive指定数据表的分区中

hive数据表创建能够在hive上创建,或者使用hiveContext.sql(“create table ...."),使用saveAsTable时数据存储格式有限,默认格式为parquet,能够指定为json,若是有其余格式指定,尽可能使用语句来创建hive表。

将数据写入分区表的思路是:首先将DataFrame数据写入临时表,以后是由hiveContext.sql语句将数据写入hive分区表中。具体操做以下:

1
2
3
4
5
6
7
8
case class Person(name : String,col 1 : Int,col 2 : String) 
val sc = new org.apache.spark.SparkContext    
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc) 
import hiveContext.implicits. _ 
hiveContext.sql( "use DataBaseName"
val data = sc.textFile( "path" ).map(x = >x.split( "\\s+" )).map(x = >Person(x( 0 ),x( 1 ).toInt,x( 2 ))) 
data.toDF().registerTempTable( "table1"
hiveContext.sql( "insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1" )

使用以上方式就能够将dataframe数据写入hive分区表了。

相关文章
相关标签/搜索