Spark快速入门

什么是Spark

Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架。最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一。html

与Hadoop和Storm等其余大数据和MapReduce技术相比,Spark有以下优点。java

首先,Spark为咱们提供了一个全面、统一的框架用于管理各类有着不一样性质(文本数据、图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求。python

       

Spark能够将Hadoop集群中的应用在内存中的运行速度提高100倍,甚至可以将应用在磁盘上的运行速度提高10倍。web

Spark让开发者能够快速的用Java、Scala或Python编写程序。它自己自带了一个超过80个高阶操做符集合。并且还能够用它在shell中以交互式地查询数据。算法

除了Map和Reduce操做以外,它还支持SQL查询,流数据,机器学习和图表数据处理。开发者能够在一个数据管道用例中单独使用某一能力或者将这些能力结合在一块儿使用。sql

                                         

在这个Apache Spark文章系列的第一部分中,咱们将了解到什么是Spark,它与典型的MapReduce解决方案的比较以及它如何为大数据处理提供了一套完整的工具。shell

Hadoop和Spark

Hadoop这项大数据处理技术大概已有十年历史,并且被看作是首选的大数据集合处理的解决方案。MapReduce是一路计算的优秀解决方案,不 过对于须要多路计算和算法的用例来讲,并不是十分高效。数据处理流程中的每一步都须要一个Map阶段和一个Reduce阶段,并且若是要利用这一解决方案, 须要将全部用例都转换成MapReduce模式。数据库

在下一步开始以前,上一步的做业输出数据必需要存储到分布式文件系统中。所以,复制和磁盘存储会致使这种方式速度变慢。另外Hadoop解决方案中 一般会包含难以安装和管理的集群。并且为了处理不一样的大数据用例,还须要集成多种不一样的工具(如用于机器学习的Mahout和流数据处理的Storm)。express

若是想要完成比较复杂的工做,就必须将一系列的MapReduce做业串联起来而后顺序执行这些做业。每个做业都是高时延的,并且只有在前一个做业完成以后下一个做业才能开始启动。apache

而Spark则容许程序开发者使用有向无环图(DAG)开发复杂的多步数据管道。并且还支持跨有向无环图的内存数据共享,以便不一样的做业能够共同处理同一个数据。

Spark运行在现有的Hadoop分布式文件系统基础之上(HDFS)提供额外的加强功能。它支持将Spark应用部署到现存的Hadoop v1集群(with SIMR – Spark-Inside-MapReduce)或Hadoop v2 YARN集群甚至是Apache Mesos之中。

咱们应该将Spark看做是Hadoop MapReduce的一个替代品而不是Hadoop的替代品。其意图并不是是替代Hadoop,而是为了提供一个管理不一样的大数据用例和需求的全面且统一的解决方案。

Spark特性

Spark经过在数据处理过程当中成本更低的洗牌(Shuffle)方式,将MapReduce提高到一个更高的层次。利用内存数据存储和接近实时的处理能力,Spark比其余的大数据处理技术的性能要快不少倍。

Spark还支持大数据查询的延迟计算,这能够帮助优化大数据处理流程中的处理步骤。Spark还提供高级的API以提高开发者的生产力,除此以外还为大数据解决方案提供一致的体系架构模型。

Spark将中间结果保存在内存中而不是将其写入磁盘,当须要屡次处理同一数据集时,这一点特别实用。Spark的设计初衷就是既能够在内存中又可 以在磁盘上工做的执行引擎。当内存中的数据不适用时,Spark操做符就会执行外部操做。Spark能够用于处理大于集群内存容量总和的数据集。

Spark会尝试在内存中存储尽量多的数据而后将其写入磁盘。它能够将某个数据集的一部分存入内存而剩余部分存入磁盘。开发者须要根据数据和用例评估对内存的需求。Spark的性能优点得益于这种内存中的数据存储。

Spark的其余特性包括:

  • 支持比Map和Reduce更多的函数。

  • 优化任意操做算子图(operator graphs)。

  • 能够帮助优化总体数据处理流程的大数据查询的延迟计算。

  • 提供简明、一致的Scala,Java和Python API。

  • 提供交互式Scala和Python Shell。目前暂不支持Java。

Spark是用Scala程序设计语言编写而成,运行于Java虚拟机(JVM)环境之上。目前支持以下程序设计语言编写Spark应用:

  • Scala

  • Java

  • Python

  • Clojure

  • R

Spark生态系统

除了Spark核心API以外,Spark生态系统中还包括其余附加库,能够在大数据分析和机器学习领域提供更多的能力。

这些库包括:

  • Spark Streaming:

    • Spark Streaming基于微批量方式的计算和处理,能够用于处理实时的流数据。它使用DStream,简单来讲就是一个弹性分布式数据集(RDD)系列,处理实时数据。

  • Spark SQL:

    • Spark SQL能够经过JDBC API将Spark数据集暴露出去,并且还能够用传统的BI和可视化工具在Spark数据上执行相似SQL的查询。用户还能够用Spark SQL对不一样格式的数据(如JSON,Parquet以及数据库等)执行ETL,将其转化,而后暴露给特定的查询。

  • Spark MLlib:

    • MLlib是一个可扩展的Spark机器学习库,由通用的学习算法和工具组成,包括二元分类、线性回归、聚类、协同过滤、梯度降低以及底层优化原语。

  • Spark GraphX:

    • GraphX是用于图计算和并行图计算的 新的(alpha)Spark API。经过引入弹性分布式属性图(Resilient Distributed Property Graph),一种顶点和边都带有属性的有向多重图,扩展了Spark RDD。为了支持图计算,GraphX暴露了一个基础操做符集合(如subgraph,joinVertices和aggregateMessages) 和一个通过优化的Pregel API变体。此外,GraphX还包括一个持续增加的用于简化图分析任务的图算法和构建器集合。

除了这些库之外,还有一些其余的库,如BlinkDB和Tachyon。

BlinkDB是一个近似查询引擎,用于在海量数据上执行交互式SQL查询。BlinkDB能够经过牺牲数据精度来提高查询响应时间。经过在数据样本上执行查询并展现包含有意义的错误线注解的结果,操做大数据集合。

Tachyon是一个之内存为中心的 分布式文件系统,可以提供内存级别速度的跨集群框架(如Spark和MapReduce)的可信文件共享。它将工做集文件缓存在内存中,从而避免到磁盘中 加载须要常常读取的数据集。经过这一机制,不一样的做业/查询和框架能够之内存级的速度访问缓存的文件。
此外,还有一些用于与其余产品集成的适配器,如Cassandra(Spark Cassandra 链接器)和R(SparkR)。Cassandra Connector可用于访问存储在Cassandra数据库中的数据并在这些数据上执行数据分析。

下图展现了在Spark生态系统中,这些不一样的库之间的相互关联。

图1. Spark框架中的库

咱们将在这一系列文章中逐步探索这些Spark库

Spark体系架构

Spark体系架构包括以下三个主要组件:

  • 数据存储

  • API

  • 管理框架

接下来让咱们详细了解一下这些组件。

数据存储:

Spark用HDFS文件系统存储数据。它可用于存储任何兼容于Hadoop的数据源,包括HDFS,HBase,Cassandra等。

API

利用API,应用开发者能够用标准的API接口建立基于Spark的应用。Spark提供Scala,Java和Python三种程序设计语言的API。

下面是三种语言Spark API的网站连接。

资源管理:

Spark既能够部署在一个单独的服务器也能够部署在像Mesos或YARN这样的分布式计算框架之上。

下图2展现了Spark体系架构模型中的各个组件。

图2 Spark体系架构

弹性分布式数据集

弹性分布式数据集(基于Matei的研究论文)或RDD是Spark框架中的核心概念。能够将RDD视做数据库中的一张表。其中能够保存任何类型的数据。Spark将数据存储在不一样分区上的RDD之中。

RDD能够帮助从新安排计算并优化数据处理过程。

此外,它还具备容错性,由于RDD知道如何从新建立和从新计算数据集。

RDD是不可变的。你能够用变换(Transformation)修改RDD,可是这个变换所返回的是一个全新的RDD,而原有的RDD仍然保持不变。

RDD支持两种类型的操做:

  • 变换(Transformation)

  • 行动(Action)

变换:变换的返回值是一个新的RDD集合,而不是单个值。调用一个变换方法,不会有任何求值计算,它只获取一个RDD做为参数,而后返回一个新的RDD。

变换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe和coalesce。

行动:行动操做计算并返回一个新的值。当在一个RDD对象上调用行动函数时,会在这一时刻计算所有的数据处理查询并返回结果值。

行动操做包括:reduce,collect,count,first,take,countByKey以及foreach。

如何安装Spark

安装和使用Spark有几种不一样方式。你能够在本身的电脑上将Spark做为一个独立的框架安装或者从诸如Cloudera,HortonWorks或MapR之类的供应商处获取一个Spark虚拟机镜像直接使用。或者你也可使用在云端环境(如Databricks Cloud)安装并配置好的Spark。

在本文中,咱们将把Spark做为一个独立的框架安装并在本地启动它。最近Spark刚刚发布了1.2.0版本。咱们将用这一版本完成示例应用的代码展现。

如何运行Spark

当你在本地机器安装了Spark或使用了基于云端的Spark后,有几种不一样的方式能够链接到Spark引擎。

下表展现了不一样的Spark运行模式所需的Master URL参数。

如何与Spark交互

Spark启动并运行后,能够用Spark shell链接到Spark引擎进行交互式数据分析。Spark shell支持Scala和Python两种语言。Java不支持交互式的Shell,所以这一功能暂未在Java语言中实现。

能够用spark-shell.cmd和pyspark.cmd命令分别运行Scala版本和Python版本的Spark Shell。

Spark网页控制台

不论Spark运行在哪种模式下,均可以经过访问Spark网页控制台查看Spark的做业结果和其余的统计数据,控制台的URL地址以下:

http://localhost:4040

Spark控制台以下图3所示,包括Stages,Storage,Environment和Executors四个标签页

(点击查看大图)

图3. Spark网页控制台

共享变量

Spark提供两种类型的共享变量能够提高集群环境中的Spark程序运行效率。分别是广播变量和累加器。

广播变量:广播变量能够在每台机器上缓存只读变量而不须要为各个任务发送该变量的拷贝。他们可让大的输入数据集的集群拷贝中的节点更加高效。

下面的代码片断展现了如何使用广播变量。

//
// Broadcast Variables
//
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value

累加器:只有在使用相关操做时才会添加累加器,所以它能够很好地支持并行。累加器可用于实现计数(就像在MapReduce中那样)或求和。能够用add方法将运行在集群上的任务添加到一个累加器变量中。不过这些任务没法读取变量的值。只有驱动程序才可以读取累加器的值。

下面的代码片断展现了如何使用累加器共享变量:

//
// Accumulators
//

val accum = sc.accumulator(0, "My Accumulator")

sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

accum.value

Spark应用示例

本篇文章中所涉及的示例应用是一个简单的字数统计应用。这与学习用Hadoop进行大数据处理时的示例应用相同。咱们将在一个文本文件上执行一些数 据分析查询。本示例中的文本文件和数据集都很小,不过无须修改任何代码,示例中所用到的Spark查询一样能够用到大容量数据集之上。

为了让讨论尽可能简单,咱们将使用Spark Scala Shell。

首先让咱们看一下如何在你本身的电脑上安装Spark。

前提条件:

  • 为了让Spark可以在本机正常工做,你须要安装Java开发工具包(JDK)。这将包含在下面的第一步中。

  • 一样还须要在电脑上安装Spark软件。下面的第二步将介绍如何完成这项工做。

注:下面这些指令都是以Windows环境为例。若是你使用不一样的操做系统环境,须要相应的修改系统变量和目录路径已匹配你的环境。

I. 安装JDK

1)从Oracle网站上下载JDK。推荐使用JDK 1.7版本

