Spark入门阶段一之扫盲笔记

介绍

spark是分布式并行数据处理框架
与mapreduce的区别:
mapreduce一般将中间结果放在hdfs上,spark是基于内存并行大数据框架,中间结果放在内存,对于迭代数据spark效率更高,mapreduce老是消耗大量时间排序,而有些场景不须要排序,spark能够避免没必要要的排序所带来的开销,spark是一张有向无环图,spark支持scala,python,java等
适用范围:
spark更适合于迭代云端比较多的ml和dm运算,由于spark里面有rdd的抽象概念,spark比hadoop更通用,spark提供的数据集操做类型有不少,不像hadoop只提供map和reduce俩种操做,好比map,filter,flatmapt,sample,groupbykey,reducebykey,union,join,cogroup,mapvalues,sort,partionby等多种操做类型,spark
把这些操做称为transformations,同时还提供count,collect,reduce,lookup,save等多种action操做。这些多种多样的数据集操做类型,给开发上层应用的用户提供了方便,各个处理节点之间的通讯模型不在像hadoop那样就是惟一的data shuffle一种模式,用户能够明明,物化,控制中间结果的存储,分区等,能够说编程模型比hadoop更灵活。 html

spark是基于内存的迭代计算框架,使用与须要屡次操做特定数据集的应用场合,须要反复操做的次数越多,所须要读取的数据量越大,受益越大,数据量小可是计算密集度较大的场合,受益就相对较小. 不过因为rdd的特性,spark不适用那种一部细粒度更新状态的应用,例如web服务的存储或者增量的web爬虫和索引,就是对于那种增量修改的应用模型不合适。 java

spark和hadoop的结合:
spark能够直接对hdfs进行数据的读写,一样支持spark on yarn。spark能够与mapreduce运行于同集群中,共享存储资源与计算,数据仓库shark实现上借用hive,几乎和hive彻底兼容。 node

四种spark运行模式,local模型用于测试开发,standlone 独立集群模式,spark on yarn spark在yarn上 ,spark on mesos spark在mesos上。python

应用:
企业大数据应用: 1,count 平均值 2.分类,对比 3.趋势,统计分析 4,精准预测 人工智能
行业大数据案例:电商,传媒,能源,交通mysql

spark生态系统介绍:
spark 能够很容易和yarn结合,直接调用HDFS、Hbase上面的数据,和hadoop结合。
spark核心部分分为RDD。Spark SQL、Spark Streaming、MLlib、GraphX、Spark R等核心组件解决了不少的大数据问题 程序员

Spark分为driver和executor,driver提交做业,executor是application早worknode上的进程,运行task,driver对应为sparkcontext。Spark的RDD操做有transformation、action。Transformation对RDD进行依赖包装,RDD所对应的依赖都进行DAG的构建并保存,在worknode挂掉以后除了经过备份恢复还能够经过元数据对其保存的依赖再计算一次获得。看成业提交也就是调用runJob时,spark会根据RDD构建DAG图,提交给DAGScheduler,这个DAGScheduler是在SparkContext建立时一同初始化的,他会对做业进行调度处理。当依赖图构建好之后,从action开始进行解析,每个操做做为一个task,每遇到shuffle就切割成为一个taskSet,并把数据输出到磁盘,若是不是shuffle数据还在内存中存储。就这样再往前推动,直到没有算子,而后运行从前面开始,若是没有action的算子在这里不会执行,直到遇到action为止才开始运行,这就造成了spark的懒加载,taskset提交给TaskSheduler生成TaskSetManager而且提交给Executor运行,运行结束后反馈给DAGScheduler完成一个taskSet,以后再提交下一个,当TaskSet运行失败时就返回DAGScheduler并从新再次建立。一个job里面可能有多个TaskSet,一个application可能包含多个job。web

一、shark介绍:
shark基本上就是spark的框架基础上提供和hive同样的hivesql命令接口,为了最大程度的保持和hive的兼容性,shark使用hive的api来实现query parsing和logic plan generation,最后的physicalplan execution阶段用spark代替hadoop mapreduce,用过配置shark参数,shark能够自动在内存中缓存特定的rdd,实现数据重用,进而加快特定数据集的检索,同时,shark经过udf用户自定义函数实现特定的数据分析学习算法,使得sql数据查询和运算分析能结合在一块儿,最大化rdd的重复使用。算法

二、spark streaming介绍:
Spark Streaming 是 Spark 提供的对实时数据进行流式计算的组件,通常与kafka结合,基本的原理是将stream数据分红小的时间片断,以相似batch批量处理的方式来处理这些小部分数据。spark streaming构建在spark上,一方面是由于spark的低延迟执行引擎能够用于实时计算,此外小批量的处理方式使得他能够同时兼容批量和实时数据处理的逻辑和算法,方便了一些须要历史数据和实时数据联合分析的特定应用场景。
Spark Streaming也有一个StreamingContext,其核心是DStream,是经过以组时间序列上的连续RDD来组成的,包含一个有Time做为key、RDD做为value的结构体,每个RDD都包含特定时间间隔的数据流,能够经过persist将其持久化。在接受不断的数据流后,在blockGenerator中维护一个队列,将流数据放到队列中,等处理时间间隔到来后将其中的全部数据合并成为一个RDD(这一间隔中的数据)。其做业提交和spark类似,只不过在提交时拿到DStream内部的RDD并产生Job提交,RDD在action触发以后,将job提交给jobManager中的JobQueue,又jobScheduler调度,JobScheduler将job提交到spark的job调度器,而后将job转换成为大量的任务分发给spark集群执行。sql

三、Graphx
主要用于图的计算。核心算法有PageRank、SVD奇异矩阵、TriangleConut等。shell

四、Spark SQL
是Spark新推出的交互式大数据SQL技术。把sql语句翻译成Spark上的RDD操做能够支持Hive、Json等类型的数据。

五、Spark R
经过R语言调用spark,目前不会拥有像Scala或者java那样普遍的API,Spark经过RDD类提供Spark API,而且容许用户使用R交互式方式在集群中运行任务。同时集成了MLlib机器学习类库。

