sparkSQL中cache的若干问题

摘要

sparkSQL在使用cache缓存的时候,有时候缓存可能不起做用,可能会发出缓存是假的吧的感慨。如今咱们就把这个问题说道说道。
问题sql

场景描述

当咱们经过spark进行统计和处理数据时,发现他是延迟计算的,若是一个应用中出现多个action,而这多个action处理同一个数据源数据时,数据源用时间来过滤数据时,因为有多个action操做,遇到每一个action就是一个job,每个action都会执行数据源获取数据的操做,因为两个action之间的操做存在时间差,这两个action获取的数据有可能不一致。
例以下例
test1表中的数据缓存

1,2018-07-01 10:10:03
2,2018-07-01 11:12:04app

代码以下操做
val odsData = spark.sql("""
select
from default.test1
where time < "2018-07-02"
""")
val targetData = odsData.map(fun _)
val targetData.createOrReplaceTempView("data1")
//第一个Action操做
val spark.sql("""
insert overwrite table default.test2
*
from data1
""")ide

val targetData1 = odsData.map(fun2 _) //引用同一个数据源
targetData1.createOrReplaceTempView("data2")
//第二个action操做
val spark.sql("""
insert table default.test2
*
from data2
""")oop

若是在运行第二个Action操做前,test1表中又增长了一条记录3,2018-07-01 13:12:04
即执行第一个Action时记录仍是两条1和2,而再执行完第一个Action后而又执行第二个Action以前,
增长了一个新的单子:3,2018-07-01 13:12:04
那么在test2表中的数据是怎么样的呢?
第一种状况(由于第二个action是insert而不是insert overwrite)

1,2018-07-01 10:10:03
2,2018-07-01 11:12:04
1,2018-07-01 10:10:03
2,2018-07-01 11:12:04spa

第二种状况

1,2018-07-01 10:10:03
2,2018-07-01 11:12:04
1,2018-07-01 10:10:03
2,2018-07-01 11:12:04
3,2018-07-01 13:12:043d

结果分析

结果是第二中状况。若是认为是第一种状况的对spark的执行计划仍是不太熟悉。首先spark是lazy计算的,即不触发action操做,其实不提交做业的。而在这个application中存在两个action,而这两个aciton使用了同一个数据源的rdd,应该称为变量odsData,当遇到第一个action,其会把本身这个执行链上的rdd都执行一遍,包括执行odsData,而遇到第二个aciton的时候,其也会把本身的执行链上的数据又执行了一遍包括odsData,并从数据源中从新取数。有人会疑惑,第一个action在执行的时候,已经执行了odsData,这个RDD的结果不该该缓存起来吗?我的认为,spark尚未那么的智能,而且网上常常说的job,stage,rdd,task的划分应该是在同一个job内进行的。而同一个应用中夸job的stage拆分是不存在的。那么出现这个结果应该怎么办呢?
cache的出场日志

当出现这样的状况时,个人应用天天就会漏几十条数据,非常烦人,最后发现了上面的问题,当时想解决方案时,第一个就是想到了cache,我把第一次执行Action操做时,把odsData给缓存了,这样应该不会有什么问题了吧。从而能够保证两个action操做,同一个数据源的数据一致性。只能说too young to sample了。这样解决不了上面出现的问题。一样以一个例子来看。
test表中的数据:blog

1 2017-01-01 01:00:00 2016-05-04 9999-12-31
2 2017-01-01 02:00:00 2016-01-01 9999-12-31
代码:ip

val curentData = spark.sql(
"""
|select
|*
|from default.test
""".stripMargin)

curentData.cache() //缓存咱们的结果

curentData.createOrReplaceTempView("dwData")

//第一个Action
spark.sql(
"""
|INSERT OVERWRITE TABLE default.test1
|SELECT
|
|FROM dwData
""".stripMargin)
//改变数据源表test表的数据而且是第二个Action
spark.sql(
"""
|INSERT OVERWRITE TABLE default.test
|SELECT
| 1,
| "2017",
| "2018",
| "2018"
|FROM default.test
""".stripMargin)
//第三个Action和第一个Action同数据源,而且cache第一次运行的结果。
spark.sql(
"""
|INSERT OVERWRITE TABLE default.test1
|SELECT
|

|FROM dwData
""".stripMargin)
那么test1表中的结果
第一种状况:
1 2017-01-01 01:00:00 2016-05-04 9999-12-31
2 2017-01-01 02:00:00 2016-01-01 9999-12-31
第二种状况

1 2017 2018 2018
1 2017 2018 2018
结果分析

结果是第二种状况,也就是说咱们cache根本就没有起到效果,或者说第三个Action根本就没有使用咱们cache的数据。此次我把日志都打出来了啊。
第一个Action的声明周期:
sparkSQL中cache的若干问题

第三个Action的日志:
sparkSQL中cache的若干问题

从这两个日志能够看出,咱们设置cache其只能在同一个job中生效。而夸job的使用这样的数据缓存数据是不存在的。
若是想更加详细的了解cache的原理和做用,能够去网上搜,大把大把的资料,可是必定要记住,网上说的要限定一个条件,在同一个job内的rdd,夸job的cache是不存在的。
解决方案

咱们最终但愿解决的事,当两个action想要使用同一个数据源的rdd的时候,如何保证其数据的一致性。
方案:
把第一个Action算子用到的数据源给写入到一个临时表中
而后再第二个Action中,直接读取临时表的数据,而不是直接使用odsData
更好的方案尚未想好,能够根据业务的不一样来搞。

第二个方案如今就是咱们使用spark提供的checkpoint机制,checkpoint会把咱们的数据自动缓存到hdfs,它就会把这个rdd之前的父rdd的数据所有删除,之后无论哪一个job的rdd须要使用这个rdd的数据时,都会从这个checkpoin的目录中读取数据。spark.sparkContext.setCheckpointDir("hdfs://hadoop-1:5000/hanfangfang")curentData.cache().checkpoint这样就可使不一样的job,同一个数据源数据的一致性。同时咱们也要记住,当程序运行完成,其不会删除checkpoint的数据的,须要们手动删除。

相关文章
相关标签/搜索