将JDK安装到一个没有空格的目录下。对于Windows用户,须要将JDK安装到像c:\dev这样的文件夹下,而不能安装到“c: \Program Files”文件夹下。“c:\Program Files”文件夹的名字中包含空格,若是软件安装到这个文件夹下会致使一些问题。

注:不要在“c:\Program Files”文件夹中安装JDK或(第二步中所描述的)Spark软件。

2)完成JDK安装后,切换至JDK 1.7目录下的”bin“文件夹,而后键入以下命令,验证JDK是否正确安装:

java -version

若是JDK安装正确,上述命令将显示Java版本。

II. 安装Spark软件:

Spark网站上下载最新版本 的Spark。在本文发表时,最新的Spark版本是1.2。你能够根据Hadoop的版本选择一个特定的Spark版本安装。我下载了与Hadoop 2.4或更高版本匹配的Spark,文件名是spark-1.2.0-bin-hadoop2.4.tgz。

将安装文件解压到本地文件夹中(如:c:\dev)。

为了验证Spark安装的正确性,切换至Spark文件夹而后用以下命令启动Spark Shell。这是Windows环境下的命令。若是使用Linux或Mac OS,请相应地编辑命令以便可以在相应的平台上正确运行。

c:
cd c:\dev\spark-1.2.0-bin-hadoop2.4
bin\spark-shell

