1、初识Spark和Hadoophtml
Apache Spark 是一个新兴的大数据处理通用引擎,提供了分布式的内存抽象。Spark 正如其名,最大的特色就是快(Lightning-fast),可比 Hadoop MapReduce 的处理速度快 100 倍。web
Hadoop实质上更可能是一个分布式数据基础设施: 它将巨大的数据集分派到一个由普通计算机组成的集群中的多个节点进行存储,意味着你不须要购买和维护昂贵的服务器硬件。shell
同时,Hadoop还会索引和跟踪这些数据,让大数据处理和分析效率达到史无前例的高度。Spark则是一个专门用来对那些分布式存储的大数据进行处理的工具,它并不会进行分布式数据的存储。apache
Hadoop除了提供为你们所共识的HDFS分布式数据存储功能以外,还提供了叫作MapReduce的数据处理功能。因此咱们彻底能够抛开Spark,使用Hadoop自身的MapReduce来完成数据的处理。vim
固然,Spark也不是非要依附在Hadoop身上才能生存。但它没有提供文件管理系统,因此,它必须和其余的分布式文件系统进行集成才能运做。咱们能够选择Hadoop的HDFS,也能够选择其余的基于云的数据系统平台。但Spark默认来讲仍是被用在Hadoop上面的,毕竟你们都认为它们的结合是最好的。缓存
2、安装Spark
bash
如下都是在Ubuntu 16.04 系统上的操做
服务器
1.安装Java JDK并配置好环境变量(这部分就不详细说了)闭包
2.安装Hadoop
app
2.1 建立Hadoop用户:
打开终端,输入命令:
sudo useradd -m hadoop -s /bin/bash
添加hadoop用户,并设置/bin/bash做为shell
2.2 设置Hadoop用户的登陆密码:
sudo passwd hadoop
而后根据系统提示输入两次本身的密码,再给hadoop用户添加管理员权限:
sudo adduser hadoop sudo
2.3 将当前的用户切换到刚刚建立的hadoop用户(屏幕右上角有个齿轮,点进去就看到)
2.4 更新系统的apt。打开终端,输入命令:
sudo apt-get update
2.5 安装ssh、配置ssh无密码登陆
集群、单节点模式都须要遇到SSH登陆。Ubuntu默认安装了SSH Client,但须要本身安装SSH Server:
sudo apt-get install openssh-server
安装后,直接登陆本机
ssh localhost
SSH首次登陆须要确认,根据提示输入:yes,而后再按提示输入刚刚设置的hadoop的密码,就登陆了。
2.6 下载Hadoop
下载地址:http://mirror.bit.edu.cn/apache/hadoop/common/
选择“stable”文件夹,点击下载“hadoop-2.x.y.tar.gz”文件。默认会下载到“下载”目录中,
在该文件夹下打开终端,将该文件解压到/usr/local文件中,执行命令:
sudo tar -zxf ~/hadoop-2.9.0.tar.gz cd /usr/local/ sudo mv ./hadoop-2.9.0/ ./hadoop #将文件名修改成hadoop sudo chown -R hadoop ./hadoop #修改文件权限
Hadoop的文件夹解压以后就能够直接使用,检查一下Hadoop是否能够正常使用,若是正常则显示Hadoop的版本信息
cd /usr/local/hadoop ./bin/hadoop version
这里就初步完成了Hadoop的安装,还有不少配置什么的用到的时候在写,好比伪分布式系统配置。
3.安装Spark
3.1 下载Spark:http://spark.apache.org/downloads.html
第一项我选择的版本是最新当前的最新版本:2.3.1,第二项选择“Pre-build with user-provided Apache Hadoop”,而后点击第三项后面的下载“spark-2.3.1-bin-without-hadoop-tgz”。
3.2 解压文件
这一步与Hadoop的解压是同样的,咱们都把它解压到/usr/local路径下:
$ sudo tar -zxf ~/下载/spark-2.3.1-bin-without-hadoop.tgz -C /usr/local/ $ cd /usr/local $ sudo mv ./spark-2.3.1-bin-without-hadoop/ ./spark $ sudo chown -R hadoop:hadoop ./spark
3.3 设置环境变量
执行以下命令拷贝一个配置文件:
$ cd /usr/local/spark $ ./conf/spark-env.sh.template ./conf/spark-env.sh
而后编辑spark-env.sh:
$ vim ./conf/spark-env.sh
打开以后在这个文件的最后一行加上下面的内容:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)
而后保存退出Vim,就可使用Spark了。
3、Spark入门示例
在文件路径“/usr/local/spark/examples/src/main”中,咱们能够找到spark自带的一些实例,以下图能够看到Spark支持Scala、Python、Java、R语言等。
1.Spark最简单的方式就是使用交互式命令行提示符。打开PySpark终端,在命令行中打出pyspark:
~$ pyspark
2.PySpark将会自动使用本地Spark配置建立一个SparkContext。咱们能够经过sc变量来访问它,来建立第一个RDD:
>>>text=sc.textFile(“file\\\usr\local\spark\exp\test1.txt") >>>print text
3.转换一下这个RDD,来进行分布式计算的“hello world”:“字数统计”
首先导入了add操做符,它是个命名函数,能够做为加法的闭包来使用。咱们稍后再使用这个函数。首先咱们要作的是把文本拆分为单词。咱们建立了一个tokenize函数,参数是文本片断,返回根据空格拆分的单词列表。而后咱们经过给flatMap操做符传递tokenize闭包对textRDD进行变换建立了一个wordsRDD。你会发现,words是个PythonRDD,可是执行本应该当即进行。显然,咱们尚未把整个数据集拆分为单词列表。
4.将每一个单词映射到一个键值对,其中键是单词,值是1,而后使用reducer计算每一个键的1总数
>>> wc = words.map(lambda x: (x,1)) >>> print wc.toDebugString()
我使用了一个匿名函数(用了Python中的lambda关键字)而不是命名函数。这行代码将会把lambda映射到每一个单词。所以,每一个x都是一个单词,每一个单词都会被匿名闭包转换为元组(word, 1)。为了查看转换关系,咱们使用toDebugString方法来查看PipelinedRDD是怎么被转换的。可使用reduceByKey动做进行字数统计,而后把统计结果写到磁盘。
5.使用reduceByKey动做进行字数统计,而后把统计结果写到磁盘
>>> counts = wc.reduceByKey(add) >>> counts.saveAsTextFile("wc")
一旦咱们最终调用了saveAsTextFile动做,这个分布式做业就开始执行了,在做业“跨集群地”(或者你本机的不少进程)运行时,你应该能够看到不少INFO语句。若是退出解释器,你能够看到当前工做目录下有个“wc”目录。每一个part文件都表明你本机上的进程计算获得的被保持到磁盘上的最终RDD。
4、Spark数据形式
4.1 弹性分布式数据集(RDD)
Spark 的主要抽象是分布式的元素集合(distributed collection of items),称为RDD(Resilient Distributed Dataset,弹性分布式数据集),它可被分发到集群各个节点上,进行并行操做。RDDs 能够经过 Hadoop InputFormats 建立(如 HDFS),或者从其余 RDDs 转化而来。
得到RDD的三种方式:
Parallelize:将一个存在的集合,变成一个RDD,这种方式试用于学习spark和作一些spark的测试
>>>sc.parallelize(['cat','apple','bat’])
MakeRDD:只有scala版本才有此函数,用法与parallelize相似
textFile:从外部存储中读取数据来建立 RDD
>>>sc.textFile(“file\\\usr\local\spark\README.md”)
RDD的两个特性:不可变;分布式。
RDD支持两种操做;Transformation(转化操做:返回值仍是RDD)如map(),filter()等。这种操做是lazy(惰性)的,即从一个RDD转换生成另外一个RDD的操做不是立刻执行,只是记录下来,只有等到有Action操做是才会真正启动计算,将生成的新RDD写到内存或hdfs里,不会对原有的RDD的值进行改变;Action(行动操做:返回值不是RDD)会实际触发Spark计算,对RDD计算出一个结果,并把结果返回到内存或hdfs中,如count(),first()等。
4.2 RDD的缓存策略
Spark最为强大的功能之一即是可以把数据缓存在集群的内存里。这经过调用RDD的cache函数来实现:rddFromTextFile.cache,
调用一个RDD的cache函数将会告诉Spark将这个RDD缓存在内存中。在RDD首次调用一个执行操做时,这个操做对应的计算会当即执行,数据会从数据源里读出并保存到内存。所以,首次调用cache函数所须要的时间会部分取决于Spark从输入源读取数据所须要的时间。可是,当下一次访问该数据集的时候,数据能够直接从内存中读出从而减小低效的I/O操做,加快计算。多数状况下,这会取得数倍的速度提高。
Spark的另外一个核心功能是能建立两种特殊类型的变量:广播变量和累加器。广播变量(broadcast variable)为只读变量,它由运行SparkContext的驱动程序建立后发送给会参与计算的节点。对那些须要让各工做节点高效地访问相同数据的应用场景,好比机器学习,这很是有用。Spark下建立广播变量只需在SparkContext上调用一个方法便可:
>>> broadcastAList = sc.broadcast(list(["a", "b", "c", "d", "e"]))