本文中,咱们将首先讨论如何在 本地机器上利用Spark进行简单分析。而后,将在入门级水平探索Spark,了解Spark是什么以及它如何工做(但愿能够激发更多探索)。最后两节将 开始经过命令行与Spark进行交互,而后演示如何用Python写Spark应用,并做为Spark做业提交到集群上。同时也会提供相应的 Scala 版本。html
在本机设置和运行Spark很是简单。你只须要下载一个预构建的包,只要你安装了 Java 6+和Python 2.6+,就能够在Windows、Mac OS X和Linux上运行Spark。确保java程序在PATH环境变量中,或者设置了JAVA_HOME环境变量。相似的,python也要在PATH 中。java
假设你已经安装了Java和Python,以及 Spark,若是没有请参照以前的教程:python
《Spark 伪分布式 & 全分布式 安装指南》:http://my.oschina.net/leejun2005/blog/394928git
注意:若是要用到下文的 pyspark,则须要设置 python 相关的 spark 包路径:github
vi .bashrc export PYTHONPATH=$SPARK_HOME/python/:$SPARK_HOME/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH
不然会报错: ImportError: No module named pyspark 或者 ImportError: No module named py4j.java_gateway算法
source这些配置(或者重启终端)以后,你就能够在本地运行一个pyspark解释器。执行pyspark命令,你会看到如下结果:数据库
~$ pyspark Python 2.7.8 (default, Dec 2 2014, 12:45:58) [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin Type "help", "copyright", "credits" or "license" for more information. Spark assembly has been built with Hive, including Datanucleus jars on classpath Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties [… snip …] Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ `_/ /__ / .__/\_,_/_/ /_/\_\ version 1.2.0 /_/ Using Python version 2.7.8 (default, Dec 2 2014 12:45:58) SparkContext available as sc. >>>
如今Spark已经安装完毕,能够在本机以”单机模式“使用。你能够在本机开发应用 并提交Spark做业,这些做业将以多进程/多线程模式运行的,或者,配置该机器做为一个集群的客户端(不推荐这样作,由于在Spark做业中,驱动程序 (driver)是个很重要的角色,而且应该与集群的其余部分处于相同网络)。apache
Spark(和PySpark)的执行能够特别详细,不少INFO日志消息都会打印 到屏幕。开发过程当中,这些很是恼人,由于可能丢失Python栈跟踪或者print的输出。为了减小Spark输出 – 你能够设置$SPARK_HOME/conf下的log4j。首先,拷贝一份$SPARK_HOME/conf /log4j.properties.template文件,去掉“.template”扩展名。编程
编辑新文件,用WARN替换代码中出现的INFO。你的log4j.properties文件相似:缓存
~$ pyspark Python 2.7.8 (default, Dec 2 2014, 12:45:58) [GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.54)] on darwin Type "help", "copyright", "credits" or "license" for more information. Spark assembly has been built with Hive, including Datanucleus jars on classpath Using Sparks default log4j profile: org/apache/spark/log4j-defaults.properties [… snip …] Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ `_/ /__ / .__/\_,_/_/ /_/\_\ version 1.2.0 /_/ Using Python version 2.7.8 (default, Dec 2 2014 12:45:58) SparkContext available as sc. >>>
如今运行PySpark,输出消息将会更简略!
talk is cheap,show you the code. 我们先来测试下 Spark 环境是否正常:
from pyspark import SparkContext from pyspark.streaming import StreamingContext sc = SparkContext("spark://110.9.17.187:8070", "NetworkWordCount") data = [1, 2, 3, 4, 5] distData = sc.parallelize(data) distData.reduce(lambda a, b: a + b)
若是你能获得一个数字 15,并且没有错误发生,那么你的context正确工做了!
既然设置好了Spark,如今咱们讨论下Spark是什么。Spark是个通用的集 群计算框架,经过将大量数据集计算任务分配到多台计算机上,提供高效内存计算。若是你熟悉Hadoop,那么你知道分布式计算框架要解决两个问题:如何分 发数据和如何分发计算。Hadoop使用HDFS来解决分布式数据问题,MapReduce计算范式提供有效的分布式计算。相似的,Spark拥有多种语 言的函数式编程API,提供了除map和reduce以外更多的运算符,这些操做是经过一个称做弹性分布式数据集(resilient distributed datasets, RDDs)的分布式数据框架进行的。
本质上,RDD是种编程抽象,表明能够跨机器进行分割的只读对象集合。RDD能够从 一个继承结构(lineage)重建(所以能够容错),经过并行操做访问,能够读写HDFS或S3这样的分布式存储,更重要的是,能够缓存到worker 节点的内存中进行当即重用。因为RDD能够被缓存在内存中,Spark对迭代应用特别有效,由于这些应用中,数据是在整个算法运算过程当中均可以被重用。大 多数机器学习和最优化算法都是迭代的,使得Spark对数据科学来讲是个很是有效的工具。另外,因为Spark很是快,能够经过相似Python REPL的命令行提示符交互式访问。
Spark库自己包含不少应用元素,这些元素能够用到大部分大数据应用中,其中包括对大数据进行相似SQL查询的支持,机器学习和图算法,甚至对实时流数据的支持。
核心组件以下:
Spark Core:包含Spark的基本功能;尤为是定义RDD的API、操做以及这二者上的动做。其余Spark的库都是构建在RDD和Spark Core之上的。
Spark SQL:提供经过Apache Hive的SQL变体Hive查询语言(HiveQL)与Spark进行交互的API。每一个数据库表被当作一个RDD,Spark SQL查询被转换为Spark操做。对熟悉Hive和HiveQL的人,Spark能够拿来就用。
Spark Streaming:容许对实时数据流进行处理和控制。不少实时数据库(如Apache Store)能够处理实时数据。Spark Streaming容许程序可以像普通RDD同样处理实时数据。
MLlib:一个经常使用机器学习算法库,算法被实现为对RDD的Spark操做。这个库包含可扩展的学习算法,好比分类、回归等须要对大量数据集进行迭代的操做。以前可选的大数据机器学习库Mahout,将会转到Spark,并在将来实现。
GraphX:控制图、并行图操做和计算的一组算法和工具的集合。GraphX扩展了RDD API,包含控制图、建立子图、访问路径上全部顶点的操做。
因为这些组件知足了不少大数据需求,也知足了不少数据科学任务的算法和计算上的需 要,Spark快速流行起来。不只如此,Spark也提供了使用Scala、Java和Python编写的API;知足了不一样团体的需求,容许更多数据科 学家简便地采用Spark做为他们的大数据解决方案。
编写Spark应用与以前实如今Hadoop上的其余数据流语言相似。代码写入一个 惰性求值的驱动程序(driver program)中,经过一个动做(action),驱动代码被分发到集群上,由各个RDD分区上的worker来执行。而后结果会被发送回驱动程序进行 聚合或编译。本质上,驱动程序建立一个或多个RDD,调用操做来转换RDD,而后调用动做处理被转换后的RDD。
这些步骤大致以下:
(1)定义一个或多个RDD,能够经过获取存储在磁盘上的数据(HDFS,Cassandra,HBase,Local Disk),并行化内存中的某些集合,转换(transform)一个已存在的RDD,或者,缓存或保存。
(2)经过传递一个闭包(函数)给RDD上的每一个元素来调用RDD上的操做。Spark提供了除了Map和Reduce的80多种高级操做。
(3)使用结果RDD的动做(action)(如count、collect、save等)。动做将会启动集群上的计算。
当Spark在一个worker上运行闭包时,闭包中用到的全部变量都会被拷贝到节 点上,可是由闭包的局部做用域来维护。Spark提供了两种类型的共享变量,这些变量能够按照限定的方式被全部worker访问。广播变量会被分发给全部 worker,可是是只读的。累加器这种变量,worker可使用关联操做来“加”,一般用做计数器。
Spark应用本质上经过转换和动做来控制RDD。后续文章将会深刻讨论,可是理解了这个就足以执行下面的例子了。
简略描述下Spark的执行。本质上,Spark应用做为独立的进程运行,由驱动程 序中的SparkContext协调。这个context将会链接到一些集群管理者(如YARN),这些管理者分配系统资源。集群上的每一个worker由 执行者(executor)管理,执行者反过来由SparkContext管理。执行者管理计算、存储,还有每台机器上的缓存。
重点要记住的是应用代码由驱动程序发送给执行者,执行者指定context和要运行 的任务。执行者与驱动程序通讯进行数据分享或者交互。驱动程序是Spark做业的主要参与者,所以须要与集群处于相同的网络。这与Hadoop代码不 同,Hadoop中你能够在任意位置提交做业给JobTracker,JobTracker处理集群上的执行。
使用Spark最简单的方式就是使用交互式命令行提示符。打开PySpark终端,在命令行中打出pyspark。
PySpark将会自动使用本地Spark配置建立一个SparkContext。你能够经过sc变量来访问它。咱们来建立第一个RDD。
>>> text = sc.textFile("shakespeare.txt") >>> print text shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2
textFile方法将莎士比亚所有做品加载到一个RDD命名文本。若是查看了 RDD,你就能够看出它是个MappedRDD,文件路径是相对于当前工做目录的一个相对路径(记得传递磁盘上正确的shakespear.txt文件路 径)。咱们转换下这个RDD,来进行分布式计算的“hello world”:“字数统计”。
>>> from operator import add >>> def tokenize(text): ... return text.split() ... >>> words = text.flatMap(tokenize) >>> print words PythonRDD[2] at RDD at PythonRDD.scala:43
咱们首先导入了add操做符,它是个命名函数,能够做为加法的闭包来使用。咱们稍后 再使用这个函数。首先咱们要作的是把文本拆分为单词。咱们建立了一个tokenize函数,参数是文本片断,返回根据空格拆分的单词列表。而后咱们经过给 flatMap操做符传递tokenize闭包对textRDD进行变换建立了一个wordsRDD。你会发现,words是个PythonRDD,可是 执行本应该当即进行。显然,咱们尚未把整个莎士比亚数据集拆分为单词列表。
若是你曾使用MapReduce作过Hadoop版的“字数统计”,你应该知道下一步是将每一个单词映射到一个键值对,其中键是单词,值是1,而后使用reducer计算每一个键的1总数。
首先,咱们map一下。
>>> wc = words.map(lambda x: (x,1)) >>> print wc.toDebugString() (2) PythonRDD[3] at RDD at PythonRDD.scala:43 | shakespeare.txt MappedRDD[1] at textFile at NativeMethodAccessorImpl.java:-2 | shakespeare.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:-2
我使用了一个匿名函数(用了Python中的lambda关键字)而不是命名函数。 这行代码将会把lambda映射到每一个单词。所以,每一个x都是一个单词,每一个单词都会被匿名闭包转换为元组(word, 1)。为了查看转换关系,咱们使用toDebugString方法来查看PipelinedRDD是怎么被转换的。可使用reduceByKey动做进 行字数统计,而后把统计结果写到磁盘。
>>> counts = wc.reduceByKey(add) >>> counts.saveAsTextFile("wc")
一旦咱们最终调用了saveAsTextFile动做,这个分布式做业就开始执行了,在做业“跨集群地”(或者你本机的不少进程)运行时,你应该能够看到不少INFO语句。若是退出解释器,你能够看到当前工做目录下有个“wc”目录。
$ ls wc /_SUCCESS part-00000 part-00001
每一个part文件都表明你本机上的进程计算获得的被保持到磁盘上的最终RDD。若是对一个part文件进行head命令,你应该能看到字数统计元组。
$ head wc/part-00000 (u'fawn', 14) (u'Fame.', 1) (u'Fame,', 2) (u'kinghenryviii@7731', 1) (u'othello@36737', 1) (u'loveslabourslost@51678', 1) (u'1kinghenryiv@54228', 1) (u'troilusandcressida@83747', 1) (u'fleeces', 1) (u'midsummersnightsdream@71681', 1)
注意这些键没有像Hadoop同样被排序(由于Hadoop中Map和Reduce 任务中有个必要的打乱和排序阶段)。可是,能保证每一个单词在全部文件中只出现一次,由于你使用了reduceByKey操做符。你还可使用sort操做 符确保在写入到磁盘以前全部的键都被排过序。
一个完整的例子:
from pyspark import SparkContext sc = SparkContext("spark://110.9.17.187:8070", "NetworkWordCount") lines = sc.textFile("hdfs://110.9.17.187:8020/tmp/num.txt") lineLengths = lines.map(lambda s: len(s)) totalLength = lineLengths.reduce(lambda a, b: a + b) print totalLength # # lines.count()
scala 版本以下:
val lines = sc.textFile("hdfs://110.9.17.187:8020/tmp/num.txt") val lineLengths = lines.map(s => s.length) val totalLength = lineLengths.reduce((a, b) => a + b)
PS:我这边用上面的两段代码测试发现(一亿五千万随机数,500MB),scala 比 python 快了 20 倍,跟官方的性能数据相差太远了,
应该是pythonAPI或者环境哪里有问题~
编写Spark应用与经过交互式控制台使用Spark相似。API是相同的。首先,你须要访问<SparkContext,它已经由<pyspark自动加载好了。
使用Spark编写Spark应用的一个基本模板以下:
## Spark Application - execute with spark-submit: spark-submit app.py ## Imports from pyspark import SparkConf, SparkContext ## Module Constants APP_NAME = "My Spark Application" ## Closure Functions ## Main functionality def main(sc): pass if __name__ == "__main__": # Configure Spark conf = SparkConf().setAppName(APP_NAME) conf = conf.setMaster("local[*]") sc = SparkContext(conf=conf) # Execute Main functionality main(sc)
这个模板列出了一个Spark应用所需的东西:导入Python库,模块常量,用于 调试和Spark UI的可识别的应用名称,还有做为驱动程序运行的一些主要分析方法学。在ifmain中,咱们建立了SparkContext,使用了配置好的 context执行main。咱们能够简单地导入驱动代码到pyspark而不用执行。注意这里Spark配置经过setMaster方法被硬编码到 SparkConf,通常你应该容许这个值经过命令行来设置,因此你能看到这行作了占位符注释。
使用<sc.stop()或<sys.exit(0)来关闭或退出程序。
## Spark Application - execute with spark-submit ## Imports import csv import matplotlib.pyplot as plt from StringIO import StringIO from datetime import datetime from collections import namedtuple from operator import add, itemgetter from pyspark import SparkConf, SparkContext ## Module Constants APP_NAME = "Flight Delay Analysis" DATE_FMT = "%Y-%m-%d" TIME_FMT = "%H%M" fields = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep', 'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance') Flight = namedtuple('Flight', fields) ## Closure Functions def parse(row): """ Parses a row and returns a named tuple. """ row[0] = datetime.strptime(row[0], DATE_FMT).date() row[5] = datetime.strptime(row[5], TIME_FMT).time() row[6] = float(row[6]) row[7] = datetime.strptime(row[7], TIME_FMT).time() row[8] = float(row[8]) row[9] = float(row[9]) row[10] = float(row[10]) return Flight(*row[:11]) def split(line): """ Operator function for splitting a line with csv module """ reader = csv.reader(StringIO(line)) return reader.next() def plot(delays): """ Show a bar chart of the total delay per airline """ airlines = [d[0] for d in delays] minutes = [d[1] for d in delays] index = list(xrange(len(airlines))) fig, axe = plt.subplots() bars = axe.barh(index, minutes) # Add the total minutes to the right for idx, air, min in zip(index, airlines, minutes): if min > 0: bars[idx].set_color('#d9230f') axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center') else: bars[idx].set_color('#469408') axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center') # Set the ticks ticks = plt.yticks([idx+ 0.5 for idx in index], airlines) xt = plt.xticks()[0] plt.xticks(xt, [' '] * len(xt)) # minimize chart junk plt.grid(axis = 'x', color ='white', linestyle='-') plt.title('Total Minutes Delayed per Airline') plt.show() ## Main functionality def main(sc): # Load the airlines lookup dictionary airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect()) # Broadcast the lookup dictionary to the cluster airline_lookup = sc.broadcast(airlines) # Read the CSV Data into an RDD flights = sc.textFile("ontime/flights.csv").map(split).map(parse) # Map the total delay to the airline (joined using the broadcast value) delays = flights.map(lambda f: (airline_lookup.value[f.airline], add(f.dep_delay, f.arv_delay))) # Reduce the total delay for the month to the airline delays = delays.reduceByKey(add).collect() delays = sorted(delays, key=itemgetter(1)) # Provide output from the driver for d in delays: print "%0.0f minutes delayed\t%s" % (d[1], d[0]) # Show a bar chart of the delays plot(delays) if __name__ == "__main__": # Configure Spark conf = SparkConf().setMaster("local[*]") conf = conf.setAppName(APP_NAME) sc = SparkContext(conf=conf) # Execute Main functionality main(sc)
使用<spark-submit命令来运行这段代码(假设你已有ontime目录,目录中有两个CSV文件):
~$ spark-submit app.py
这个Spark做业使用本机做为master,并搜索app.py同目录下的 ontime目录下的2个CSV文件。最终结果显示,4月的总延误时间(单位分钟),既有早点的(若是你从美国大陆飞往夏威夷或者阿拉斯加),但对大部分 大型航空公司都是延误的。注意,咱们在app.py中使用matplotlib直接将结果可视化出来了:
这段代码作了什么呢?咱们特别注意下与Spark最直接相关的main函数。首先, 咱们加载CSV文件到RDD,而后把split函数映射给它。split函数使用csv模块解析文本的每一行,并返回表明每行的元组。最后,咱们将 collect动做传给RDD,这个动做把数据以Python列表的形式从RDD传回驱动程序。本例中,airlines.csv是个小型的跳转表 (jump table),能够将航空公司代码与全名对应起来。咱们将转移表存储为Python字典,而后使用sc.broadcast广播给集群上的每一个节点。
接着,main函数加载了数据量更大的flights.csv([译者注]做者笔误 写成fights.csv,此处更正)。拆分CSV行完成以后,咱们将parse函数映射给CSV行,此函数会把日期和时间转成Python的日期和时 间,并对浮点数进行合适的类型转换。每行做为一个NamedTuple保存,名为Flight,以便高效简便地使用。
有了Flight对象的RDD,咱们映射一个匿名函数,这个函数将RDD转换为一些 列的键值对,其中键是航空公司的名字,值是到达和出发的延误时间总和。使用reduceByKey动做和add操做符能够获得每一个航空公司的延误时间总 和,而后RDD被传递给驱动程序(数据中航空公司的数目相对较少)。最终延误时间按照升序排列,输出打印到了控制台,而且使用matplotlib进行了 可视化。
这个例子稍长,可是但愿能演示出集群和驱动程序之间的相互做用(发送数据进行分析,结果取回给驱动程序),以及Python代码在Spark应用中的角色。
Spark Streaming 主要用来作实时处理,其原理本质上是“更细粒度的批处理”:
from pyspark import SparkContext from pyspark.streaming import StreamingContext # Create a local StreamingContext with two working thread and batch interval of 1 second sc = SparkContext("spark://110.9.17.187:8070", "NetworkWordCount") ssc = StreamingContext(sc, 3) # Create a DStream that will connect to hostname:port, like localhost:9999 lines = ssc.socketTextStream("110.9.17.187", 9999) # Split each line into words words = lines.flatMap(lambda line: line.split(" ")) # Count each word in each batch pairs = words.map(lambda word: (word, 1)) wordCounts = pairs.reduceByKey(lambda x, y: x + y) # Print the first ten elements of each RDD generated in this DStream to the console wordCounts.pprint() ssc.start() # Start the computation ssc.awaitTermination() # Wait for the computation to terminate
Scala 版本:
import org.apache.spark._ import org.apache.spark.streaming._ // not necessary in Spark 1.3+ // Create a local StreamingContext with two working thread and batch interval of 1 second. // The master requires 2 cores to prevent from a starvation scenario. object Streaming { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(3)) // Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("localhost", 9999) // Split each line into words val words = lines.flatMap(_.split(" ")) // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate } }
测试:
# TERMINAL 1: # Running Netcat $ nc -lk 9999 hello world ...
# TERMINAL 2: RUNNING NetworkWordCount spark-submit TestScala.jar localhost 9999 ... ------------------------------------------- Time: 1357008430000 ms ------------------------------------------- (hello,1) (world,1) ...
尽管算不上一个完整的Spark入门,咱们但愿你能更好地了解Spark是什么,如何使用进行快速、内存分布式计算。至少,你应该能将Spark运行起来,并开始在本机或Amazon EC2上探索数据。
Spark不能解决分布式存储问题(一般Spark从HDFS中获取数据),可是它 为分布式计算提供了丰富的函数式编程API。这个框架创建在伸缩分布式数据集(RDD)之上。RDD是种编程抽象,表明被分区的对象集合,容许进行分布式 操做。RDD有容错能力(可伸缩的部分),更重要的时,能够存储到节点上的worker内存里进行当即重用。内存存储提供了快速和简单表示的迭代算法,以 及实时交互分析。
因为Spark库提供了Python、Scale、Java编写的API,以及内建 的机器学习、流数据、图算法、类SQL查询等模块;Spark迅速成为当今最重要的分布式计算框架之一。与YARN结合,Spark提供了增量,而不是替 代已存在的Hadoop集群,它将成为将来大数据重要的一部分,为数据科学探索铺设了一条康庄大道。
[1] Spark入门(Python版)
http://blog.jobbole.com/86232/
[2] Spark编程指南笔记
http://blog.javachen.com/2015/02/03/spark-programming-guide/#
[3] Spark Streaming Programming Guide
https://spark.apache.org/docs/latest/streaming-programming-guide.html
[4] 大数据算命系列(8): spark框架与pyspark简介
https://github.com/renewjoy/bigdata-fortune-telling/blob/master/08_pyspark/pyspark.rst
[5] PySpark内部实现
http://blog.csdn.net/lantian0802/article/details/36376873
[6] Spark Streaming
http://debugo.com/spark-streaming/
[7] Spark新年福音:一个用于大规模数据科学的API——DataFrame
http://www.csdn.net/article/2015-02-17/2823997
[8] Python vs. Scala vs. Spark
http://emptypipes.org/2015/01/17/python-vs-scala-vs-spark/
[9] Spark1.0.0 应用程序部署工具spark-submit
http://blog.csdn.net/book_mmicky/article/details/25714545
【Spark1.3官方翻译】 Spark Submit提交应用程序,spark1.3spark
http://www.bkjia.com/yjs/980456.html
[10] 使用IntelliJ IDEA开发Spark1.0.0应用程序
http://blog.csdn.net/book_mmicky/article/details/25714549
Scala从零开始:使用Intellij IDEA写hello world
http://blog.csdn.net/asongoficeandfire/article/details/26412493
Scala从零开始:使用Scala IDE写hello world
http://blog.csdn.net/asongoficeandfire/article/details/21490101
运行第一个SparkStreaming程序(及过程当中问题解决)