若是Spark安装正确,就可以在控制台的输出中看到以下信息。

….
15/01/17 23:17:46 INFO HttpServer: Starting HTTP Server
15/01/17 23:17:46 INFO Utils: Successfully started service 'HTTP class server' on port 58132.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Type :help for more information.
….
15/01/17 23:17:53 INFO BlockManagerMaster: Registered BlockManager
15/01/17 23:17:53 INFO SparkILoop: Created spark context..
Spark context available as sc.

能够键入以下命令检查Spark Shell是否工做正常。

sc.version

(或)

sc.appName

完成上述步骤以后,能够键入以下命令退出Spark Shell窗口:

:quit

若是想启动Spark Python Shell,须要先在电脑上安装Python。你能够下载并安装Anaconda,这是一个免费的Python发行版本,其中包括了一些比较流行的科学、数学、工程和数据分析方面的Python包。

而后能够运行以下命令启动Spark Python Shell:

c:
cd c:\dev\spark-1.2.0-bin-hadoop2.4
bin\pyspark

Spark示例应用

完成Spark安装并启动后,就能够用Spark API执行数据分析查询了。

这些从文本文件中读取并处理数据的命令都很简单。咱们将在这一系列文章的后续文章中向你们介绍更高级的Spark框架使用的用例。

首先让咱们用Spark API运行流行的Word Count示例。若是尚未运行Spark Scala Shell,首先打开一个Scala Shell窗口。这个示例的相关命令以下所示:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
 
