本文始发于我的公众号:TechFlow,原创不易,求个关注java
今天是spark专题的第五篇,咱们来看看DataFrame。程序员
用过Python作过机器学习的同窗对Python当中pandas当中的DataFrame应该不陌生,若是没作过也没有关系,咱们简单来介绍一下。DataFrame翻译过来的意思是数据帧,但其实它指的是一种特殊的数据结构,使得数据以相似关系型数据库当中的表同样存储。使用DataFrame咱们能够很是方便地对整张表进行一些相似SQL的一些复杂的处理。Apache Spark在升级到了1.3版本以后,也提供了相似功能的DataFrame,也就是大名鼎鼎的SparkSQL。web
关于SparkSQL的前世此生实际上是有一大段历史的,这一段历史除了能够充当吹牛的谈资以外,还能够帮助咱们理清楚许多技术之间的内在关联。sql
在程序开发这个行当,优化和重构注定是两个没法摆脱的问题。数据库
当一个项目启动的时候,因为投入有限,可能招不到特别匹配的人才,或者是为了快速知足业务的须要。每每会采起一些不是特别合理的设计来构建项目,这个应该很好理解,为了图快牺牲一些性能或者是拓展性。并且有时候因为视野和能力的限制,早期的开发者可能也是没法意识到设计中的不合理性的。可是俗话说得好,出来混迟早是要还的。前面挖了坑,后来迟早也会暴露出来。问题就在于暴露了以后咱们怎么处理。编程
通常来讲,不管是做为公司也好,仍是做为开发者我的也罢。想的确定都是怎么样以最小的代价解决问题,也就是尽可能优化,能不动核心代码就不动。除了由于核心代码过久没有维护或者是文档缺失以外,也涉及到成本问题。如今的项目日进斗金,天天都在运行,一旦要下决心把核心代码翻新一遍,那么会付出巨大的代价,可能整个项目组要暂停一段时间。并且在上层管理层眼中,每每也是看不到重构的必要性的。由于上层都是以业务为导向的,技术作得好很差不重要,能赚钱才是王道。json
但问题是优化并非无止境的,不少时候核心设计的不合理才是大头,边边角角的修补只能聊胜于无。这个时候考验的每每都是技术负责人的担当了,是当个糊裱匠混一年是一年,仍是壮士断腕,敢叫日月换新天。通常来讲糊裱起到的效果都是有限的,总会有撑不下去要重构的那天。session
SparkSQL早期的发展就很是好的印证了这点,SparkSQL诞生之初就是当作一个优化项目诞生的。目的是为了优化Hive中在spark的效率。数据结构
这里的Hive可能不少人不太熟悉,它是Hadoop家族结构化查询的工具。将hadoop集群中的数据以表结构的形式存储,让程序员能够以类SQL语句来查询数据。看起来和数据库有些近似,但原理不太同样。Hive底层是以MapReduce驱动的,也就是说会把咱们写好的SQL转化成MapReduce执行。因为Hive易用性很好,使用的人不少,因此spark当中也支持Hive。架构
但其实那个时候spark兴起,MapReduce时代已经逐渐走到了末期。那时的spark是基于前面介绍的RDD的结构处理数据的,性能比MapReduce好得多。但若是在spark上依然使用MapReduce的形式支持Hive,那么就不能体现出spark计算性能的优越性。因此对于Hive on Spark的优化势在必行。我我的以为这有点抢市场的调调。
最好的办法是对spark完全重构,重建出一套支持结构化数据查询的计算框架。但估计那时候主负责人没能狠下心,或者是为了赶时间。因此只是对Hive进行了一些优化,大概就是把一些使用MapReduce的计算想办法尽可能改为使用RDD,从而提高总体的效率。这样作固然是可以有提高的,可是核心的框架仍然是Hive的那一套机制,这样的提高是有限的。大概过了三年左右的时间,基本上全部能压榨出来的性能都被压榨完了,开发组通过激烈的思想斗争以后,终于接受现实,完全抛弃本来的框架,构建出一套新的架构来。
这套新开发出的架构就是SparkSQL,也就是DataFrame。
咱们来简单看下SparkSQL的架构,大概知道内部是怎么运行的。
整个SparkSQL的模型大概分为三层,最上面是编程模型层,中间是执行优化层,最后是任务执行引擎。
这些都是术语,咱们简单介绍一下,编程模型层主要有两块一块是SparkSQL一种是DataFrame,这二者只是语法不同,底层执行的逻辑是同样的。主要作的是对咱们写的一些语法进行解析以及一些基本的处理。执行计划层是将SQL语句转化成具体须要执行的逻辑执行计划,根据一些策略进行优化以后输出物理执行策略。最后一层是执行层,负责将物理计划转化成RDD或者是DAG进行执行。
咱们观察一下这个架构,可能还有不少细节不是很清楚,可是至少整个执行的过程已经很明白了。进一步能够发现,整个架构当中已经彻底没有MapReduce的影子了,底层的执行单元就是RDD。也就是说SparkSQL实际上是进一步更高层次的封装。
咱们来简单看下DataFrame和RDD的差异,最大最直观的差异就是DataFrame多了schema的概念。也就是多了数据格式的概念,咱们拿到DataFrame能够很轻松地获取它其中数据的结构信息。
咱们看下下图作个对比,一样一份数据在RDD和DataFrame的样子:
不要小瞧这个schema,有了它以后,咱们就能够作一些结构化数据才支持的操做了。好比groupby、where、sum等等。这些结构化数据操做的灵活度要比RDD的map、filter等操做大得多。
另一个好处就是效率,若是咱们本身写RDD来操做数据的话,那么Python是必定干不过scala和java的。由于spark底层是依托Java实现的,spark的全部计算都执行在JVM当中。scala和java都是直接在JVM当中直接运行的语言,而Python不行,因此以前咱们使用Python调用RDD处理spark的速度也会慢不少。由于咱们须要通过多层中转,咱们能够看下下面这张图。
当咱们执行pyspark当中的RDD时,spark context会经过Py4j启动一个使用JavaSparkContext的JVM,全部的RDD的转化操做都会被映射成Java中的PythonRDD对象。当咱们的任务被传输到Workder进行执行的时候,PythonRDD会启动Python的子进程来传输代码和执行的结果。
上面这段话提及来有点绕,简单理解就是当pyspark调用RDD的时候,Python会转化成Java调用spark集群分发任务。每个任务具体在机器上执行的时候,仍是以Python程序的方式执行。执行结束以后,仍是经过Python拿回数据给spark中的JVM。JVM执行结束以后,再把结果包装成Python的类型返回给调用端。
原本Python的执行效率就低,加上中间又通过了若干次转换以及通讯开销(占大头),这就致使了pyspark中的RDD操做效率更低。
而如今有了Catalyst优化器以后,会自动帮助咱们进行底层的计算优化。而且即便是非原生的Python语言,也可使用它,所以会带来性能的极大提高。甚至通过官方的测量,使用pyspark写DataFrame的效率已经和scala和java分庭抗礼了。
因此若是咱们要选择Python做为操做spark的语言,DataFrame必定是首选。不过Catalyst优化器也有短板,它没法解决跨语言自己带来的问题。好比咱们使用Python写一些udf(user defined function),仍是会带来性能的损耗。这个时候的总体效率仍是会比scala低一些。
写了这么多废话,下面就让咱们实际一点,看看究竟pyspark当中的DataFrame要如何使用吧。
和RDD同样,DataFrame的建立方法有不少,咱们能够基于内存当中的数据进行建立,也能够从本地文件或者是HDFS等其余云存储系统当中进行读取。但怎么读取不重要,使用方法才是关键,为了方便演示,咱们先来看看如何从内存当中建立DataFrame。
前文当中曾经说过,DataFrame当中的数据以表结构的形式存储。也就是说咱们读入的通常都是结构化的数据,咱们常用的结构化的存储结构就是json,因此咱们先来看看如何从json字符串当中建立DataFrame。
首先,咱们建立一个json类型的RDD。
jsonstr = sc.parallelize((""" {'name': 'xiaoming', 'age': 13, 'score': 100}""",
"""{'name': 'xiaohong', 'age': 15, 'score': 98}"""
))
接着,咱们用spark.read.json将它转化成一个DataFrame。须要注意的是,若是数据量很大,这个执行会须要一点时间,可是它仍然是一个转化操做。数据其实并无真正被咱们读入,咱们读入的只是它的schema而已,只有当咱们执行执行操做的时候,数据才会真正读入处理。
studentDf = spark.read.json(jsonstr)
执行完这一句以后,RDD转DataFrame的工做就完成了。严格提及来这是读取操做,并非真正的转化操做。RDD转DataFrame稍微复杂一些,咱们晚点再说。
若是咱们想要查看DataFrame当中的内容,咱们能够执行show方法,这是一个行动操做。和pandas中的head相似,执行以后,会展现出DataFrame当中前20条数据。咱们也能够传入参数,指定咱们要求展现的数据条数。
咱们来运行一下,看看展现出来的结果:
咱们也collect一下本来的RDD做为一下对比:
这下一对比咱们就发现了,json格式的字符串果真能够被解析,而且RDD被转化成了表格格式的DataFrame。
咱们再来看下DataFrame的简单查询功能,其实Dataframe当中的查询功能不少。咱们今天先来看其中用得比较多的两种。
先来看第一种,第一种是经过select接口查询数据。这里的select其实对应的是SQL语句当中的select,含义也基本相同,不一样的是咱们是经过函数进行调用的而已。
咱们能够在select当中传入咱们想要查找的列名。
咱们能够加上where或者filter函数进行条件判断,where和filter函数是一个意思,二者的用法也彻底同样。官方提供了两个名字,为了避免同习惯的人使用方便而已。咱们把下图当中的函数换成filter结果也是同样的。
另一种操做方式稍稍复杂一些,则是将DataFrame注册成pyspark中的一张视图。这里的视图和数据库中的视图基本上是一个概念,spark当中支持两种不一样的视图。第一种是临时视图,第二种是全局视图。二者的用法基本一致,不一样的是做用范围。临时视图的做用范围是当前的session,若是当前的session关闭,或者是另外开启了新的session,这个视图就会做废。而全局视图则是跨session的,全部session均可以使用。
若是搞不清楚session的概念也没有关系,在以后的文章当中咱们还会遇到的。咱们先有这么个印象便可。
咱们调用createOrReplaceTempView方法建立一个临时视图,有了视图以后,咱们就能够经过SQL语句来查询数据了。
studentDf.createOrReplaceTempView("student")
咱们经过spark.sql传入一段SQL string便可完成数据的调用,须要注意的是,DataFrame也支持RDD的collect或者take等方法。若是这里的结果咱们调用的是collect,那么spark会将全部数据都返回。若是数据集很大的状况下可能会出现问题,因此要注意show和collect的使用范围和区别,在一些场景下搞错了会很危险。
今天这篇文章咱们一块儿来看了pyspark当中目前为止最经常使用的数据处理工具——DataFrame,还简单了解了一下它和RDD相比的性能优点以及它简单的查询语法的使用方法。
从上面的方法咱们也看得出来,相比以前RDD中介绍的那些方法,DataFrame中封装的API提供了更多高级的功能,比写RDD处理数据也要方便不少。再加上性能缘由,咱们在处理数据时必然首选使用DataFrame。相信你们经过本文对于DataFrame也应该有了一个最初的印象,后续还会有更多文章详细地介绍DataFrame的使用以及内部机制的一些细节,敬请期待吧。
今天的文章就到这里,原创不易,扫码关注我,获取更多精彩文章。