[spark]spark 编程教程

 

参考:html

英文:https://spark.apache.org/docs/latest/programming-guide.htmljava

中文:http://www.cnblogs.com/lujinhong2/p/4651025.html 1.2.1版本的python

 

(一)快速入门shell

老规矩,先看一个简单示例,有个认识。这个示例来自官方example的SparkPi:express

package org.lujinhong.demo.spark

/*
 * 官方的sparkPi示例
 */

import scala.math.random

import org.apache.spark._

/** Computes an approximation to pi */
object SparkPi {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Spark Pi").setMaster("local")
    val spark = new SparkContext(conf)
    val slices = if (args.length > 0) args(0).toInt else 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
    val count = spark.parallelize(1 until n, slices).map { i =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x*x + y*y < 1) 1 else 0
    }.reduce(_ + _)
    println("Pi is roughly " + 4.0 * count / n)
    spark.stop()
  }

}
 
注意以上的setMaster(“local”)是本身加上去的,方便直接在本地运行。若是在集群上运行,则经过spark-submit的—master参数指定。
写好代码后,就能够直接在eclipse中右键—>运行了。

 

 (二)理论介绍apache

 

一、spark中的全部操做都与RDD相关,包括建立RDD,transformation(将RDD转换为另外一个RDD)和action(触发RDD的计算,以及输出等)。缓存

In Spark all work is expressed as either creating new RDDs, transforming existing RDDs, or calling operations on RDDs to compute a result.app

 二、RDD是一个不可变的分布式对象集合,每一个RDD会被分红多个分区,它们分别在不一样的机器上被计算。它能够是任何的python/java/scala对象,包括你本身建立的对象。dom

注意RDD是不可变的,所以若须要改变现有RDD的内容,只能经过建立一个新的RDD来实现,这也是transformation的做用。eclipse

RDD是一个集合,所以能够经过一些迭代方法对内容进行处理

 三、RDD操做类型:对RDD的操做能够分为2种类型

(1)Transformation: 将一个RDD转化为另外一个RDD,如map, filter等操做

(2)Action:返回计算结果给driver,写入存储等操做。

最明显的区别:transformation返回一个RDD, action返回其它数据类型

 

(三)Spark应用的主要4个工做流程以下:

一、create:     经过读取外部数据源来建立RDD。(虽说也能够将list/set等转化为RDD,但实际上这对于处理大数据没什么做用,通常只用做demo)

二、transformation:  将RDD将化为另外一个RDD,如filter()等。

三、cache:  将RDD缓存下来,方便以后再使用,如persitst()等。

四、action:   执行真正的工做,计算结果并输出,如count(),first()等。

几个注意点

一、建立RDD有2种方法:

(1)从外部数据集中建立,如从文件,socket,kafka, flume等数据源

(2)将list/set等集合转化为RDD。scala> val lines = sc.parallelize(List("apple","pear"));

二、执行transformation只定义了操做,spark执行的是懒计算原则,即transformation不会触发真正的计算,而是等到第一个action出现时才开始真正的计算。这对于大数据量时成为重要。如读取一份大文件时,若立刻将其读入内存,会占用大量的内存空间,而有可能过很长时间也会开始计算。另外一方面,若是只是执行first()相似的计算,这个文件彻底不必所有读入内存,而是只读取到第一行就能够了。

三、默认状况下,对于每个action,spark会从新计算它用到的RDD,若一个RDD会被以后的多个action用到,能够将其缓存到内存(看成也能够到磁盘等),如读取一个文件后,先通过filter,过滤出只包括”spark”的行,此时能够将这个RDD保存到内存中,再分别计算它的count(),first()等操做。

cache() is the same as calling persist() with the default storage level.

四、action会触发真正的计算。

看一个示例:

$ bin/spark-shell

 
(1)建立RDD
scala> val fileContent = sc.textFile("file:///home/hadoop/spark/README.md”)
 
(2)过滤RDD
scala> val pythonLine = fileContent.filter(line => line.contains("spark”))
 
(3)计算行数
scala> pythonLine.count
最后的输出以下:
15/07/21 11:20:43 INFO scheduler.DAGScheduler: Job 2 finished: count at <console>:26, took 1.495956 s
res5: Long = 11
 
能够看出使用了1.5秒左右。
 
(4)咱们试一下缓存后再计算
scala> pythonLine.cache()
scala> pythonLine.count
15/07/21 11:22:18 INFO scheduler.DAGScheduler: Job 3 finished: count at <console>:26, took 0.123537 s
res7: Long = 11
只使用了0.12秒
 
(5)继续执行其它action
scala> pythonLine.first()
。。。。。
相关文章
相关标签/搜索