Spark支持四种方式从数据库中读取数据

目前 Spark支持四种方式从数据库中读取数据,这里以Mysql为例进行介绍。

1、不指定查询条件

  这个方式连接MySql的函数原型是:mysql

def jdbc(url : String, table : String, properties : Properties) : DataFrame

  咱们只须要提供Driver的url,须要查询的表名,以及链接表相关属性properties。下面是具体例子:sql

 
val prop = new Properties()
val df = sqlContext.read.jdbc(url, "iteblog" , prop )
 
println(df.count())
println(df.rdd.partitions.size)

  咱们运行上面的程序,能够看到df.rdd.partitions.size输出结果是1,这个结果的含义是iteblog表的全部数据都是由RDD的一个分区处理的,因此说,若是你这个表很大,极可能会出现OOM数据库

WARN TaskSetManager : Lost task 0.0 in stage 1.0 (TID 14 , spark 047219 ) :
  java.lang.OutOfMemoryError : GC overhead limit exceeded at com.mysql.jdbc.MysqlIO.reuseAndReadPacket(MysqlIO.java : 3380 )

这种方式在数据量大的时候不建议使用。json


若是想及时了解 Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共账号: iteblog_hadoop

2、指定数据库字段的范围

  这种方式就是经过指定数据库中某个字段的范围,可是遗憾的是,这个字段必须是数字,来看看这个函数的函数原型:微信

def jdbc(
     url : String,
     table : String,
     columnName : String,
     lowerBound : Long,
     upperBound : Long,
     numPartitions : Int,
     connectionProperties : Properties) : DataFrame

  前两个字段的含义和方法一相似。columnName就是须要分区的字段,这个字段在数据库中的类型必须是数字;lowerBound就是分区的下界;upperBound就是分区的上界;numPartitions是分区的个数。一样,咱们也来看看如何使用:ide

val lowerBound = 1
val upperBound = 100000
val numPartitions = 5
 
val prop = new Properties()
val df = sqlContext.read.jdbc(url, "iteblog" , "id" , lowerBound, upperBound, numPartitions, prop)

  这个方法能够将iteblog表的数据分布到RDD的几个分区中,分区的数量由numPartitions参数决定,在理想状况下,每一个分区处理相同数量的数据,咱们在使用的时候不建议将这个值设置的比较大,由于这可能致使数据库挂掉!可是根据前面介绍,这个函数的缺点就是只能使用整形数据字段做为分区关键字。函数

  这个函数在极端状况下,也就是设置将numPartitions设置为1,其含义和第一种方式一致。oop

3、根据任意字段进行分区

  基于前面两种方法的限制,Spark还提供了根据任意字段进行分区的方法,函数原型以下:url

def jdbc(
     url : String,
     table : String,
     predicates : Array[String],
     connectionProperties : Properties) : DataFrame

这个函数相比第一种方式多了predicates参数,咱们能够经过这个参数设置分区的依据,来看看例子:

val predicates = Array[String]( "reportDate <= '2014-12-31'" ,
     "reportDate > '2014-12-31' and reportDate <= '2015-12-31'" )
 
val prop = new Properties()
val df = sqlContext.read.jdbc(url, "iteblog" , predicates, prop)

最后rdd的分区数量就等于predicates.length。

4、经过load获取

Spark还提供经过load的方式来读取数据。

sqlContext.read.format( "jdbc" ).options(
     "dbtable" -> "iteblog" )).load()

  options函数支持url、driver、dbtable、partitionColumn、lowerBound、upperBound以及numPartitions选项,细心的同窗确定发现这个和方法二的参数一致。是的,其内部实现原理部分和方法二大致一致。同时load方法还支持json、orc等数据源的读取。