Spark入门(Python)

Hadoop是对大数据集进行分布式计算的标准工具,这也是为何当你穿过机场时能看到”大数据(Big Data)”广告的缘由。它已经成为大数据的操做系统,提供了包括工具和技巧在内的丰富生态系统,容许使用相对便宜的商业硬件集群进行超级计算机级别的计算。2003和2004年,两个来自Google的观点使Hadoop成为可能:一个分布式存储框架(Google文件系统),在Hadoop中被实现为HDFS;一个分布式计算框架(MapReduce)。html

这两个观点成为过去十年规模分析(scaling analytics)、大规模机器学习(machine learning),以及其余大数据应用出现的主要推进力!可是,从技术角度上讲,十年是一段很是长的时间,并且Hadoop还存在不少已知限制,尤为是MapReduce。对MapReduce编程明显是困难的。对大多数分析,你都必须用不少步骤将Map和Reduce任务串接起来。这形成类SQL的计算或机器学习须要专门的系统来进行。更糟的是,MapReduce要求每一个步骤间的数据要序列化到磁盘,这意味着MapReduce做业的I/O成本很高,致使交互分析和迭代算法(iterative algorithms)开销很大;而事实是,几乎全部的最优化和机器学习都是迭代的。java

为了解决这些问题,Hadoop一直在向一种更为通用的资源管理框架转变,即YARN(Yet Another Resource Negotiator, 又一个资源协调者)。YARN实现了下一代的MapReduce,但同时也容许应用利用分布式资源而没必要采用MapReduce进行计算。经过将集群管理通常化,研究转到分布式计算的通常化上,来扩展了MapReduce的初衷。node

Spark是第一个脱胎于该转变的快速、通用分布式计算范式,而且很快流行起来。Spark使用函数式编程范式扩展了MapReduce模型以支持更多计算类型,能够涵盖普遍的工做流,这些工做流以前被实现为Hadoop之上的特殊系统。Spark使用内存缓存来提高性能,所以进行交互式分析也足够快速(就如同使用Python解释器,与集群进行交互同样)。缓存同时提高了迭代算法的性能,这使得Spark很是适合数据理论任务,特别是机器学习。python

本文中,咱们将首先讨论如何在本地机器上或者EC2的集群上设置Spark进行简单分析。而后,咱们在入门级水平探索Spark,了解Spark是什么以及它如何工做(但愿能够激发更多探索)。最后两节咱们开始经过命令行与Spark进行交互,而后演示如何用Python写Spark应用,并做为Spark做业提交到集群上。git

设置Spark算法

在本机设置和运行Spark很是简单。你只须要下载一个预构建的包,只要你安装了Java 6+和Python 2.6+,就能够在Windows、Mac OS X和Linux上运行Spark。确保java程序在PATH环境变量中,或者设置了JAVA_HOME环境变量。相似的,python也要在PATH中。shell

假设你已经安装了Java和Python:数据库

  1. 访问Spark下载页
  2. 选择Spark最新发布版(本文写做时是1.2.0),一个预构建的Hadoop 2.4包,直接下载。

如今,如何继续依赖于你的操做系统,靠你本身去探索了。Windows用户能够在评论区对如何设置的提示进行评论。apache

通常,个人建议是按照下面的步骤(在POSIX操做系统上):编程

1.解压Spark

~$ tar -xzf spark-1.2.0-bin-hadoop2.4.tgz

 

