DataFrame提供了
一条联结全部主流数据源并自动转化为可并行处理格式的渠道,经过它Spark能取悦大数据生态链上的全部玩家,不管是善用R的数据科学家,惯用SQL的商业分析师,仍是在乎效率和实时性的统计工程师。php
以一个常见的场景 -- 日志解析为例,有时咱们须要用到一些额外的结构化数据(好比作IP和地址的映射),一般这样的数据会存在MySQL,而访问的方式有两种:一是每一个worker远程去检索数据库,弊端是耗费额外的网络I/O资源;二是使用JdbcRDD
的API转化为RDD格式,而后编写繁复的函数去实现检索,显然要写更多的代码。而如今Spark一行代码就能实现从MySQL到DataFrame
的转化,而且支持SQL查询。java
在上一篇已经对文本格式进行测试,如今对hive hbase mysql oracle 以及临时表之间join查询作测试mysql
1.访问mysqlsql
除了JSON以外,DataFrame
如今已经能支持MySQL、Hive、HDFS、PostgreSQL等外部数据源,而对关系数据库的读取,是经过jdbc
实现的。shell
1
2
3
4
5
6
7
8
9
10
11
12
|
bin/spark-shell --driver-
class
-path ./lib/mysql-connector-java-
5.1
.
24
-bin.jar
val
sc
=
new
org.apache.spark.SparkContext
val
sqlContext
=
new
org.apache.spark.sql.SQLContext(sc)
val
jdbcDF
=
sqlContext.load(
"jdbc"
, Map(
"url"
->
"jdbc:mysql://192.168.0.110:3306/hidata?user=root&password=123456"
,
"dbtable"
->
"loadinfo"
))
bin/spark-sql --driver-
class
-path ./lib/mysql-connector-java-
5.1
.
24
-bin.jar
spark-sql> create temporary table jdbcmysql using org.apache.spark.sql.jdbc options(url
"jdbc:mysql://192.168.0.110:3306/hidata?user=root&password=123456"
,dbtable
"loadinfo"
)
spark-sql>select * from jdbcmysql;
//注意src是hive原本就存在的表,在spark sql中不用创建临时表,直接能够进行操做
//实现hive和mysql中表的联合查询
select * from src join jdbcmysql on (src.key
=
jdbcmysql.id);
|
2.访问Oracle数据库
同理,但注意链接的URL不同,也是试了很久apache
1
2
|
bin/spark-shell --driver-
class
-path ./lib/ojdbc
6
.jar
val
jdbcDF
=
sqlContext.load(
"jdbc"
, Map(
"url"
->
"jdbc:oracle:thin:kang/123456@192.168.0.110:1521:orcl"
,
"dbtable"
->
"TEST"
))
|
Spark十八般武艺又能够派上用场了。json
错误的URL:网络
1
2
|
val
jdbcDF
=
sqlContext.load(
"jdbc"
, Map(
"url"
->
"jdbc:oracle:thin:@192.168.0.110:1521:orcl&user=kang&password=123456"
,
"dbtable"
->
"TEST"
))
val
jdbcDF
=
sqlContext.load(
"jdbc"
, Map(
"url"
->
"jdbc:oracle:thin:@192.168.0.110:1521/orcl&user=kang&password=123456"
,
"dbtable"
->
"TEST"
))
|
报错类型:看起来最像的解决办法,留着之后用oracle
java.sql.SQLException: Io : NL Exception was generated错误解决(jdbc数据源问题)
解决Oracle ORA-12505, TNS:listener does not currently know of SID given in connect
第一种方式,会告知没法识别SID,其实在链接时将orcl&user=kang&password=123456都当作其SID,其实就接近了。通常平时用jdbc链接数据库,url user password都分开,学习一下这种方式^^
Oracle的JDBC url三种方式:这
1
2
3
4
5
6
|
1
.普通SID方式
jdbc
:
oracle
:
thin
:
username/password
@
x.x.x.
1
:
1521
:
SID
2
.普通ServerName方式
jdbc
:
oracle
:
thin
:
username/password
@
//x.x.x.1:1522/ABCD
3
.RAC方式
jdbc
:
oracle
:
thin
:@
(DESCRIPTION
=
(ADDRESS
_
LIST
=
(ADDRESS
=
(PROTOCOL
=
TCP)(HOST
=
x.x.x.
1
)(PORT
=
1521
))(ADDRESS
=
(PROTOCOL
=
TCP)(HOST
=
x.x.x.
2
)(PORT
=
1521
)))(LOAD
_
BALANCE
=
yes)(CONNECT
_
DATA
=
(SERVER
=
DEDICATED)(SERVICE
_
NAME
=
xxrac)))
|
具体参看这里
3.访问hive
hive和spark sql的关系,参见
其实spark sql从一开始就支持hive。Spark提供了一个HiveContext
的上下文,实际上是SQLContext
的一个子类,但从做用上来讲,sqlContext
也支持Hive数据源。只要在部署Spark的时候加入Hive选项,并把已有的hive-site.xml
文件挪到$SPARK_HOME/conf
路径下,咱们就能够直接用Spark查询包含已有元数据的Hive表了。
1.Spark-sql方式
spark-sql是Spark bin目录下的一个可执行脚本,它的目的是经过这个脚本执行Hive的命令,即原来经过
hive>输入的指令能够经过spark-sql>输入的指令来完成。
spark-sql可使用内置的Hive metadata-store,也可使用已经独立安装的Hive的metadata store
配置步骤:
1. 将Hive的conf目录的hive-site.xml拷贝到Spark的conf目录
2. 将hive-site.xml中关于时间的配置的时间单位,好比ms,s所有删除掉
错误信息:Exception in thread "main" java.lang.RuntimeException: java.lang.NumberFormatException: For input string: "5s" 一直觉得是输入格式的问题。。
3. 将mysql jdbc的驱动添加到Spark的Classpath上
1
|
export SPARK
_
CLASSPATH
=
$SPARK
_
CLASSPATH
:
/home/hadoop/software/spark-
1.2
.
0
-bin-hadoop
2.4
/lib/mysql-connector-java-
5.1
.
34
.jar
|
1
2
3
|
[hadoop
@
hadoop bin]$ ./spark-sql
Spark assembly has been built
with
Hive, including Datanucleus jars on classpath
SET spark.sql.hive.version
=
0.13
.
1
|
提示编译的时候要带2个参数
从新编译:./make-distribution.sh --tgz -Phadoop-2.4 -Pyarn -DskipTests -Dhadoop.version=2.4.1 -Phive -Phive-thriftserver
在Spark-default中已经指定
建立表
1
2
3
|
spark-sql> create table word
6
(id int,word string) row format delimited fields terminated by
','
stored as textfile ;
OK
Time taken
:
10.852
seconds
|
导入数据
1
2
3
4
5
6
7
|
spark-sql> load data local inpath
'/home/hadoop/word.txt'
into table word
6
;
Copying data from file
:
/home/hadoop/word.txt
Copying file
:
file
:
/home/hadoop/word.txt
Loading data to table default.word
6
Table default.word
6
stats
:
[numFiles
=
1
, numRows
=
0
, totalSize
=
31
, rawDataSize
=
0
]
OK
Time taken
:
2.307
seconds
|
与其余数据源联合查询
1
|
select * from src join jdbcmysql on (src.key
=
jdbcmysql.id);
|
2.Spark-shell方式
1
|
sqlContext.sql(
"select count(*) from hive_people"
).show()
|
4.将dataframe数据写入Hive分区表
DataFrame将数据写入hive中时,默认的是hive默认数据库,insertInto没有指定数据库的参数,使用下面方式将数据写入hive表或者hive表的分区中。这
一、将DataFrame数据写入到Hive表中
从DataFrame类中能够看到与hive表有关的写入Api有如下几个:
1
2
3
4
|
registerTempTable(tableName
:
String)
:
Unit,
insertInto(tableName
:
String)
:
Unit
insertInto(tableName
:
String, overwrite
:
Boolean)
:
Unit
saveAsTable(tableName
:
String, source
:
String, mode
:
[size
=
13.3333320617676
px]SaveMode, options
:
Map[String, String])
:
Unit
|
还有不少重载函数,不一一列举
registerTempTable函数是建立spark临时表
insertInto函数是向表中写入数据,能够看出此函数不能指定数据库和分区等信息,不能够直接进行写入。
向hive数据仓库写入数据必须指定数据库,hive数据表创建能够在hive上创建,或者使用hiveContext.sql(“create table ....")
下面语句是向指定数据库数据表中写入数据:
1
2
3
4
5
6
7
|
case
class
Person(name
:
String,col
1
:
Int,col
2
:
String)
val
sc
=
new
org.apache.spark.SparkContext
val
hiveContext
=
new
org.apache.spark.sql.hive.HiveContext(sc)
import
hiveContext.implicits.
_
hiveContext.sql(
"use DataBaseName"
)
val
data
=
sc.textFile(
"path"
).map(x
=
>x.split(
"\\s+"
)).map(x
=
>Person(x(
0
),x(
1
).toInt,x(
2
)))
<
br
>
data.toDF()
insertInto(
"tableName"
)
|
建立一个case类将RDD中数据类型转为case类型,而后经过toDF转换为DataFrame,调用insertInto函数时,首先指定数据库,使用的是hiveContext.sql("use DataBaseName")语句,就能够将DataFrame数据写入hive数据表中了
二、将DataFrame数据写入hive指定数据表的分区中
hive数据表创建能够在hive上创建,或者使用hiveContext.sql(“create table ...."),使用saveAsTable时数据存储格式有限,默认格式为parquet,能够指定为json,若是有其余格式指定,尽可能使用语句来创建hive表。
将数据写入分区表的思路是:首先将DataFrame数据写入临时表,以后是由hiveContext.sql语句将数据写入hive分区表中。具体操做以下:
1
2
3
4
5
6
7
8
|
case
class
Person(name
:
String,col
1
:
Int,col
2
:
String)
val
sc
=
new
org.apache.spark.SparkContext
val
hiveContext
=
new
org.apache.spark.sql.hive.HiveContext(sc)
import
hiveContext.implicits.
_
hiveContext.sql(
"use DataBaseName"
)
val
data
=
sc.textFile(
"path"
).map(x
=
>x.split(
"\\s+"
)).map(x
=
>Person(x(
0
),x(
1
).toInt,x(
2
)))
data.toDF().registerTempTable(
"table1"
)
hiveContext.sql(
"insert into table2 partition(date='2015-04-02') select name,col1,col2 from table1"
)
|
使用以上方式就能够将dataframe数据写入hive分区表了。