六、MLBase
从上到下包括了MLOptimizer(给使用者)、MLI(给算法使用者)、MLlib(给算法开发者)、Spark。也能够直接使用MLlib。ML Optimizer,一个优化机器学习选择更合适的算法和相关参数的模块,还有MLI进行特征抽取和高级ML编程 抽象算法实现API平台,MLlib分布式机器学习库,能够不断扩充算法。MLRuntime基于spark计算框架,将Spark的分布式计算应用到机器学习领域。MLBase提供了一个简单的声明方法指定机器学习任务,而且动态地选择最优的学习算法。

七、Tachyon
高容错的分布式文件系统。宣称其性能是HDFS的3000多倍。有相似java的接口,也实现了HDFS接口,因此Spark和MR程序不须要任何的修改就能够运行。目前支持HDFS、S3等。

什么是rdd:

rdd是spark最基本,也是最根本的数据抽象,RDD表示分布在多个计算节点上的能够并行操做的元素集合,rdd是只读的,分区记录的集合。
rdd支持两种操做,1,转换从现有的数据集建立一个新的数据集,2,动做 在数据集上运行计算后,返回一个值给驱动程序,例如,map就是一种转换,他将数据集每个元素都传递给函数,并返回一个新的分布数据集表示结果,另外一个方面,reduce是一个动做,经过一些函数将全部的元组叠加起来,并将结果返回给driver程序,spark中的全部转换都有惰性的,也就是说,他们并不会直接计算结果,相反的,他们只是记住应用哦个到基础数据集上的这些转换动做,例如,咱们能够实现,经过map建立的一个新数据集,并在reduce使用,最终只返回reduce的结果给driver,而不是整个大的新数据集。默认状况下,每一个转换过的rdd都会在你在他之上执行一个动做时被从新计算,不过,你也可使用persist方法,持久话一个rdd在内存中,在这种状况下,spark将会在集群中,保存相关元素,下次你查询这个rdd是,他将能更快访问,在磁盘上持久化数据集,或在集群间赋值数据集也是支持的。除了这些操做外,用户还能够请求将rdd缓存起来,并且,用户还能够经过partitioner类获取rdd的分区顺序,而后将另外一个rdd按照一样的方式分区。

如何操做rdd?
一、如何获取rdd 1,从共享的文件系统获取,hdfs,2.经过已存在的rdd转换 3.将已存在的scala集合并行化,经过调用sparkcontext的parallelize方法实现 4.改变现有rdd的之久性,rdd是懒散,短暂的
二、操做rdd的俩个动做,1,actions:对数据集计算后返回一个数值value给驱动程序,例如redue将数据集的全部元素用某个函数聚合后,将最终结果返回给程序,2.transformation 根据数据集建立一个新的数据集,计算后返回一个新rdd;例如map将数据的每一个元素讲过某个函数计算后,返回一个姓的分布式数据集。

actions具体内容:

  • reduce(func)经过函数func汇集数据集中全部元素,func函数接受2个参数,返回一个值,这个函数必须是关联性的,确保能够被正确的并发执行。

  • collect() 在driver的程序中,以数组的形式,返回数据集的全部元素,这一般会在使用filter或者其余操做后,返回一个纵沟小的数据本身在使用,直接将整个rdd集coloect返回,极可能会让driver程序oom。

  • count() 返回数据集的元素个数

  • take(n) 返回一个数组,用数据集的前n个元素组成,注意,这个操做目前并不是在多个节点上,并行执行,而是driver程序所在机制,单机计算全部的元素:注;gateway的内存压力会增大,须要谨慎使用

  • first()返回数据集的第一个元素

  • saveAsTextFile(path) 将数据集的元素,以txtfile的形式,保存到本地文件系统,hdfs或者其余hadoop支持的文件系统,spark将会调用每一个元素的tostring方法,并将他转换成文件中一行文本。

  • saveAsSequenceFile(path)将数据集的元素,以sequencefile的格式,到指定的目录下,本地系统,hdfs或者其余hadoop支持的文件系统,rdd的元组必须有key-value对组成,并都实现了hadoop的- writable接口或隐式能够转换为wirtable

  • foreach(func)在数据集的每一个元素上,运行函数func,这一般用于更新一个累加器变量,或者和外部存储系统作交互。直接使用 rdd.foreach(println) 在local模式下是可行的,可是在cluster模式下是不行的,必需要执行collect()方法,将全部的数据拉取到本地,而后执行foreach()操做。若是是数据量比较小的话可使用take方法,rdd.take(100).foreach(println)

transformation具体内容:

  • map(func) 返回一个新的分布式数据集,有每一个原元素通过func函数转换后组成

  • filter(func) 返回一个新的数据集,有通过func函数后返回值为true的原元素组成

  • flatmap(func)相似于map 可是每个输入元素,会被映射0到多个输出元素,所以func函数的返回值是一个seq,而不是单一元素

  • sample(withReplacement,frac,seed) 给定的随机种子seed,随机抽样出数量为frac的数据

  • union(otherdataset)返回一个新的数据集,由原数据集和参数联合而成

  • intersection : 只返回两个RDD中都有的元素,intersecton()在运行时会去掉全部重复的元素(单个RDD内重复元素也会一块儿移除)。 须要经过网络混洗来发现共有数据。

  • distinct : 生成一个只包含不一样元素的新RDD。须要注意:distinct() 操做的开销很大,由于它须要将全部数据经过网络进行混洗(shuffle),以确保每一个元素只有一份。

  • subtract : 接受另外一个RDD做为参数,返回一个由只存在在第一个RDD而不存在第二个RDD中的全部元素组成的RDD。 须要数据混洗。

  • cartesian : 返回全部可能的(a,b)对,其中a是源RDD中的元素,b是另外一个RDD中的元素。

  • groupbykey(【num tasks】)在一个有kv对组成的数据集上调用,返回一个k,seq【v】对的数据集,注意,默认状况下,使用8个并行任务进行分组,你能够传入num task可选参数,根绝数据量设置不一样数目的task

  • reducebykey(func,【num tasks】)在一个kv对的数据集上使用,返回一个kv的数据集,key相同的值都被使用指定的reduce函数聚合在一块儿,和groupbykey相似,任务个数是第二个参数来配置

  • join(otherdataset,【num tasks】)在类型kev和kw类型的数据集上调用,返回一个k(v w)对,每一个key中全部元素都在一块儿的数据集

  • groupwith(otherdataset,【num tasks】)在类型为kv和kw类型的数据集上调用,返回一个数据集,组成元组为k seq【v】seq[w]tuples ,这个在其余框架称为cogroup

  • cartesian(otherdataset) 笛卡儿积,但在数据集t和u调用是,返回一个tu对的数据集,全部元素交互进行笛卡儿积。