2.将解压目录移动到有效应用程序目录中(如Windows上的

~$ mv spark-1.2.0-bin-hadoop2.4 /srv/spark-1.2.0

 

3.建立指向该Spark版本的符号连接到<spark目录。这样你能够简单地下载新/旧版本的Spark,而后修改连接来管理Spark版本,而不用更改路径或环境变量。

~$ ln -s /srv/spark-1.2.0 /srv/spark

 

4.修改BASH配置,将Spark添加到PATH中,设置SPARK_HOME环境变量。这些小技巧在命令行上会帮到你。在Ubuntu上,只要编辑~/.bash_profile或~/.profile文件,将如下语句添加到文件中:

export SPARK_HOME=/srv/spark
export PATH=$SPARK_HOME/bin:$PATH

 

5.source这些配置(或者重启终端)以后,你就能够在本地运行一个pyspark解释器。执行pyspark命令,你会看到如下结果:

~$ pyspark
Python 2.7.8 (default, Dec  2 2014, 12:45:58)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties
[… snip …]
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  `_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/
 
Using Python version 2.7.8 (default, Dec  2 2014 12:45:58)
SparkContext available as sc.
>>>

 

如今Spark已经安装完毕,能够在本机以”单机模式“(standalone mode)使用。你能够在本机开发应用并提交Spark做业,这些做业将以多进程/多线程模式运行的,或者,配置该机器做为一个集群的客户端(不推荐这样作,由于在Spark做业中,驱动程序(driver)是个很重要的角色,而且应该与集群的其余部分处于相同网络)。可能除了开发,你在本机使用Spark作得最多的就是利用spark-ec2脚原本配置Amazon云上的一个EC2 Spark集群了。

简略Spark输出

Spark(和PySpark)的执行能够特别详细,不少INFO日志消息都会打印到屏幕。开发过程当中,这些很是恼人,由于可能丢失Python栈跟踪或者print的输出。为了减小Spark输出 – 你能够设置$SPARK_HOME/conf下的log4j。首先,拷贝一份$SPARK_HOME/conf/log4j.properties.template文件,去掉“.template”扩展名。

~$ cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties

 

编辑新文件,用WARN替换代码中出现的INFO。你的log4j.properties文件相似:

# Set everything to be logged to the console
 log4j.rootCategory=WARN, console
 log4j.appender.console=org.apache.log4j.ConsoleAppender
 log4j.appender.console.target=System.err
 log4j.appender.console.layout=org.apache.log4j.PatternLayout
 log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
# Settings to quiet third party logs that are too verbose
 log4j.logger.org.eclipse.jetty=WARN
 log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
 log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
 log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=WARN

 

如今运行PySpark,输出消息将会更简略!感谢@genomegeek在一次District Data Labs的研讨会中指出这一点。

在Spark中使用IPython Notebook

当搜索有用的Spark小技巧时,我发现了一些文章提到在PySpark中配置IPython notebook。IPython notebook对数据科学家来讲是个交互地呈现科学和理论工做的必备工具,它集成了文本和Python代码。对不少数据科学家,IPython notebook是他们的Python入门,而且使用很是普遍,因此我想值得在本文中说起。

这里的大部分说明都来改编自IPython notebook: 在PySpark中设置IPython。可是,咱们将聚焦在本机以单机模式将IPtyon shell链接到PySpark,而不是在EC2集群。若是你想在一个集群上使用PySpark/IPython,查看并评论下文的说明吧!

  1. 1.为Spark建立一个iPython notebook配置
~$ ipython profile create spark
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_config.py'
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_notebook_config.py'
[ProfileCreate] Generating default config file: u'$HOME/.ipython/profile_spark/ipython_nbconvert_config.py'

 

记住配置文件的位置,替换下文各步骤相应的路径:

2.建立文件$HOME/.ipython/profile_spark/startup/00-pyspark-setup.py,并添加以下代码:

import os
import sys
 
# Configure the environment
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '/srv/spark'
 
# Create a variable for our root path
SPARK_HOME = os.environ['SPARK_HOME']
 
# Add the PySpark/py4j to the Python Path
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "build"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))

 

3.使用咱们刚刚建立的配置来启动IPython notebook。

~$ ipython notebook --profile spark

 

4.在notebook中,你应该能看到咱们刚刚建立的变量。

print SPARK_HOME

 

5.在IPython notebook最上面,确保你添加了Spark context。

from pyspark import  SparkContext
sc = SparkContext( 'local', 'pyspark')

 

6.使用IPython作个简单的计算来测试Spark context。

def isprime(n):
"""
check if integer n is a prime
"""
# make sure n is a positive integer
n = abs(int(n))
# 0 and 1 are not primes
if n < 2:
    return False
# 2 is the only even prime number
if n == 2:
    return True
# all other even numbers are not primes
if not n & 1:
    return False
# range starts with 3 and only needs to go up the square root of n
# for all odd numbers
for x in range(3, int(n**0.5)+1, 2):
    if n % x == 0:
        return False
return True
 
# Create an RDD of numbers from 0 to 1,000,000
nums = sc.parallelize(xrange(1000000))
 
# Compute the number of primes in the RDD
print nums.filter(isprime).count()

 

若是你能获得一个数字并且没有错误发生,那么你的context正确工做了!

编辑提示:上面配置了一个使用PySpark直接调用IPython notebook的IPython context。可是,你也可使用PySpark按如下方式直接启动一个notebook: $ IPYTHON_OPTS=”notebook –pylab inline” pyspark

哪一个方法好用取决于你使用PySpark和IPython的具体情景。前一个容许你更容易地使用IPython notebook链接到一个集群,所以是我喜欢的方法。

在EC2上使用Spark

在讲授使用Hadoop进行分布式计算时,我发现不少能够经过在本地伪分布式节点(pseudo-distributed node)或以单节点模式(single-node mode)讲授。可是为了了解真正发生了什么,就须要一个集群。当数据变得庞大,这些书面讲授的技能和真实计算需求间常常出现隔膜。若是你肯在学习详细使用Spark上花钱,我建议你设置一个快速Spark集群作作实验。 包含5个slave(和1个master)每周大概使用10小时的集群每个月大概须要$45.18。

完整的讨论能够在Spark文档中找到:在EC2上运行Spark在你决定购买EC2集群前必定要通读这篇文档!我列出了一些关键点:

  1. 经过AWS Console获取AWS EC2 key对(访问key和密钥key)。
  2. 将key对导出到你的环境中。在shell中敲出如下命令,或者将它们添加到配置中。
export AWS_ACCESS_KEY_ID=myaccesskeyid
export AWS_SECRET_ACCESS_KEY=mysecretaccesskey

 

注意不一样的工具使用不一样的环境名称,确保你用的是Spark脚本所使用的名称。

3.启动集群:

~$ cd $SPARK_HOME/ec2
ec2$ ./spark-ec2 -k <keypair> -i <key-file> -s <num-slaves> launch <cluster-name>

 

4.SSH到集群来运行Spark做业。

ec2$ ./spark-ec2 -k <keypair> -i <key-file> login <cluster-name>

 

5.销毁集群

ec2$ ./spark-ec2 destroy &lt;cluster-name&gt;.

 

这些脚本会自动建立一个本地的HDFS集群来添加数据,copy-dir命令能够同步代码和数据到该集群。可是你最好使用S3来存储数据,建立使用s3://URI来加载数据的RDDs。

Spark是什么?

既然设置好了Spark,如今咱们讨论下Spark是什么。Spark是个通用的集群计算框架,经过将大量数据集计算任务分配到多台计算机上,提供高效内存计算。若是你熟悉Hadoop,那么你知道分布式计算框架要解决两个问题:如何分发数据和如何分发计算。Hadoop使用HDFS来解决分布式数据问题,MapReduce计算范式提供有效的分布式计算。相似的,Spark拥有多种语言的函数式编程API,提供了除map和reduce以外更多的运算符,这些操做是经过一个称做弹性分布式数据集(resilient distributed datasets, RDDs)的分布式数据框架进行的。

本质上,RDD是种编程抽象,表明能够跨机器进行分割的只读对象集合。RDD能够从一个继承结构(lineage)重建(所以能够容错),经过并行操做访问,能够读写HDFS或S3这样的分布式存储,更重要的是,能够缓存到worker节点的内存中进行当即重用。因为RDD能够被缓存在内存中,Spark对迭代应用特别有效,由于这些应用中,数据是在整个算法运算过程当中均可以被重用。大多数机器学习和最优化算法都是迭代的,使得Spark对数据科学来讲是个很是有效的工具。另外,因为Spark很是快,能够经过相似Python REPL的命令行提示符交互式访问。

Spark库自己包含不少应用元素,这些元素能够用到大部分大数据应用中,其中包括对大数据进行相似SQL查询的支持,机器学习和图算法,甚至对实时流数据的支持。

核心组件以下:

  • Spark Core:包含Spark的基本功能;尤为是定义RDD的API、操做以及这二者上的动做。其余Spark的库都是构建在RDD和Spark Core之上的。
  • Spark SQL:提供经过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每一个数据库表被当作一个RDD,Spark SQL查询被转换为Spark操做。对熟悉Hive和HiveQL的人,Spark能够拿来就用。
  • Spark Streaming:容许对实时数据流进行处理和控制。不少实时数据库(如Apache Store)能够处理实时数据。Spark Streaming容许程序可以像普通RDD同样处理实时数据。
  • MLlib:一个经常使用机器学习算法库,算法被实现为对RDD的Spark操做。这个库包含可扩展的学习算法,好比分类、回归等须要对大量数据集进行迭代的操做。以前可选的大数据机器学习库Mahout,将会转到Spark,并在将来实现。
  • GraphX:控制图、并行图操做和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、建立子图、访问路径上全部顶点的操做。

因为这些组件知足了不少大数据需求,也知足了不少数据科学任务的算法和计算上的须要,Spark快速流行起来。不只如此,Spark也提供了使用Scala、Java和Python编写的API;知足了不一样团体的需求,容许更多数据科学家简便地采用Spark做为他们的大数据解决方案。

对Spark编程

编写Spark应用与以前实如今Hadoop上的其余数据流语言相似。代码写入一个惰性求值的驱动程序(driver program)中,经过一个动做(action),驱动代码被分发到集群上,由各个RDD分区上的worker来执行。而后结果会被发送回驱动程序进行聚合或编译。本质上,驱动程序建立一个或多个RDD,调用操做来转换RDD,而后调用动做处理被转换后的RDD。

这些步骤大致以下:

  1. 定义一个或多个RDD,能够经过获取存储在磁盘上的数据(HDFS,Cassandra,HBase,Local Disk),并行化内存中的某些集合,转换(transform)一个已存在的RDD,或者,缓存或保存。
  2. 经过传递一个闭包(函数)给RDD上的每一个元素来调用RDD上的操做。Spark提供了除了Map和Reduce的80多种高级操做。
  3. 使用结果RDD的动做(action)(如count、collect、save等)。动做将会启动集群上的计算。

当Spark在一个worker上运行闭包时,闭包中用到的全部变量都会被拷贝到节点上,可是由闭包的局部做用域来维护。Spark提供了两种类型的共享变量,这些变量能够按照限定的方式被全部worker访问。广播变量会被分发给全部worker,可是是只读的。累加器这种变量,worker可使用关联操做来“加”,一般用做计数器。

Spark应用本质上经过转换和动做来控制RDD。后续文章将会深刻讨论,可是理解了这个就足以执行下面的例子了。

Spark的执行

简略描述下Spark的执行。本质上,Spark应用做为独立的进程运行,由驱动程序中的SparkContext协调。这个context将会链接到一些集群管理者(如YARN),这些管理者分配系统资源。集群上的每一个worker由执行者(executor)管理,执行者反过来由SparkContext管理。执行者管理计算、存储,还有每台机器上的缓存。

重点要记住的是应用代码由驱动程序发送给执行者,执行者指定context和要运行的任务。执行者与驱动程序通讯进行数据分享或者交互。驱动程序是Spark做业的主要参与者,所以须要与集群处于相同的网络。这与Hadoop代码不一样,Hadoop中你能够在任意位置提交做业给JobTracker,JobTracker处理集群上的执行。

与Spark交互

使用Spark最简单的方式就是使用交互式命令行提示符。打开PySpark终端,在命令行中打出pyspark。

~$ pyspark
[… snip …]
>>>

 

PySpark将会自动使用本地Spark配置建立一个SparkContext。你能够经过sc变量来访问它。咱们来建立第一个RDD。

>>> text = sc.textFile("shakespeare.txt")
>>> print text
shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2

 

textFile方法将莎士比亚所有做品加载到一个RDD命名文本。若是查看了RDD,你就能够看出它是个MappedRDD,文件路径是相对于当前工做目录的一个相对路径(记得传递磁盘上正确的shakespear.txt文件路径)。咱们转换下这个RDD,来进行分布式计算的“hello world”:“字数统计”。

>>> from operator import add
>>> def tokenize(text):
...     return text.split()
...
>>> words = text.flatMap(tokenize)
>>> print words
PythonRDD[2] at RDD at PythonRDD.scala:43

 

咱们首先导入了add操做符,它是个命名函数,能够做为加法的闭包来使用。咱们稍后再使用这个函数。首先咱们要作的是把文本拆分为单词。咱们建立了一个tokenize函数,参数是文本片断,返回根据空格拆分的单词列表。而后咱们经过给flatMap操做符传递tokenize闭包对textRDD进行变换建立了一个wordsRDD。你会发现,words是个PythonRDD,可是执行本应该当即进行。显然,咱们尚未把整个莎士比亚数据集拆分为单词列表。

若是你曾使用MapReduce作过Hadoop版的“字数统计”,你应该知道下一步是将每一个单词映射到一个键值对,其中键是单词,值是1,而后使用reducer计算每一个键的1总数。

首先,咱们map一下。

>>> wc = words.map(lambda x: (x,1))
>>> print wc.toDebugString()
(2) PythonRDD[3] at RDD at PythonRDD.scala:43
|  shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
|  shakespeare.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2

 

我使用了一个匿名函数(用了Python中的lambda关键字)而不是命名函数。这行代码将会把lambda映射到每一个单词。所以,每一个x都是一个单词,每一个单词都会被匿名闭包转换为元组(word, 1)。为了查看转换关系,咱们使用toDebugString方法来查看PipelinedRDD是怎么被转换的。可使用reduceByKey动做进行字数统计,而后把统计结果写到磁盘。

>>> counts = wc.reduceByKey(add)
>>> counts.saveAsTextFile("wc")

 

一旦咱们最终调用了saveAsTextFile动做,这个分布式做业就开始执行了,在做业“跨集群地”(或者你本机的不少进程)运行时,你应该能够看到不少INFO语句。若是退出解释器,你能够看到当前工做目录下有个“wc”目录。

$ ls wc/
_SUCCESS   part-00000 part-00001

 

每一个part文件都表明你本机上的进程计算获得的被保持到磁盘上的最终RDD。若是对一个part文件进行head命令,你应该能看到字数统计元组。

$ head wc/part-00000
(u'fawn', 14)
(u'Fame.', 1)
(u'Fame,', 2)
(u'kinghenryviii@7731', 1)
(u'othello@36737', 1)
(u'loveslabourslost@51678', 1)
(u'1kinghenryiv@54228', 1)
(u'troilusandcressida@83747', 1)
(u'fleeces', 1)
(u'midsummersnightsdream@71681', 1)

 

注意这些键没有像Hadoop同样被排序(由于Hadoop中Map和Reduce任务中有个必要的打乱和排序阶段)。可是,能保证每一个单词在全部文件中只出现一次,由于你使用了reduceByKey操做符。你还可使用sort操做符确保在写入到磁盘以前全部的键都被排过序。

编写一个Spark应用

编写Spark应用与经过交互式控制台使用Spark相似。API是相同的。首先,你须要访问<SparkContext,它已经由<pyspark自动加载好了。

使用Spark编写Spark应用的一个基本模板以下:

## Spark Application - execute with spark-submit
 
## Imports
from pyspark import SparkConf, SparkContext
 
## Module Constants
APP_NAME = "My Spark Application"
 
## Closure Functions
 
## Main functionality
 
def main(sc):
    pass
 
if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local[*]")
    sc   = SparkContext(conf=conf)
 
    # Execute Main functionality
    main(sc)

 

这个模板列出了一个Spark应用所需的东西:导入Python库,模块常量,用于调试和Spark UI的可识别的应用名称,还有做为驱动程序运行的一些主要分析方法学。在ifmain中,咱们建立了SparkContext,使用了配置好的context执行main。咱们能够简单地导入驱动代码到pyspark而不用执行。注意这里Spark配置经过setMaster方法被硬编码到SparkConf,通常你应该容许这个值经过命令行来设置,因此你能看到这行作了占位符注释。

使用<sc.stop()或<sys.exit(0)来关闭或退出程序。

## Spark Application - execute with spark-submit
 
## Imports
import csv
import matplotlib.pyplot as plt
 
from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext
 
## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"
 
fields   = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
            'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight   = namedtuple('Flight', fields)
 
## Closure Functions
def parse(row):
    """
    Parses a row and returns a named tuple.
    """
 
    row[0]  = datetime.strptime(row[0], DATE_FMT).date()
    row[5]  = datetime.strptime(row[5], TIME_FMT).time()
    row[6]  = float(row[6])
    row[7]  = datetime.strptime(row[7], TIME_FMT).time()
    row[8]  = float(row[8])
    row[9]  = float(row[9])
    row[10] = float(row[10])
    return Flight(*row[:11])
 
def split(line):
    """
    Operator function for splitting a line with csv module
    """
    reader = csv.reader(StringIO(line))
    return reader.next()
 
def plot(delays):
    """
    Show a bar chart of the total delay per airline
    """
    airlines = [d[0] for d in delays]
    minutes  = [d[1] for d in delays]
    index    = list(xrange(len(airlines)))
 
    fig, axe = plt.subplots()
    bars = axe.barh(index, minutes)
 
    # Add the total minutes to the right
    for idx, air, min in zip(index, airlines, minutes):
        if min > 0:
            bars[idx].set_color('#d9230f')
            axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center')
        else:
            bars[idx].set_color('#469408')
            axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center')
 
    # Set the ticks
    ticks = plt.yticks([idx+ 0.5 for idx in index], airlines)
    xt = plt.xticks()[0]
    plt.xticks(xt, [' '] * len(xt))
 
    # minimize chart junk
    plt.grid(axis = 'x', color ='white', linestyle='-')
 
    plt.title('Total Minutes Delayed per Airline')
    plt.show()
 
## Main functionality
def main(sc):
 
    # Load the airlines lookup dictionary
    airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())
 
    # Broadcast the lookup dictionary to the cluster
    airline_lookup = sc.broadcast(airlines)
 
    # Read the CSV Data into an RDD
    flights = sc.textFile("ontime/flights.csv").map(split).map(parse)
 
    # Map the total delay to the airline (joined using the broadcast value)
    delays  = flights.map(lambda f: (airline_lookup.value[f.airline],
                                     add(f.dep_delay, f.arv_delay)))
 
    # Reduce the total delay for the month to the airline
    delays  = delays.reduceByKey(add).collect()
    delays  = sorted(delays, key=itemgetter(1))
 
    # Provide output from the driver
    for d in delays:
        print "%0.0f minutes delayed\t%s" % (d[1], d[0])
 
    # Show a bar chart of the delays
    plot(delays)
 
if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setMaster("local[*]")
    conf = conf.setAppName(APP_NAME)
    sc   = SparkContext(conf=conf)
 
    # Execute Main functionality
    main(sc)

 

使用<spark-submit命令来运行这段代码(假设你已有ontime目录,目录中有两个CSV文件):

~$ spark-submit app.py

 

这个Spark做业使用本机做为master,并搜索app.py同目录下的ontime目录下的2个CSV文件。最终结果显示,4月的总延误时间(单位分钟),既有早点的(若是你从美国大陆飞往夏威夷或者阿拉斯加),但对大部分大型航空公司都是延误的。注意,咱们在app.py中使用matplotlib直接将结果可视化出来了:

这段代码作了什么呢?咱们特别注意下与Spark最直接相关的main函数。首先,咱们加载CSV文件到RDD,而后把split函数映射给它。split函数使用csv模块解析文本的每一行,并返回表明每行的元组。最后,咱们将collect动做传给RDD,这个动做把数据以Python列表的形式从RDD传回驱动程序。本例中,airlines.csv是个小型的跳转表(jump table),能够将航空公司代码与全名对应起来。咱们将转移表存储为Python字典,而后使用sc.broadcast广播给集群上的每一个节点。

接着,main函数加载了数据量更大的flights.csv([译者注]做者笔误写成fights.csv,此处更正)。拆分CSV行完成以后,咱们将parse函数映射给CSV行,此函数会把日期和时间转成Python的日期和时间,并对浮点数进行合适的类型转换。每行做为一个NamedTuple保存,名为Flight,以便高效简便地使用。

有了Flight对象的RDD,咱们映射一个匿名函数,这个函数将RDD转换为一些列的键值对,其中键是航空公司的名字,值是到达和出发的延误时间总和。使用reduceByKey动做和add操做符能够获得每一个航空公司的延误时间总和,而后RDD被传递给驱动程序(数据中航空公司的数目相对较少)。最终延误时间按照升序排列,输出打印到了控制台,而且使用matplotlib进行了可视化。

这个例子稍长,可是但愿能演示出集群和驱动程序之间的相互做用(发送数据进行分析,结果取回给驱动程序),以及Python代码在Spark应用中的角色。

结论

尽管算不上一个完整的Spark入门,咱们但愿你能更好地了解Spark是什么,如何使用进行快速、内存分布式计算。至少,你应该能将Spark运行起来,并开始在本机或Amazon EC2上探索数据。你应该能够配置好iPython notebook来运行Spark。

Spark不能解决分布式存储问题(一般Spark从HDFS中获取数据),可是它为分布式计算提供了丰富的函数式编程API。这个框架创建在伸缩分布式数据集(RDD)之上。RDD是种编程抽象,表明被分区的对象集合,容许进行分布式操做。RDD有容错能力(可伸缩的部分),更重要的时,能够存储到节点上的worker内存里进行当即重用。内存存储提供了快速和简单表示的迭代算法,以及实时交互分析。

因为Spark库提供了Python、Scale、Java编写的API,以及内建的机器学习、流数据、图算法、类SQL查询等模块;Spark迅速成为当今最重要的分布式计算框架之一。与YARN结合,Spark提供了增量,而不是替代已存在的Hadoop集群,它将成为将来大数据重要的一部分,为数据科学探索铺设了一条康庄大道。

有用的连接

但愿你喜欢这篇博文!写做并非凭空而来的,如下是一些曾帮助我写做的有用连接;查看这些连接,可能对进一步探索Spark有帮助。注意,有些图书连接是推广连接,意味着若是你点击并购买了这些图书,你将会支持District Data Labs!

这篇更可能是篇入门文章,而不是District Data Labs的典型文章,有些与此入门相关的数据和代码你能够在这里找到:

Spark论文

Spark与Hadoop同样,有一些基础论文,我认为那些须要对大数据集进行分布式计算的严谨数据科学家必定要读。首先是HotOS(“操做系统热门话题”的简写)的一篇研讨会论文,简单易懂地描述了Spark。第二个是偏理论的论文,具体描述了RDD。

  1. M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica, “Spark: cluster computing with working sets,” in Proceedings of the 2nd USENIX conference on Hot topics in cloud computing, 2010, pp. 10–10.
  2. M. Zaharia, M. Chowdhury, T. Das, A. Dave, J. Ma, M. McCauley, M. J. Franklin, S. Shenker, and I. Stoica, “Resilient distributed datasets: A fault-tolerant abstraction for in-memory cluster computing,” in Proceedings of the 9th USENIX conference on Networked Systems Design and Implementation, 2012, pp. 2–2.

Spark图书

  1. 学习Spark
  2. 使用Spark进行高级分析

有用的博文

  1. 设置IPython以使用PySpark
  2. Databricks的Spark参考应用程序
  3. 在EC2上运行Spark
  4. 在Amazon Elastic MapReduce上运行Spark和SparkSQL
相关文章
相关标签/搜索