map
, filter
, flatMap
, sample
, groupByKey
, reduceByKey
, union
, join
, cogroup
,mapValues
, sort
,partionBy
等多种操做类型,Spark把这些操做称为Transformations。同时还提供Count
, collect
, reduce
, lookup
, save
等多种actions操做。
RDD的特色:java
RDD的好处python
RDD的存储与分区git
RDD的内部表示
在RDD的内部实现中每一个RDD均可以使用5个方面的特性来表示:github
RDD的存储级别
RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别:web
val NONE=newStorageLevel(false,false,false) val DISK_ONLY=newStorageLevel(true,false,false) val DISK_ONLY_2=newStorageLevel(true,false,false,2) val MEMORY_ONLY=newStorageLevel(false,true,true) val MEMORY_ONLY_2=newStorageLevel(false,true,true,2) val MEMORY_ONLY_SER=newStorageLevel(false,true,false) val MEMORY_ONLY_SER_2=newStorageLevel(false,true,false,2) val MEMORY_AND_DISK=newStorageLevel(true,true,true) val MEMORY_AND_DISK_2=newStorageLevel(true,true,true,2) val MEMORY_AND_DISK_SER=newStorageLevel(true,true,false) val MEMORY_AND_DISK_SER_2=newStorageLevel(true,true,false,2)
RDD定义了各类操做,不一样类型的数据由不一样的RDD类抽象表示,不一样的操做也由RDD进行抽实现。算法
下面来看一从Hadoop文件系统生成RDD的方式,如:val file = spark.textFile("hdfs://...")
,file变量就是RDD(实际是HadoopRDD实例),生成的它的核心代码以下:shell
// SparkContext根据文件/目录及可选的分片数建立RDD, 这里咱们能够看到Spark与Hadoop MapReduce很像 // 须要InputFormat, Key、Value的类型,其实Spark使用的Hadoop的InputFormat, Writable类型。 def textFile(path:String,minSplits:Int=defaultMinSplits):RDD[String]={ hadoopFile(path,classOf[TextInputFormat],classOf[LongWritable], classOf[Text],minSplits).map(pair=>pair._2.toString)} // 根据Hadoop配置,及InputFormat等建立HadoopRDD newHadoopRDD(this,conf,inputFormatClass,keyClass,valueClass,minSplits)
对RDD进行计算时,RDD从HDFS读取数据时与Hadoop MapReduce几乎同样的:数据库
// 根据hadoop配置和分片从InputFormat中获取RecordReader进行数据的读取。 reader=fmt.getRecordReader(split.inputSplit.value,conf,Reporter.NULL) val key:K=reader.createKey() val value:V=reader.createValue() //使用Hadoop MapReduce的RecordReader读取数据,每一个Key、Value对以元组返回。 override def getNext()={ try{ finished=!reader.next(key,value) }catch{ caseeof:EOFException=> finished=true } (key,value) }
val sc=newSparkContext(master,"Example",System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR"))) val rdd_A=sc.textFile(hdfs://.....) val rdd_B=rdd_A.flatMap((line=>line.split("\\s+"))).map(word=>(word,1)) val rdd_C=sc.textFile(hdfs://.....) val rdd_D=rdd_C.map(line=>(line.substring(10),1)) val rdd_E=rdd_D.reduceByKey((a,b)=>a+b) val rdd_F=rdd_B.jion(rdd_E) rdd_F.saveAsSequenceFile(hdfs://....)
对于RDD能够有两种计算方式:转换(返回值仍是一个RDD)与操做(返回值不是一个RDD)。
下面使用一个例子来示例说明Transformations与Actions在Spark的使用。编程
Spark对于资源管理与做业调度可使用Standalone(独立模式),Apache Mesos及Hadoop YARN来实现。 Spark on Yarn在Spark0.6时引用,但真正可用是在如今的branch-0.8版本。Spark on Yarn遵循YARN的官方规范实现,得益于Spark天生支持多种Scheduler和Executor的良好设计,对YARN的支持也就很是容易,Spark on Yarn的大体框架图。缓存
让Spark运行于YARN上与Hadoop共用集群资源能够提升资源利用率。
Spark使用Scala开发,默认使用Scala做为编程语言。编写Spark程序比编写Hadoop MapReduce程序要简单的多,SparK提供了Spark-Shell,能够在Spark-Shell测试程序。写SparK程序的通常步骤就是建立或使用(SparkContext)实例,使用SparkContext建立RDD,而后就是对RDD进行操做。如:
1 val sc=newSparkContext(master,appName,[sparkHome],[jars])
2 val textFile=sc.textFile("hdfs://.....")
3 textFile.map(....).filter(.....).....
Spark支持Java编程,但对于使用Java就没有了Spark-Shell这样方便的工具,其它与Scala编程是同样的,由于都是JVM上的语言,Scala与Java能够互操做,Java编程接口其实就是对Scala的封装。如:
JavaSparkContext sc=newJavaSparkContext(...); JavaRDD lines=ctx.textFile("hdfs://..."); JavaRDD words=lines.flatMap( newFlatMapFunction<String,String>(){ publicIterable call(Strings){ returnArrays.asList(s.split(" ")); } } );
如今Spark也提供了Python编程接口,Spark使用py4j来实现python与java的互操做,从而实现使用python编写Spark程序。Spark也一样提供了pyspark,一个Spark的python shell,能够以交互式的方式使用Python编写Spark程序。 如:
1 from pyspark import SparkContext
2 sc=SparkContext("local","Job Name",pyFiles=['MyFile.py','lib.zip','app.egg'])
3 words=sc.textFile("/usr/share/dict/words")
4 words.filter(lambdaw:w.startswith("spar")).take(5)
以Standalone模式运行Spark集群
http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz
)修改配置(conf/*) slaves: 配置工做节点的主机名 spark-env.sh:配置环境变量。
1 SCALA_HOME=/home/spark/scala-2.9.3 2 JAVA_HOME=/home/spark/jdk1.6.0_45 3 SPARK_MASTER_IP=spark1 4 SPARK_MASTER_PORT=30111 5 SPARK_MASTER_WEBUI_PORT=30118 6 SPARK_WORKER_CORES=2SPARK_WORKER_MEMORY=4g 7 SPARK_WORKER_PORT=30333 8 SPARK_WORKER_WEBUI_PORT=30119 9 SPARK_WORKER_INSTANCES=1
把Hadoop配置copy到conf目录下
在master主机上对其它机器作ssh无密码登陆
把配置好的Spark程序使用scp copy到其它机器
在master启动集群:$SPARK_HOME/start-all.sh
以Yarn模式运行Spark
下载Spark代码:git clonegit://github.com/mesos/spark
切换到branch-0.8
使用sbt编译Spark并
$SPARK_HOME/sbt/sbt >package >assembly
把Hadoop yarn配置copy到conf目录下
运行测试
SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0-SNAPSHOT.jar\
./run spark.deploy.yarn.Client--jar examples/target/scala-2.9.3/\
--classspark.examples.SparkPi--args yarn-standalone
$SPARK_HOME/spark-shell
进入shell便可,在Spark-shell中SparkContext已经建立好了,实例名为sc能够直接使用,还有一个须要注意的是,在Standalone模式下,Spark默认使用的调度器的FIFO调度器而不是公平调度,而Spark-shell做为一个Spark程序一直运行在Spark上,其它的Spark程序就只能排队等待,也就是说同一时间只能有一个Spark-shell在运行。在Spark-shell上写程序很是简单,就像在Scala Shell上写程序同样。
1 scala>val textFile=sc.textFile("hdfs://hadoop1:2323/user/data")
2 textFile:spark.RDD[String]=spark.MappedRDD@2ee9b6e3
3
4 scala>textFile.count()// Number of items in this RDD
5 res0:Long=21374
6
7 scala>textFile.first()// First item in this RDD
8 res1:String=# Spark
在Spark中Spark程序称为Driver程序,编写Driver程序很简单几乎与在Spark-shell上写程序是同样的,不一样的地方就是SparkContext须要本身建立。如WorkCount程序以下:
1 import spark.SparkContext
2 import SparkContext._
3
4 objectWordCount{
5 def main(args:Array[String]){
6 if(args.length==0){
7 println("usage is org.test.WordCount <master>")
8 }
9 println("the args: ")
10 args.foreach(println)
11
12 val hdfsPath="hdfs://hadoop1:8020"
13
14 // create the SparkContext, args(0)由yarn传入appMaster地址
15 val sc=newSparkContext(args(0),"WrodCount",
16 System.getenv("SPARK_HOME"),Seq(System.getenv("SPARK_TEST_JAR")))
17
18 val textFile=sc.textFile(hdfsPath+args(1))
19
20 val result=textFile.flatMap(line=>line.split("\\s+"))
21 .map(word=>(word,1)).reduceByKey(_+_)
22
23 result.saveAsTextFile(hdfsPath+args(2))
24 }
25 }