Spark 编程指南 (一) [Spark Programming Guide]

Python Programming Guide - Spark(Python)html

Spark应用基本概念

每个运行在cluster上的spark应用程序,是由一个运行main函数的driver program和运行多种并行操做的executes组成python

其中spark的核心是弹性分布式数据集(Resilient Distributed Dataset—RDD)apache

  • Resilient(弹性):易变化、易计算app

  • Distributed(分布式):可横跨多台机器,集群分布maven

  • Dataset(数据集):大批量数据的集合分布式

<!-- more -->ide

RDD基本概念

RDD是逻辑集中的实体,表明一个分区的只读数据集,不可发生改变函数

【RDD的重要内部属性】单元测试

  • 分区列表(partitions)
    对于一个RDD而言,分区的多少涉及对这个RDD并行计算的粒度,每个RDD分区的计算都会在一个单独的任务中执行,每个分区对应一个Task,分区后的数据存放在内存当中测试

  • 计算每一个分区的函数(compute)
    对于Spark中每一个RDD都是以分区进行计算的,而且每一个分区的compute函数是在对迭代器进行复合操做,不须要每次计算,直到提交动做触发才会将以前全部的迭代操做进行计算,lineage在容错中有重要做用

  • 对父级RDD的依赖(dependencies)
    因为RDD存在转换关系,因此新生成的RDD对上一个RDD有依赖关系,RDD之间经过lineage产生依赖关系

【窄依赖】
每个父RDD的分区最多只被子RDD的一个分区所使用,能够相似于流水线同样,计算全部父RDD的分区;在节点计算失败的恢复上也更有效,能够直接计算其父RDD的分区,还能够进行并行计算

子RDD的每一个分区依赖于常数个父分区(即与数据规模无关)
输入输出一对一的算子,且结果RDD的分区结构不变,主要是map、flatmap
输入输出一对一,但结果RDD的分区结构发生了变化,如union、coalesce
从输入中选择部分元素的算子,如filter、distinct、subtract、sample

【宽依赖】
多个子RDD的分区会依赖于同一个父RDD的分区,须要取得其父RDD的全部分区数据进行计算,而一个节点的计算失败,将会致使其父RDD上多个分区从新计算

子RDD的每一个分区依赖于全部父RDD分区
对单个RDD基于key进行重组和reduce,如groupByKey、reduceByKey
对两个RDD基于key进行jion和重组,如jion

  • 对key-value数据类型RDD的分区器,控制分区策略和分区数(partitioner)
    partitioner就是RDD的分区函数,即HashPartitioner(哈希分区)和RangePartitioner(区域分区),分区函数决定了每一个RDD的分区策略和分区数,而且这个函数只在(k-v)类型的RDD中存在,在非(k-v)结构的RDD中是None

  • 每一个数据分区的地址列表(preferredLocations)
    与Spark中的调度相关,返回的是此RDD的每一个partition所出储存的位置,按照“移动数据不如移动计算”的理念,在spark进行任务调度的时候,尽量将任务分配到数据块所存储的位置

  • 控制操做(control operation)
    spark中对RDD的持久化操做是很重要的,能够将RDD存放在不一样的存储介质中,方便后续的操做能够重复使用。

主要有cache、persist、checkpoint,checkpoint接口是将RDD持久化到HDFS中,与persist的区别是checkpoint会切断此RDD以前的依赖关系,而persist会保留依赖关系。checkpoint的两大做用:一是spark程序长期驻留,过长的依赖会占用不少的系统资源,按期checkpoint能够有效的节省资源;二是维护过长的依赖关系可能会出现问题,一旦spark程序运行失败,RDD的容错成本会很高

Python链接Spark

Spark 1.6.0 支持 Python 2.6+ 或者 Python 3.4+,它使用标准的CPython解释器, 因此像NumPy这样的C语言类库也可使用,一样也支持PyPy 2.3+

能够用spark目录里的bin/spark-submit脚本在python中运行spark应用程序,这个脚本能够加载Java/Scala类库,让你提交应用程序到集群当中。你也可使用bin/pyspark脚本去启动python交互界面

若是你但愿访问HDFS上的数据集,你须要创建对应HDFS版本的PySpark链接。

最后,你的程序须要import一些spark类库:

from pyspark import SparkContext, SparkConf

PySpark 要求driver和workers须要相同的python版本,它一般引用环境变量PATH默认的python版本;你也能够本身指定PYSPARK_PYTHON所用的python版本,例如:

PYSPARK_PYTHON=python3.4 bin/pyspark
PYSPARK_PYTHON=/opt/pypy-2.5/bin/pypy bin/spark-submit examples/src/main/python/pi.py

初始化Spark

一个Spark应用程序的第一件事就是去建立SparkContext对象,它的做用是告诉Spark如何创建一个集群。建立SparkContext以前,先要建立SparkConf对象,SparkConf包含了应用程序的相关信息。

conf = SparkConf().setAppName(appName).setMaster(master)
sc = SparkContext(conf=conf)
  • appName:应用的名称,用户显示在集群UI上

  • master:Spark、Mesos或者YARN集群的URL,若是是本地运行,则应该是特殊的'local'字符串

在实际运行时,你不会讲master参数写死在程序代码里,而是经过spark-submit来获取这个参数;在本地测试和单元测试中,你仍然须要'local'去运行Spark应用程序

使用Shell

在PySpark Shell中,一个特殊SparkContext已经帮你建立好了,变量名是:sc,然而在Shell中建立你本身的SparkContext是不起做用的。

你能够经过--master参数设置master所链接的上下文主机;你也能够经过--py-files参数传递一个用逗号做为分割的列表,将Python中的.zip、.egg、.py等文件添加到运行路径当中;你一样能够经过--packages参数,传递一个用逗号分割的maven列表,来个这个Shell会话添加依赖(例如Spark的包)

任何额外的包含依赖的仓库(如SonaType),均可以经过--repositories参数添加进来。
Spark中全部的Python依赖(requirements.txt的依赖包列表),在必要时都必须经过pip手动安装

例如用4个核来运行bin/pyspark:

./bin/pyspark --master local[4]

或者,将code.py添加到搜索路径中(为了后面能够import):

./bin/pyspark --master local[4] --py-files code.py

经过运行pyspark --help来查看完整的操做帮助信息,在这种状况下,pyspark会调用一个通用的spark-submit脚本

在IPython这样加强Python解释器中,也能够运行PySpark Shell;支持IPython 1.0.0+;在利用IPython运行bin/pyspark时,必须将PYSPARK_DRIVER_PYTHON变量设置成ipython:

PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark

你能够经过PYSPARK_DRIVER_PYTHON_OPTS参数来本身定制ipython命令,好比在IPython Notebook中开启PyLab图形支持:

PYSPARK_DRIVER_PYTHON=ipython PYSPARK_DRIVER_PYTHON_OPTS="notebook" ./bin/pyspark

参考:Spark Programming Guide 官方文档

原博连接,请注明出处。

相关文章
相关标签/搜索