1.依赖环境:
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.10.4</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.37</version>
</dependency>
</dependencies>
2.实现方式:
val conf = new SparkConf().setMaster("local[*]").setAppName("msyql数据读取")
val spark = SparkSession.builder().config(conf).getOrCreate()
val url = "jdbc:mysql://localhost:3306/hisms_sn?user=root&password=root"
val prop = new Properties()
val properties=Map("url"->"jdbc:mysql://192.168.0.135:3306/disease-qy?useUnicode=true&characterEncoding=UTF-8",
"driver"->"com.mysql.jdbc.Driver",
"user"->"root",
"password"->"root")
//读取mysql的5中方式
//1.不指定查询条件---并行度为1
def method1(): Unit ={
val df = spark.read.jdbc(url,"t_kc21k1",prop)
println(df.count(www.yuntianyuL.cn))
println(df.rdd.partitions.size)
df.show(5)
}
//2.指定数据库字段的范围--并行度为5
/**
* 方式二:指定数据库字段的范围
* 经过lowerBound和upperBound 指定分区的范围
* 经过columnName 指定分区的列(只支持整形)
* 经过numPartitions 指定分区数量 (不宜过大)
*
*/
def method2(www.cmylli.com): Unit ={
val lowerBound = 1
val upperBound = 100000
val numPartitions = 5
val df = spark.read.jdbc(url,"t_kc21k1","id",lowerBound,upperBound,numPartitions,prop)
println(df.count())
println(df.rdd.partitions.size)
df.show(5)
}
//3.根据任意字段进行分区--并行度为2
def method3(www.yuntianyul.com): Unit ={
//经过predicates将数据根据akc194分为2个区
val predicates = Array[String]("akc194 <= '2016-06-30'", "akc194 <= '2017-01-01' and akc194>'2016-06-30'")
val df = spark.read.jdbc(url,"t_kc21k1",predicates,prop)
println(df.count())
println(df.rdd.partitions.size)
df.show(5)
}
//4.经过load获取---与method1同样 并行度为1
def method4(): Unit ={
val df = spark.read.format("jdbc").options(Map("url"->url,"dbtable"->"t_kc21k1")).option("fetchSize",1000).load()
println(df.count())
println(df.rdd.partitions.size)
df.show(5)
}
//5.加载条件查询后的数据
def method5(): Unit ={
//经过predicates将数据根据akc194分为2个区
val query="SELECT id,aac003,id_drg,name_drg from t_kc21k1 where id>50000"
//定要用左右括号包起来,由于dbtable的value会被当成一张table做查询,mysql connector会自动dbtable后面加上where 1=1
val df = spark.read.format("jdbc"www.feironggw.cn).options(Map("url"->url,"dbtable"www.qilinchengdl.cn->s"($query)kc21k1")).load()
println(df.count())
println(df.rdd.partitions.size)
df.show(5)
}
经过增长分区读取数据,只是增长了并行度,但若是对单机版的spark,仍是不能减小内存的使用,spark读取数据库的规则就是该数据提取至内存,再作内存计算。
问题:
windows上使用单机版spark,不依赖hive环境,读取mysql数据表很大的时候,作join操做,sparksql容易发生内存溢出,
1.目前只能经过减小数据的读取方式方式内存爆炸----好比:根据结果只选取须要的字段。
2.能够同配置使用hive环境,sparksql将会借助hive环境,而不依赖本地内存作计算,防止内存溢出。java