参考来源:http://www.yiibai.com/spark/node
概述
Apache Spark是一个集群计算设计的快速计算。它是创建在Hadoop MapReduce之上,它扩展了 MapReduce 模式,有效地使用更多类型的计算,其中包括交互式查询和流处理。Spark的主要特征是其内存集群计算,增长的应用程序的处理速度。shell
三种部署方法:apache
Spark RDD
弹性分布式数据集(RDD)是Spark的基本数据结构。它是对象的不可变的分布式集合。在RDD中每一个数据集被划分红逻辑分区,这多是在群集中的不一样节点上计算的。RDDS能够包含任何类型,如:Python,Java,或者Scala的对象,包括用户定义的类。编程
安装
按顺序安装Java、Scala、Spark数组
Spark核心编程
建立简单RDD
Spark容器会自动建立Spark 上下文对象名为sc缓存
$ spark-shell scala> val inputfile = sc.textFile(“input.txt”)
RDD转换
S.No | 转换&含义
--------|----------------
1 | map(func) 返回一个新的分布式数据集,传递源的每一个元素造成经过一个函数 func
2 | filter(func) 返回由选择在func返回true,源元素组成了一个新的数据集
3 | flatMap(func) 相似映射,但每一个输入项目能够被映射到0以上输出项目(因此func应返回seq而不是单一的项目)
4 | mapPartitions(func) 相似映射,只不过是单独的每一个分区(块)上运行RDD,所以 func 的类型必须是Iterator
5 | mapPartitionsWithIndex(func) 相似映射分区,并且还提供func 来表示分区的索引的整数值,所以 func 必须是类型 (Int, Iterator
6 | sample(withReplacement, fraction, seed) 采样数据的一小部分,有或没有更换,利用给定的随机数发生器的种子
7 | union(otherDataset) 返回一个新的数据集,其中包含源数据和参数元素的结合
8 | intersection(otherDataset) 返回包含在源数据和参数元素的新RDD交集
9 | distinct([numTasks]) 返回一个新的数据集包含源数据集的不一样元素
10 | groupByKey([numTasks]) 当调用(K,V)数据集,返回(K, Iterable
11 | reduceByKey(func, [numTasks])
12 | aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
13 | sortByKey([ascending], [numTasks])
14 | join(otherDataset, [numTasks])
15 | cogroup(otherDataset, [numTasks])
16 | cartesian(otherDataset) 当上调用类型T和U的数据集,返回(T,U)对数据集(全部元素对)
17 | pipe(command, [envVars]) RDD经过shell命令每一个分区,例如:一个Perl或bash脚本。RDD元素被写入到进程的标准输入和线路输出,标准输出形式返回一个字符串RDD
18 | coalesce(numPartitions) 减小RDD到numPartitions分区的数量。过滤大型数据集后,更高效地运行的操做
19 | repartition(numPartitions) 打乱RDD数据随机创造更多或更少的分区,并在它们之间平衡。这老是打乱的全部数据在网络上
20 | repartitionAndSortWithinPartitions(partitioner) 根据给定的分区从新分区RDD及在每一个结果分区,排序键记录。这是调用从新分配排序在每一个分区内,由于它能够推进分拣向下进入混洗机制效率更高。
动做
S.No | 操做 & 含义
--------|---------------------
1 | reduce(func) 合计数据集的元素,使用函数 func (其中有两个参数和返回一行). 该函数应该是可交换和可结合,以便它能够正确地在并行计算。
2 | collect() 返回数据集的全部做为数组在驱动程序的元素。这是一个过滤器或其它操做以后返回数据的一个足够小的子集,一般是有用的
3 | count() 返回该数据集的元素数
4 | first() 返回的数据集的第一个元素(相似于使用(1))
5 | take(n) 返回与该数据集的前n个元素的阵列。
6 | takeSample (withReplacement,num, [seed]) 返回数组的数据集num个元素,有或没有更换随机抽样,预指定的随机数发生器的种子可选
7 | takeOrdered(n, [ordering]) 返回RDD使用或者按其天然顺序或自定义比较的前第n个元素
8 | saveAsTextFile(path) 写入数据集是一个文本文件中的元素(或一组文本文件),在给定的目录的本地文件系统,HDFS或任何其余的Hadoop支持的文件系统。Spark调用每一个元素的 toString,将其转换为文件中的文本行
9 | saveAsSequenceFile(path) (Java and Scala) 写入数据集,为Hadoop SequenceFile元素在给定的路径写入在本地文件系统,HDFS或任何其余Hadoop支持的文件系统。 这是适用于实现Hadoop可写接口RDDS的键 - 值对。在Scala中,它也能够在属于隐式转换为可写(Spark包括转换为基本类型,如 Int, Double, String 等等)类型。
10 | saveAsObjectFile(path) (Java and Scala) 写入数据集的内容使用Java序列化为一个简单的格式,而后可使用SparkContext.objectFile()加载。
11 | countByKey() 仅适用于RDDS的类型 (K, V). 返回(K, Int)对与每一个键的次数的一个HashMap。
12 | foreach(func) 数据集的每一个元素上运行函数func。这一般对于不良反应,例如更新累加器或与外部存储系统进行交互进行。网络
示例程序数据结构
//打开Spark-Shell $ spark-shell //建立一个RDD scala> val inputfile = sc.textFile("input.txt") //执行字数转换 scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_); //当前RDD scala> counts.toDebugString //缓存转换 scala> counts.cache() //应用动做 scala> counts.saveAsTextFile("output")
Spark部署
Spark应用程序使用spark-submit(shell命令)来部署在集群中的Spark应用程序
示例:
SparkWordCount.scalaapp
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark._ object SparkWordCount { def main(args: Array[String]) { val sc = new SparkContext( "local", "Word Count", "/usr/local/spark", Nil, Map(), Map()) /* local = master URL; Word Count = application name; */ /* /usr/local/spark = Spark Home; Nil = jars; Map = environment */ /* Map = variables to work nodes */ /*creating an inputRDD to read text file (in.txt) through Spark context*/ val input = sc.textFile("in.txt") /* Transform the inputRDD into countRDD */ valcount = input.flatMap(line ⇒ line.split(" ")) .map(word ⇒ (word, 1)) .reduceByKey(_ + _) /* saveAsTextFile method is an action that effects on the RDD */ count.saveAsTextFile("outfile") System.out.println("OK"); } }
步骤:
一、下载Spark Ja
下载spark-core_2.10-1.3.0.jar
二、编译程序
$ scalac -classpath "spark-core_2.10-1.3.0.jar:/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar" SparkPi.scala
三、建立JAR
jar -cvf wordcount.jar SparkWordCount*.class spark-core_2.10-1.3.0.jar/usr/local/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar
四、提交spark应用
spark-submit --class SparkWordCount --master local wordcount.jar