val txtFile = "README.md"
val txtData = sc.textFile(txtFile)
txtData.cache()

咱们能够调用cache函数将上一步生成的RDD对象保存到缓存中,在此以后Spark就不须要在每次数据查询时都从新计算。须要注意的 是,cache()是一个延迟操做。在咱们调用cache时,Spark并不会立刻将数据存储到内存中。只有当在某个RDD上调用一个行动时,才会真正执 行这个操做。

如今,咱们能够调用count函数,看一下在文本文件中有多少行数据。

txtData.count()

而后,咱们能够执行以下命令进行字数统计。在文本文件中统计数据会显示在每一个单词的后面。

val wcData = txtData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

wcData.collect().foreach(println)

若是想查看更多关于如何使用Spark核心API的代码示例,请参考网站上的Spark文档

后续计划

在后续的系列文章中,咱们将从Spark SQL开始,学习更多关于Spark生态系统的其余部分。以后,咱们将继续了解Spark Streaming,Spark MLlib和Spark GraphX。咱们也会有机会学习像Tachyon和BlinkDB等框架。

小结

在本文中,咱们了解了Apache Spark框架如何经过其标准API帮助完成大数据处理和分析工做。咱们还对Spark和传统的MapReduce实现(如Apache Hadoop)进行了比较。Spark与Hadoop基于相同的HDFS文件存储系统,所以若是你已经在Hadoop上进行了大量投资和基础设施建设,可 以一块儿使用Spark和MapReduce。

此外,也能够将Spark处理与Spark SQL、机器学习以及Spark Streaming结合在一块儿。关于这方面的内容咱们将在后续的文章中介绍。

利用Spark的一些集成功能和适配器,咱们能够将其余技术与Spark结合在一块儿。其中一个案例就是将Spark、Kafka和Apache Cassandra结合在一块儿,其中Kafka负责输入的流式数据,Spark完成计算,最后Cassandra NoSQL数据库用于保存计算结果数据。

不过须要牢记的是,Spark生态系统仍不成熟,在安全和与BI工具集成等领域仍然须要进一步的改进。

参考文献

关于做者

Srini Penchikala目 前是一家金融服务机构的软件架构师,这个机构位于德克萨斯州的奥斯汀。他在软件系统架构、设计和开发方面有超过20年的经验。Srini目前正在撰写一本 关于NoSQL数据库模式的书。他仍是曼宁出版社出版的《Spring Roo in Action》一书的合著者(http://www.manning.com/SpringRooinAction)。他还曾经出席各类会议,如 JavaOne,SEI Architecture Technology Conference(SATURN),IT Architect Conference(ITARC),No Fluff Just Stuff,NoSQL Now和Project World Conference等。Srini还在InfoQ,The ServerSide,OReilly Network(ONJava),DevX Java,java.net以及JavaWorld等网站上发表过不少关于软件系统架构、安全和风险管理以及NoSQL数据库等方面的文章。他仍是InfoQ NoSQL数据库社区的责任编辑

 

查看英文原文:Big Data Processing with Apache Spark – Part 1: Introduction

相关文章
相关标签/搜索