持久化(缓存)

  • persist()

  • cache()

基本开发思路

每一个saprk应用都有一个驱动器程序来发起集群上的各类并行操做。驱动器程序经过一个SparkContext对象来访问Spark。这个对象表明对计算集群的一个链接。
一旦有了SparkContext,你就能够用它来建立RDD。要执行这些操做,启动器程序通常要管理多个执行器(executor)节点。
能够先经过SparkConf对象来配置你的应用,而后基于这个SparkConf建立一个SparkContext对象。
建立SparkConf的基本方法,传递两个参数:
一、集群URL:告诉Spark如何链接到集群上。
二、应用名:当链接到一个集群式,这个值能够帮助你在集群管理器的用户界面中找到你的应用。

关闭Spark:调用SparkContext的stop()方法。或直接退出应用。(system.exit(0)/sys.exit())
在Spark中,对数据的全部操做不外乎是: 建立RDD、 转化已有的RDD、调用RDD操做进行求值
Spark中的RDD是一个不可变的分布式对象集合。每一个RDD都被分为多个分区,这些分区运行在集群中的不一样节点上。
当咱们调用一个新的行动操做时,整个RDD都会从头开始计算。要避免这种行为,用户能够将中间结果持久化。

demo(Python版)

一、初始化sparkcontext

from pyspark import SparkConf, SparkContxt
conf = SparkConf().setMaster("local").setAppName("my app")
sc = SparkContext(conf=conf)

# 关闭链接
sc.stop()

二、RDD编程

# 从文件读取数据
line = sc.textFile("README.md")
# parallelize 方法
line = sc.parallelize(['pandas','i like pandas'])


inputRDD = sc.textFile('log.txt')
errRDD = inputRDD.filter(lambda x:'error' in x)
warnRDD = inputRDD.filter(lambda x:'warning' in x)
bindRDD = errRDD.union(warnRDD)


bindRDD.count()
bindRDD.take(10)
# 返回所有数据集
bindRDD.collect()


# lambda 函数
word = rdd.filter(lambda s:'python' in s)
# def 定义的函数
def containsErr(s):
    return 'error' in s
word = rdd.filter(containsErr)

2.一、RDD常见转换操做
以 rdd={1,2,3,3} 为例的转换操做

# 将函数应用与RDD中的每一个元素,将返回值构建新的RDD
rdd.map(x => x+1)

# 将函数应用用RDD中的每一个元素,将返回的迭代器的全部内容构成新的RDD。一般用于切分单词。
rdd.flatMap(x=>x.to(3))  --> {1,2,3,2,3,3,3})

# 返回一个由经过传给filter()的函数的元素组成的RDD
rdd.filter(x=>x!=1)  -->  {2,3,4}

# 去重
rdd.distinct()  -->  {1,2,3}

sample(withReplacement,fraction,[seed])
# 对RDD进行采样,以及是否替换
rdd.sample(false,0.5)   -->  非肯定的

以{1,2,3}和{3,4,5}的RDD转换操做

# 求并集
rdd.union(other)  --> {1,2,3,4,5}

# 求交集
rdd.intersection(other)  --> {3}

# 移除一个RDD中的内容,至关于减去一个交集
rdd.subtract(other)  -->  {1,2}

# 与另外一个RDD的笛卡尔积
rdd.cartesian(other)  --> {(1,3),(1,4)...(3,5)}

2.二、RDD常见行动操做
以{1,2,3,3}为列说明常见行动操做

# 返回RDD中全部的元素
rdd.collect()  --> {1,2,3,3}

# 计数
rdd.count()

# 各元素在RDD中出现的次数
rdd.countByValue()   --> {(1,1),(2,1),(3,2)}

take(num)
# 返回前n元素

top(n)
# 排序后的前n个元素

# 按照指定顺序,从rdd中返回前n个元素
rdd.takeOrdered(2)(myOrdering)   --> {3,3}

takeSample(withReplacement,num,[seed])
# 从RDD中返回任意一些元素
rdd.takeSample(false,1)  --> 非肯定的

# 并行整合rdd中全部的数据,好比sum
rdd.reduce((x,y)=>x+y)  --> 9

fold(zero)(func)
# 和reduce()同样,可是须要提供初始值
rdd.fold(0)((x,y)=>x+y) --> 9

aggregate(zeroValue)(seqOp,combOp)
# 和reduce相似,可是一般返回不一样类型的函数
aggregate((0,0))((x,y)=>(x._1+y,x._2+1),
                 (x,y)=>(x._1+y._1,x._2+y._2) )  --> 9

# 对RDD中的每一个元素使用给定的函数
rdd.foreach(func)

2.三、持久化缓存

from pyspark.storage import StorageLvel
rdd.presist(StoragLevel.DISK_ONLY)

RDD.cache()

# 缓存的级别
# MEMORY_ONLY
# MEMORY_ONLY_SER
# MEMORY_AND_DISK  # 若是内存放不下,则溢出写到磁盘上
# MEMORY_AND_DISK_SER  # 若是内存放不下,则溢出写到磁盘上,在内存中存放序列化后的数据
# DISK_ONLY

# 移除缓存
RDD.unpersist()

三、键值对操做

# 以{(1,2),(3,4),(3,6)}为例

# 合并具备形同键的值
rdd.reduceByKey((x,y)=>x+y)  -->{(1,2),(3,10)}

# 对具备相同键的值分组
rdd.groupByKey()  --->{(1,[2]),(3,[4,6])}

combineByKey(createCombiner,mergeValue,mergeComBiners,partitioner)
# 使用不一样的返回类型合并具备相同键的值。有多个参数分别对应聚合操做的各个阶段,于是很是适合用来解释聚合操做各个阶段的功能划分。
# 下面是求每一个键的平均值
sumCount=num.combineByKey((lambda x:(x,1)),
                           (lambda x,y:(x[0]+y,x[1]+1)),
                           (lambda x,y:(x[0+y[0],x[1]+y[1]])))
sumCount.map(lambda key,xy:(key,xy[0]/xy[1])).collectAaMap()

# 对pairRDD的每一个值应用一个函数而不改变键
rdd.mapVlues(x=>x+1)

# 对pairRDD的每一个值应用一个返回迭代器的函数,而后对返回的每一个元素都生成一个对应原键的键值对记录,一般用于符号化
rdd.flatMapValues(x=>(x to 5))  -->{(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)}

