Spark编程指南

一、在maven里面添加引用,spark和hdfs的客户端的。html

groupId = org.apache.spark
artifactId = spark-core_2.9.3
version = 0.8.1-incubating 
groupId = org.apache.hadoop
artifactId = hadoop-client
version = <your-hdfs-version>

二、把assembly/target/spark-assembly_2.9.3-0.8.1-incubating.jar添加到classpath里面,而后咱们在程序里面要添加如下引用。程序员

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

三、下面是官方的WorkCount的例子,能够参考一下。算法

/*** SimpleApp.scala ***/
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._

object SimpleApp {
  def main(args: Array[String]) {
    val logFile = "$YOUR_SPARK_HOME/README.md" // Should be some file on your system
    val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME", List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))
    val logData = sc.textFile(logFile, 2).cache()
    val numAs = logData.filter(line => line.contains("a")).count()
    val numBs = logData.filter(line => line.contains("b")).count()
    println("Lines with a: %s, Lines with b: %s".format(numAs, numBs))
 } 
}
val sc = new SparkContext("local", "Simple App", "YOUR_SPARK_HOME", List("target/scala-2.9.3/simple-project_2.9.3-1.0.jar"))
SparkContext是SparkContext的上下文对象,是很是核心的一个类,它的实例化方法是Hadoop datasets。
(1)Parallelized collectionsnew SparkContext(master, appName, [sparkHome], [jars])。
master:master的地址。
appName:应用的名称。
sparkHome:spark的安装地址。
jars:jar包的位置。

四、Spark老是围绕这个一个概念来进行 resilient distributed dataset (RDD),是能够并行操做的支持容错的元素集合。目前支持两种类型的RDDs,parallelized collections和是scala中存在的集合类,而且支持并行操做。
scala> val data = Array(1, 2, 3, 4, 5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val distData = sc.parallelize(data)
distData: spark.RDD[Int] = spark.ParallelCollection@10d13e3e

 正常状况之下,spark会自动设置并行任务所须要的cpu的分片,通常是每一个cpu 2-4个分片,也能够本身手动设置,sc.parallelize(data, 10)。apache

 (2)Spark支持hadoop上的任何数据集,好比text files, SequenceFiles,还有其它的InputFormat。api

   下面是text files的例子:数组

scala> val distFile = sc.textFile("data.txt")
distFile: spark.RDD[String] = spark.HadoopRDD@1d4cee08

   SequenceFiles则使用SparkContext’s sequenceFile[K, V] ,好比sequenceFile[Int, String],Int对应的是IntWritable,String对应的是Text。缓存

   别的数据格式使用SparkContext.hadoopRDD,以后再介绍,这个文档没有介绍。app

   正常状况之下,spark是一个block一个任务。maven

   (3)RDDs只支持两种操做: transformations,  从一个数据集转换成另一种; actions, 经过对一个数据集进行运算以后返回一个值。oop

     Spark当中全部的transformations都是延迟执行的,等到真正使用的时候才会进行运算。

     默认的,每个通过transformed的RDD当有action做用于它的时候,它会从新计算一遍,除非咱们进行persist (or cache) 操做。

     最后附录一下RDD的API地址:http://spark.incubator.apache.org/docs/latest/api/core/index.html#org.apache.spark.rdd.RDD

 (4)RDD Persistence 

    Spark最重要的一个功能就是能够把RDD持久化或者缓存,当你进行一个持久化操做的时候,Spark会在全部节点的内存当中保存这个RDD,第一个的时候计算,以后一直使用不须要再从新计算了。缓存是实现迭代式算法的关键。咱们可使用persist() or cache()方法来持久化一个RDD,它是容错的,当这个RDD的任何分片丢失以后,它会在以前计算它的机器上从新计算。另外每个RDD,有它本身的存储Level,存储在硬盘或者存储在内存,可是序列化成Java对象(节省空间),或者在集群间复制。要设置它,咱们须要传递一个StorageLevel给persist(),cache()是默认的了是StorageLevel.MEMORY_ONLY (存储为反序列化对象在内存当中)

  当内存足够的时候,咱们可使用MEMORY_ONLY;当内存不太好的时候,咱们能够采用MEMORY_ONLY_SER,在内存中存储为一个字节数组,速度还能够;当操做的数据集合足够大的时候,咱们就把中间结果写到硬盘上;若是要支持容错,就使用备份到2个节点上的方式。若是要本身定义一个的话,要使用StorageLevel的apply()方法。      

 五、共享变量

   Spark提供了两种限制的共享变量,BroadcastAccumulators。

   (1)Broadcast容许程序员持有一个只读的变量在各个节点之间,它一个经常使用的场景就是用它来存储一个很大的输入的数据集给每一个节点使用,Spark会只用它独有的广播算法来减小通讯损失。下面是例子:

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: spark.Broadcast[Array[Int]] = spark.Broadcast(b5c40191-a864-4c7d-b9bf-d87e1a4e787c)

scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

 

    (2)Accumulators是用来计数或者求总数的,使用SparkContext.accumulator(v)来给它一个初始化的值,而后用“+=”来进行操做,可是任务之间不能获得它的结果,只有驱动任务的程序能够获得它的结果。下面是例子:

scala> val accum = sc.accumulator(0)
accum: spark.Accumulator[Int] = 0

scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
...
10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s

scala> accum.value
res2: Int = 10
相关文章
相关标签/搜索