# 返回一个仅含有键的RDD
rdd.keys()   ->{1,3,3}

# 返回一个仅包含值的RDD
rdd.values()   -->{2,4,6}

# 返回一个根据键排序的RDD
rdd.sortByKey(ascending=True)  -->{(1,2),(3,4),(3,6)}

3.一、两个键值对RDD的转换操做

# 以rdd={(1,2),(3,4),(3,6)} other={(3,9)} 为例

# 删除rdd中键与other中键相同的元素
rdd.subtracByKey(other)  --> {(1,2)}

# 对两个rdd内连接
rdd.join(other)   --> {(3,(4,9)),(3,(6,9))}

# 对两个rdd进行链接操做,确保第一个rdd中的键必须存在(右外连接)
rdd.rightOuterJoin(other)   --> {(3,(some(4),9)),(3,(some(6),9))}

# 对两个rdd进行链接操做,确保第二个rdd中的键必须存在(左外链接)
rdd.leftOuterJoin(other)  --> {(1,(2,None)),(3,(4,some(9))),(3,(6,some(9)))}

# 将两个rdd中拥有相同键的数据分组到一块儿
rdd.congroup(other)  --> {(1,([2],[])),(3,([4,6],[9]))}

3.二、键值对Pair RDD的行动操做

# 以 rdd={(1,2),(3,4),(3,6)} 为例

# 对每一个键对应的元素分别计数
rdd.countByKey()  --> {(1,1,),(3,2)})

# 将结果以映射表的形式返回,以便查询
rdd.collectAsMap()  --> Map{(1,2),(2,6)}

# 返回给定键对应的全部值
rdd.lookup(3)   --> [4,6]

四、并行度调优
每一个rdd都有固定数目的分区,分区数决定了在rdd上执行操做的并行度。 大多数操做符都能接受第二个参数,用来指定分组结果或者聚合结果的rdd的分区数。
好比 sc.parallelize(data).reduceByKey(lambda x,y:x+y,10) 指定分区数10
查看分区数 rdd.partitions.size或rdd.getNumPartitions ,改变分区的方法repartition()

五、数据读取与保存
读取txt文件,输入的每一行都会成为RDD的一个元素。

# 读取文件
input=sc.textFile("file:///home/holden/README.md")
# 保存文件
result.saveAsTextFile(outputFile)

读取json

# 将json文件的每一行假设为一条记录来处理
import json
data = input.map(lambda x:json.load(x))
# 写
(data.filter(lambda x:x[lovesPandas"]).map(lambda x:json.dumps(x)).saveAsTextFile(outputFile))

读取csv,一样是将读取的文本的每一行当作一条记录

import csv
from io import StringIO
def loadRecord(line):
    """解析一行csv记录"""
    input = StringIO(line)
    reader = csv.DictReader(input,filednames=["name","favouriteAnimal"])
    return reader.next()
input = sc.textFile(inputFile).map(loadRecord)

# 保存csv
def writeRecords(records):
    """写出一些csv记录"""
    output = StringIO()
    writer = csv.DictWriter(output,fieldnames=["name","favoriteAnimal"])
    for record in records:
        writer.writerow(record)
    return [output.getvalue()]
pandaLovers.mapPartitions(writeRecords).saveAsTextFile(outputFile)

读取SequenceFile
Hadoop输入输出格式
关系型数据库
HBase

六、Spark进阶编程
6.一、两种类型的共享变量

  • 累加器(qccumulator):用于对信息聚合,提供了将工做节点中的值聚合到驱动器程序中的简单语法。

  • 广播变量(broadcast variable):用来高效分发较大的对象,让程序高效地向全部工做节点发送一个较大的值,以供一个或多个spark操做使用。

# 在python中累加空行,使用了累加器
file = sc.textFile(inputFile)
# 建立累加器并初始化为0
blankLine=sc.accumulator(0)

def extractCallSigs(line):
    global blankLine  # 访问全局变量
    if (line==""):
        blankLine+=1
    return line.split(" ")

callSigns = file.flatMap(extractCallSigns)
callSigns.saveAsTextFile(output)


# 使用广播变量查询国家
# 查询rdd中呼叫号对应的位置,将呼号前缀读取为国家代码来进行查询
signPrefixes = sc.broadcast(loadCallSignTable())   # 广播变量

def processSignCount(sign_count,signPrefixes):
    country=lookupCountry(sign_count[0],signPrefixes.value)
    count = sign_count[1]
    return (country,count)

countryContactCounts=(contactCounts.map(processSignCount).reduceByKey((lambda x,y:x+y)))
countryContactCounts.saveAsTextFile(output)

基于分区进行操做
spark提供基于分区的map和foreach,使部分代码只对rdd的每一个分区运行一次,能够帮助下降这些操做的代价。

# 按照分区执行的操做符

mapPartitions()
# 参数:该分区中元素的迭代器。返回:元素的迭代器
# 对于RRD[T]的函数签名 :f:(iterator[T])  --> iterator[U]

mapPartitionsWithIndex()
# 参数:分区序号,以及每一个分区中的元素的迭代器。返回:元素的迭代器
# 对于RRD[T]的函数签名 :f:(int,iterator[T])  --> iterator[U]

foreachPartitions()
# 参数:元素迭代器。返回:无
# 对于RRD[T]的函数签名 :f:(iterator(T))  -->Unit

数值RDD的操做

count()
# RDD中元素个数
mean()
# 元素平均值
sum()
# 
max()
min()
variance()  # 方差
sampleVariance()  # 从采样中计算出的方差
stdev()  # 标准差
sampleStdev()   # 采用的标准差

七、基于MLlib的机器学习

# 逻辑回归的垃圾邮件分类
from pyspark.mllib.regression import LabeldPoint
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.classification import LogisticRegressionWithSGD

spam=sc.textFile('spam.txt')
normal = sc.textFile('normal.txt')

# 建立一个HashingTF实例来把邮件文本映射为包含10000个特征的向量
tf=HashingTF(numFeatures=10000)
# 各邮件都切分为单词,每一个单词映射为一个特征
spamFeatures = spam.map(lambda email: tf.transForm(email.split(' ')))
normalFeatures = normal.map(lambda email: tf.transform(email.split(' ')))

# 建立LabelPoint数据集分别存放阳性(垃圾邮件)和阴性(正常邮件)的例子
positiveExample = spamFeatures.map(lambda features:LabeldPoint(1,features))
negativeExamples = normalFeatures.map(lambda features:labeldPoint(0,features))
trainingData = positiveExample.union(negativeExample)
trainingData.cache()  # 由于逻辑回归是迭代算法,因此须要缓存训练数据RDD

# 使用SGD算法
model = LogisticRegressionWithSGD.train(trainningData)

# 以阳性和阴性的例子分别测试。
# 首先用同样的HashingTF特征来获得特征向量,而后对该向量应用获得的模型
posTest = tf.transform("O M G GET cheap stuff by sending money to ...".split(' '))
negTest = tf.transform("Hi Dad, i started studying spark the other ...".split(' '))
print( "predict for postive test example:%g" % model.predict(posTest))
print( "predict for negative test example:%g" % model.predict(negTest))

MLlib包含一些特有的数据类型,对于Scala和Java,它们位于org.apache.spark.mllib下,对于Python则是位于pyspark.mllib下。

入门:

spark有两个重要的抽象:

  • RDD,分布式弹性数据集,他是一个跨越多个节点的分布式集合。

  • 另外一个抽象是共享变量。spark支持两种类型的共享变量:一个是广播(broadcast variables)他能够缓存一个值在集群的各个节点。另外一个是累加器(accumulators)他只能执行累加的操做,好比能够作计数器和求和。

初始化 Spark
在一个Spark程序中要作的第一件事就是建立一个SparkContext对象来告诉Spark如何链接一个集群。为了建立SparkContext,你首先须要建立一个SparkConf对象,这个对象会包含你的应用的一些相关信息。这个一般是经过下面的构造器来实现的:
new SparkContext(master, appName, [sparkHome], [jars])
参数说明:

  • master:用于指定所链接的 Spark 或者 Mesos 集群的 URL。

  • appName :应用的名称,将会在集群的 Web 监控 UI 中显示。

  • sparkHome:可选,你的集群机器上 Spark 的安装路径(全部机器上路径必须一致)。

  • jars:可选,在本地机器上的 JAR 文件列表,其中包括你应用的代码以及任何的依赖,Spark 将会把他们部署到全部的集群结点上。
    在 python 中初始化,示例代码以下:

//conf = SparkContext("local", "Hello Spark")
conf = SparkConf().setAppName("Hello Spark").setMaster("local")
sc = SparkContext(conf=conf)

说明:若是部署到集群,在分布式模式下运行,最后两个参数是必须的,第一个参数能够是如下任一种形式:
Master URL 含义

  • local 默认值,使用一个 Worker 线程本地化运行(彻底不并行)

  • local[N] 使用 N 个 Worker 线程本地化运行,N 为 * 时,表示使用系统中全部核

  • local[N,M] 第一个表明的是用到的核个数;第二个参数表明的是允许该做业失败M次

  • spark://HOST:PORT 链接到指定的 Spark 单机版集群 master 进程所在的主机和端口

  • mesos://HOST:PORT 链接到指定的 Mesos 集群。host 参数是Moses master的hostname。端口默认是5050
    若是你在一个集群上运行 spark-shell,则 master 参数默认为 local。在实际使用中,当你在集群中运行你的程序,你通常不会把 master 参数写死在代码中,而是经过用 spark-submit 运行程序来得到这个参数。可是,在本地测试以及单元测试时,你仍须要自行传入 local 来运行Spark程序。

运行代码有几种方式,一是经过 spark-shell 来运行 scala 代码,一是编写 java 代码并打成包以 spark on yarn 方式运行,还有一种是经过 PySpark 来运行 python 代码。

在 spark-shell 和 PySpark 命令行中,一个特殊的集成在解释器里的 SparkContext 变量已经创建好了,变量名叫作 sc,建立你本身的 SparkContext 不会起做用。

<dependencies>
 <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.1.1</version>
  </dependency>
  <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
  </dependency>
</dependencies>

建立一个简单的spark程序:

public class SimpleApp {
  public static void main(String[] args) {
      // 文件路径
      String logFile = "/home/wm/apps/spark-1.4.0-bin-hadoop2.6/README.md";
      SparkConf conf = new SparkConf().setAppName("Simple Application").setMaster("local");
      JavaSparkContext sc = new JavaSparkContext(conf);
      JavaRDD<String> logData = sc.textFile(logFile).cache();
      @SuppressWarnings("serial")
      long numAs = logData.filter(new Function<String, Boolean>() {
          public Boolean call(String s) throws Exception {
              return s.contains("a");
          }

      }).count();
      @SuppressWarnings("serial")
      long numBs = logData.filter(new Function<String, Boolean>() {

          public Boolean call(String s) throws Exception {
              return s.contains("b");
          }

      }).count();
      System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
      sc.close();
  }
}

Spark的核心就是围绕着RDD,它是一个自动容错的分布式数据集合。他有两种方式建立,第一种就是在驱动程序中对一个集合进行并行化。第二种是来源于一个外部的存储系统。好比:共享系统、HDFS、HBase或者任何提供任何Hadoop 输入格式的数据源。

第一种:Parallelized Collections 建立这个集合须要调用那个JavaSparkContext的parallelize方法来初始化一个已经存在的集合。

List<Integer> data = Arrays.asList(1,2,3,4,5);
JavaRDD<Iteger> distData = sc.parallelize(data);

这就建立了一个并行的集合,在这个集合上能够执行 distData.reduce((a, b) -> a + b)
在并行数组中一个很重要的参数是partitions,它来描述数组被切割的数据集数量。Spark会在每个partitions上运行任务,这个partitions会被spark自动设置,通常都是集群中每一个CPU上运行2-4partitions,可是也能够本身设置,能够经过parallelize (e.g. sc.parallelize(data, 10)),在有些地方把partitions成为 slices。

第二种:External Datasets

JavaRDD distFile = sc.textFile("data.txt");

textFile也能够设置partitions参数,通常都是一个block一个partitions,可是也能够本身设置,本身设置必需要不能少于block的数量。
针对Hadoop的其余输入格式,你能用这个JavaSparkContext.hadoopRDD方法,你须要设置JobConf和输入格式的类。也可使用JavaSparkContext.newAPIHadoopRDD针对输入格式是基于“new”的MapReduceAPI

demo(python) 分析 Nginx 日志中状态码出现次数

先将测试数据上传到 hdfs:
$ hadoop fs -put access.log
而后,编写一个 python 文件,保存为 SimpleApp.py:

from pyspark import SparkContext

logFile = "access.log"

sc = SparkContext("local", "Simple App")

rdd = sc.textFile(logFile).cache()

counts = rdd.map(lambda line: line.split()[8]).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b).sortByKey(lambda x: x) 

# This is just a demo on how to bring all the sorted data back to a single node.  
# In reality, we wouldn't want to collect all the data to the driver node.
output = counts.collect()  
for (word, count) in output:  
    print "%s: %i" % (word, count)  

counts.saveAsTextFile("/data/result")

sc.stop()

接下来,运行下面代码:

$ spark-submit  --master local[4]   SimpleApp.py

demo(java) 统计单词出现次数

JavaRDD<String> lines = sc.textFile("data.txt");
JavaPairRDD<String, Integer> pairs = lines.mapToPair(s -> new Tuple2(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);

demo (java) 读取HDFS中的数据,并简单分析,最后结果写入mysql数据库中。

<dependency> <!-- Spark dependency -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.11</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.13</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
    <scope>test</scope>
</dependency>

因为须要读取HDFS中的数据,因此须要hadoop-client文件

在main函数中首先建立JavaSparkcontext对象。

SparkConf conf = new SparkConf().setAppName("FindError");
JavaSparkContext sc = new JavaSparkContext(conf);
/**
* 
* 列出指定目录中的文件,这里的文件是不包括子目录的。
* @param pathOfDirectory
*     目录路径
* @return
* @throws IOException 
*/
public static String[] findFilePathFromDir(String dst) throws IOException {
  Set<String> filePathSet = new HashSet<String>();
  String[] result = null;
  Configuration conf = new Configuration();
  FileSystem fs = FileSystem.get(URI.create(dst), conf);
  FileStatus fileList[] = fs.listStatus(new Path(dst));
  int size = fileList.length;
  for (int i = 0; i < size; i++) {
      filePathSet.add(fileList[i].getPath().toString());
  }
  if (filePathSet.size() > 0) {
      result = new String[filePathSet.size()];
      int i = 0;
      for (String str : filePathSet) {
          result[i++] = str;
      }
  }
  fs.close();
  return result;
}

依次遍历文件路径并为每一个文件建立一个新的RDD而后计算出这个文件中包涵ERROR字符串的行数。

Map<String, Long> result = new HashMap<String, Long>();
if (filePaths != null) {
  for (String path : filePaths) {
      result.put(path, sc.textFile(path).filter(new Function<String, Boolean>() {

          public Boolean call(String line) throws Exception {
              return line.contains("ERROR");
          }

      }).count());
  }
}

将results中的数据写入mysql中

/**
* 将结果写入mysql中
* @param result
* @throws Exception 
*/
public static void wirteResultToMysql(Map<String, Long> result) throws Exception {
  String DBDRIVER = "com.mysql.jdbc.Driver";  
  //链接地址是由各个数据库生产商单独提供的,因此须要单独记住  
  String DBURL = "jdbc:mysql://ip:3306/test";  
  //链接数据库的用户名  
  String DBUSER = "root";  
  //链接数据库的密码  
  String DBPASS = "root";
  Connection con = null; //表示数据库的链接对象  
  PreparedStatement pstmt = null; //表示数据库更新操做  
  String sql = "insert into aaa values(?,?)";  
  Class.forName(DBDRIVER); //一、使用CLASS 类加载驱动程序  
  con = DriverManager.getConnection(DBURL,DBUSER,DBPASS); //二、链接数据库  
  pstmt = con.prepareStatement(sql); //使用预处理的方式建立对象  
  if (result != null) {
      for (String str : result.keySet()) {
          pstmt.setString(1, str);
          pstmt.setLong(2, result.get(str));
          pstmt.addBatch();
      }
  }
  //pstmt.executeUpdate(); //执行SQL 语句,更新数据库  
  pstmt.executeBatch();
  pstmt.close();  
  con.close(); // 四、关闭数据库  
}

共享变量

一般状况下,当一个函数传递给一个在远程集群节点上运行的Spark操做(好比map和reduce)时,Spark会对涉及到的变量的全部副本执行这个函数。这些变量会被复制到每一个机器上,并且这个过程不会被反馈给驱动程序。一般状况下,在任务之间读写共享变量是很低效的。可是,Spark仍然提供了有限的两种共享变量类型用于常见的使用场景:广播变量和累加器。
一、广播变量
广播变量容许程序员在每台机器上保持一个只读变量的缓存而不是将一个变量的拷贝传递给各个任务。它们能够被使用,好比,给每个节点传递一份大输入数据集的拷贝是很低效的。Spark 试图使用高效的广播算法来分布广播变量,以此来下降通讯花销。 能够经过 SparkContext.broadcast(v) 来从变量 v 建立一个广播变量。这个广播变量是 v 的一个包装,同时它的值能够功过调用 value 方法来得到。如下的代码展现了这一点:

broadcastVar = sc.broadcast([1, 2, 3])
<pyspark.broadcast.Broadcast object at 0x102789f10>

>>> broadcastVar.value
[1, 2, 3]

在广播变量被建立以后,在全部函数中都应当使用它来代替原来的变量v,这样就能够保证v在节点之间只被传递一次。另外,v变量在被广播以后不该该再被修改了,这样能够确保每个节点上储存的广播变量的一致性(若是这个变量后来又被传输给一个新的节点)。

二、累加器
累加器是在一个相关过程当中只能被”累加”的变量,对这个变量的操做能够有效地被并行化。它们能够被用于实现计数器(就像在MapReduce过程当中)或求和运算。Spark原生支持对数字类型的累加器,程序员也能够为其余新的类型添加支持。累加器被以一个名字建立以后,会在Spark的UI中显示出来。这有助于了解计算的累进过程(注意:目前Python中不支持这个特性)。

能够经过SparkContext.accumulator(v)来从变量v建立一个累加器。在集群中运行的任务随后可使用add方法或+=操做符(在Scala和Python中)来向这个累加器中累加值。可是,他们不能读取累加器中的值。只有驱动程序能够读取累加器中的值,经过累加器的value方法。

如下的代码展现了向一个累加器中累加数组元素的过程:

accum = sc.accumulator(0)
Accumulator<id=0, value=0>

>>> sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
10

这段代码利用了累加器对 int 类型的内建支持,程序员能够经过继承 AccumulatorParam 类来建立本身想要的类型支持。AccumulatorParam 的接口提供了两个方法:zero用于为你的数据类型提供零值;addInPlace 用于计算两个值得和。好比,假设咱们有一个 Vector类表示数学中的向量,咱们能够这样写:

class VectorAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return Vector.zeros(initialValue.size)

    def addInPlace(self, v1, v2):
        v1 += v2
        return v1

# Then, create an Accumulator of this type:
vecAccum = sc.accumulator(Vector(...), VectorAccumulatorParam())

累加器的更新操做只会被运行一次,Spark 提供了保证,每一个任务中对累加器的更新操做都只会被运行一次。好比,重启一个任务不会再次更新累加器。在转化过程当中,用户应该留意每一个任务的更新操做在任务或做业从新运算时是否被执行了超过一次。

累加器不会改变Spark 的惰性求值模型。若是累加器在对RDD的操做中被更新了,它们的值只会在启动操做中做为 RDD 计算过程当中的一部分被更新。因此,在一个懒惰的转化操做中调用累加器的更新,并无法保证会被及时运行。 下面的代码段展现了这一点:

accum = sc.accumulator(0)
data.map(lambda x => acc.add(x); f(x))
// 这里,accum任然是0,由于没有action算子,因此map也不会进行实际的计算

任务的提交以及Standalone集群模式的部署

参考官方文档:http://spark.apache.org/docs/...
spark-submit
首先须要打包代码,若是你的代码须要依赖其余的包环境则须要单独的打包这些依赖,应为cluster会将全部依赖的jar包分发到各个节点上进行使用。推荐的方法是将依赖包和程序都统一的打成一个包,这样就能够直接使用spark-submit方法来运行,具体的pom.xml配置以下:

<dependencies>
    <dependency> <!-- Spark dependency -->
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.11</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.13</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.6.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.11</version>
        <scope>test</scope>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <!-- 使用1.7 jdk进行编译 -->
                <source>1.7</source>
                <target>1.7</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>2.5.5</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

spark && hadoop 的scope值都设置为provided
在服务器上提交的命令以下:

./bin/spark-submit \
  --class <main-class>
  --master <master-url> \
  --deploy-mode <deploy-mode> \
  --conf <key>=<value> \
  ... # other options
  <application-jar> \
  [application-arguments]

spark-submit 能够加载一个配置文件,默认是加载在conf/spark-defaults.conf

单元测试

Spark对全部常见的单元测试框架提供友好的支持。你只须要在测试中建立一个SparkContext对象,而后吧master URL设为local,运行测试操做,最后调用 SparkContext.stop() 来中止测试。注意,必定要在 finally 代码块或者单元测试框架的 tearDown方法里调用SparkContext.stop(),由于Spark不支持同一程序中有多个SparkContext对象同时运行。

部署

一、Spark Standalone Mode
除了运行在Mesos和YARN集群以外,spark也提供了简单的独立部署模式。能够经过手动的启动master和worker,也能够经过spark提供的启动脚原本启动。独立部署也能够经过运行在一个机器上,进行测试。
为了安装你须要放置一个编译好的spark版本到每一个机器上。
启动集群有两种方式,一种是手动启动,另外一种是经过启动脚本启动。
1.一、手动启动spark集群
启动一个独立的master可使用以下的命令:
./sbin/start-master.sh
一旦启动能够经过访问:http://localhost:8080端口访问master
可使用以下的命令来使worker节点链接到master上:
./sbin/start-slave.sh <worker#> <master-spark-URL>
worker在加入到master后能够访问master的http://localhost:8080,能够看到被加入的worker节点的信息。
在启动master和worker的时候能够带上参数进行设置,参数的列表以下:其中比较重要的是:
-c CORES, 这个是指定多少个cpu分配给spark使用,默认是所有cpu
-m MEM,这个是指定多少的内存分配给spark使用,默认是所有的内存的减去1g的操做系统内存所有分配给spark使用。通常的格式是1000M or 2G
-d DIR, 这个指定spark任务的日志输出目录。
–properties-file FILE 指定spark指定加载的配置文件的路径默认是: conf/spark-defaults.conf

1.二、脚本方式部署
经过spark的部署脚本部署首先须要在spark的主目录下建立一个conf/slaves的文件,这个文件中每一行表明一个worker的hostname.须要注意的是,master访问worker节点是经过SSH访问的,因此须要master经过ssh无密码的登陆到worker,不然须要设置一个 SPARK_SSH_FOREGROUND的环境变量,这个变量的值就是每一个worker的密码

而后能够经过spark安装目录下的sbin/….sh文件进行启动, 若是要启动和中止master和slave可使用:
sbin/start-all.sh
sbin/stop-all.sh
注意的是这些脚本必须是在master机器上执行

同时你能够经过配置集群的 conf/spark-env.sh文件来进一步配置集群的环境。可是这也文件须要经过拷贝conf/spark-env.sh.template文件来建立,而且须要把这个文件拷贝到全部的worker节点上。
其中: SPARK_MASTER_OPTS && SPARK_WORKER_OPTS 两个配置项比较复杂。
经过在SparkContext构造器中传入spark://IP:PORT这个来启用这个集群。同时能够在交互式的方式启动脚本中使用:./bin/spark-shell –master spark://IP:PORT 来启动集群执行。
独立部署模式的集群如今只是简单的支持FIFO调度。 为了容许多个并发用户,能够经过SparkConf设置每一个应用程序须要的资源的最大数。默认状况下,它会请求使用集群的所有的核,而这只是同时运行一个应用程序才回有意义。

val conf = new SparkConf()
             .setMaster(...)
             .setAppName(...)
             .set("spark.cores.max", "10")
val sc = new SparkContext(conf)

除了能够在程序中指定你也能够在spark-env.sh中设置默认的值,export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"

二、spark的高可用设置
spark的高可用设置有两种,一种是经过Zookeeper来实现,另外一种是经过本地文件系统来实现。

2.一、使用ZooKeeper备份master,利用zookeeper提供的领导选举和状态保存,你可让更多的master链接到zookeepre实例。一个将会被选举为leader其余的则会保存备份他的状态。若是master死掉,zookeeper能够选举一个新的leader,整个过程须要1到2分钟的时间,可是这个过程只会对新的任务调度有影响。为了使用这种方式须要的配置项为:SPARK_DAEMON_JAVA_OPTS,这个配置项有三个配置信息:spark.deploy.recoveryMode/spark.deploy.zookeeper.url/spark.deploy.zookeeper.dir

2.二、使用本地文件系统来恢复该节点。为了使用这种方式须要的配置项为:SPARK_DAEMON_JAVA_OPTS,这个配置项有两个配置信息:spark.deploy.recoveryMode、spark.deploy.recoveryDirectory

Spark架构与原理


Spark架构采用了分布式计算中的Master-Slave模型。Master是对应集群中的含有Master进程的节点,Slave是集群中含有Worker进程的节点。Master做为整个集群的控制器,负责整个集群的正常运行;Worker至关因而计算节点,接收主节点命令与进行状态汇报;Executor负责任务的执行;Cluster做为用户的客户端负责提交应用,Driver负责控制一个应用的执行。
Spark集群部署后,须要在主节点和从节点分别启动Master进程和Woker进程,对整个集群进行控制。在一个Spark应用的执行过程当中,Driver和Worker是两个重要角色。Driver程序是应用逻辑执行的起点,负责做业的调度,即Task任务的分发,而多个Worker用来管理计算节点和建立Executor并行处理任务。在执行阶段,Driver会将Task和Task所依赖的
file和jar序列化后传递给对应的Worker机器,同时Exucutor对相应数据分区的任务进行处理。
下面详细介绍Spark的架构中的基本组件。

  • ClusterManager:在Standalone模式中即为Master(主节点),控制整个集群,监控Worker。在YARN模式中为资源管理器。

  • Worker:从节点,负责控制计算节点,启动Executor或Driver。在YARN模式中为NodeManager,负责计算节点的控制。
    Spark总体流程为:Client提交应用,Master找到一个Worker启动Driver,Driver向Master或者资源管理器申请资源,以后将应用转化为RDD Graph,再由DAGScheduler将RDD Graph转化为Stage的有向无环图提交给TaskScheduler,由TaskScheduler提交任务给Executor执行。在任务执行过程当中,其余组件协同工做,确保整个应用顺利进行。

计算模型

  • Application:应用。能够认为是屡次批量计算组合起来的过程,在物理上能够表现为你写的程序包+部署配置。应用的概念相似于计算机中的程序,它只是一个蓝本,尚没有运行起来。

  • RDD:Resilient Distributed Datasets,弹性分布式数据集。RDD便是计算模型里的一个概念,也是你编程时用到的一种类。一个RDD能够认为是spark在执行分布式计算时的 一批相同来源、相同结构、相同用途的数据集,这个数据集可能被切割成多个分区,分布在不一样的机器上,不管如何,这个数据集被称为一个RDD。在编程 时,RDD对象就对应了这个数据集,而且RDD对象被看成一个数据操做的基本单位。好比,对某个RDD对象进行map操做,其实就至关于将数据集中的每一个 分区的每一条数据进行了map映射。

  • Partition:分区。一个RDD在物理上被切割成多个数据子集,分布在不一样的机器上。每一个数据子集叫一个分区。

  • RDD Graph:RDD组成的DAG(有向无环图)。RDD是不可变的,一个RDD通过某种操做后,会生成一个新的RDD。这样说来,一个 Application中的程序,其内容基本上都是对各类RDD的操做,从源RDD,通过各类计算,产生中间RDD,最后生成你想要的RDD并输出。这个 过程当中的各个RDD,会构成一个有向无环图。

  • Lineage:血统。RDD这个概念自己包含了这种信息“由哪一个父类RDD通过哪一种操做获得”。因此某个RDD能够经过不断寻找父类,找到最原始的那个RDD。这条继承路径就认为是RDD的血统。

  • Job:从Application和RDD Graph的概念能够知道,一个应用每每对应了一个RDD Graph。这个应用在准备被spark集群运行前,实际上就是会生成一个或多个RDD Graph结构,而一个RDD Graph,又能够生成一个或多个Job。一个Job能够认为就是会最终输出一个结果RDD(后面会介绍,实际上这是action操做)的一条由RDD组 织而成的计算,在Application生成的RDD Graph上表现为一个子图。Job在spark里应用里也是一个被调度的单位。

  • 宽依赖:RDD生成另外一个RDD时,各个两个父子RDD间分区的对应关系,被叫作RDD间依赖。宽依赖就是子RDD的某个分区,依赖父RDD的所有分区。

  • 窄依赖:窄依赖就是子RDD的某个分区,只依赖常数个父RDD的分区。宽窄依赖的区别以下图所示。

  • Stage:Stage能够理解为完成一个Job的不一样阶段。一个Job被划分为多个Stage,每一个Stage又包含了对多个RDD的多个操做。一个Stage里,通常包含了一个宽依赖操做,或者多个窄依赖操做。
    窄依赖是指前一个rdd计算能出一个惟一的rdd,好比map或者filter等;宽依赖则是指多个rdd生成一个或者多个rdd的操做,好比groupbykey reducebykey等,这种宽依赖一般会进行shuffle。

  • 算子:父子RDD间的某种操做,被叫某种算子。好比下面会介绍的map,filter,groupByKey等。算子可从多个维度分类,以后再介绍。

  • Task:一个分区对应一个Task。实际上一个Task就是在一个Stage范围内,某个Executor所要执行的算子。

  • TaskSet:一个Stage范围内,全部相同的Task被称为一个TaskSet。

  • DAGScheduler:DAGScheduler用于根据RDD DAG切分Stage,并维护各个Stage的前后依赖关系,至关于完成了一个Job内的不一样Stage间的调度策略。

  • TasksetManager:管理一个TaskSet,并决定了这个TaskSet中各个Task的分发策略。

  • TaskScheduler:执行实际的Task分发操做。

SparkUI、History Server:

SparkUI: 4044
History Server:18080
怎么看?http://www.cnblogs.com/xing90...

参考

http://blog.csdn.net/qq_26562...
http://blog.csdn.net/suzyu123...
http://www.cnblogs.com/helloc...
http://blog.csdn.net/suzyu123...
http://www.jianshu.com/nb/340...
http://www.cnblogs.com/ainima...
http://www.chinahadoop.cn/gro...
https://yq.aliyun.com/article...
http://ifeve.com/category/spa...

相关文章
相关标签/搜索