本文主要介绍Spark基本数据结构RDD的原理和使用,以及搭建了基于Docker的Spark集群开发测试环境,最后给出了几个实际程序例子,算是Saprk入了门:)node
RDD是Spark中最核心的概念算法
弹性:RDD 能够在不改变内部存储数据记录的前提下,去调整并行计算计算单元的划分结构,弹性这一特性,也是为并行计算服务的docker
容错性:分布式的通常问题是须要具备容错性,那么RDD自己是具备容错性的,apache
RDD 内部的数据集合在逻辑上和物理上被划分红多个小子集合Partition,这样的每个子集合咱们将其称为分区,分区的个数会决定并行计算的粒度,而每个分区数值的计算都是在一个单独的任务Task中进行,所以并行任务的个数,也是由 RDD分区的个数决定的
Partition -> Task编程
先放个图,看下Spark总体程序是怎么执行的浏览器
整个集群分为 Master 节点和 Worker 节点,至关于 Hadoop 的 Master 和 Slave 节点缓存
Master 节点上常驻 Master 守护进程,负责管理所有的 Worker 节点bash
Worker 节点上常驻 Worker 守护进程,负责与 Master 节点通讯并管理 executors数据结构
Driver 官方解释是 “The process running the main() function of the application and creating the SparkContext”。 Application 就是用户本身写的 Spark 程序(driver program)并发
每一个 Worker 上存在一个或者多个 ExecutorBackend 进程。每一个进程包含一个 Executor 对象,该对象持有一个线程池,每一个线程能够执行一个 task。
var list = List(1, 2, 3) list.foreach(println)
Spark 的RDD,封装了各类相似于Scala集合的算子
map、filter、reduce
等,且都是分布式执行的
sc.parallelize()
建立,第二个参数是Partition数目val slices = 10 //Partition数目,即并行的task数目启动10个map task进行处理 val n = 100000 * slices val count = sc.parallelize(1 to n, slices).map { i => val x = random * 2 - 1 val y = random * 2 - 1 (x*x + y*y < 1) 1 else 0 }.reduce(_ + _)
sc.textFile("tile.txt") //将本地文本文件加载成RDD sc.textFile(“hdfs://nn:9000/path/file”) //hdfs文件或目录
sc.sequenceFile(“file.txt”) //将本地二进制文件加载成RDD
sc.hadoopFile(path, inputFmt, keyClass, valClass)
inputRdd = sc.textFile(“hdfs:///data/input”) inputRdd = sc.textFile(“hdfs://namenode:8020/data/input”)
HDFS的datanode的Block和Spark数据的partiton是一一映射的,也和task一一映射,也就是下图所示的就会启动5个task
import org.apache.spark._ import org.apache.hadoop.hbase.mapreduce.TableInputFormat //建立SparkContext val sparkConf = new SparkConf() val sc = new SparkContext(conf ) // 设置hbase configuration val hbaseConf = HBaseConfiguration.create() hbaseConf.addResource(new Path(“hbase-site.xml")) hbaseConf.set(TableInputFormat.INPUT_TABLE, tableName) //建立hbase RDD val hBaseRDD = sc.newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result]) //获取总行数 al count = hBaseRDD.count()
Transformation:将一个RDD经过一种规则,映射成另外一种RDD; Action: 返回结果或者保存结果,只有action才会触发程序的执行,注意Spark中遇到action的时候计算才会去分布式执行
在 Spark 中,全部的转换(transformations)都是惰性(lazy)的,它们不会立刻计算它们的结果。相反的,它们仅仅记录转换操做是应用到哪些基础数据集(例如一个文件)上的。转换仅仅在这个时候计算:当动做(action) 须要一个结果返回给驱动程序的时候。这个设计可以让 Spark 运行得更加高效。例如,咱们能够实现:经过 map 建立一个新数据集在 reduce 中使用,而且仅仅返回 reduce 的结果给 driver,而不是整个大的映射过的数据集。
//建立RDD val listRdd = sc.parallelize(List(1, 2, 3), 3) // 将RDD传入函数,生成新的RDD val squares = listRdd.map(x => x*x) // {1, 4, 9} // 对RDD中元素进行过滤,生成新的RDD val even = squares.filter(_ % 2 == 0) // {4} // 将一个元素映射成多个,生成新的RDD nums.flatMap(x => 1 to x) // => {1, 1, 2, 1, 2, 3}
//建立新的RDD val nums = sc.parallelize(List(1, 2, 3), 2) // 将RDD保存为本地集合(返回到driver端) nums.collect() // => Array(1, 2, 3) // 返回前K个元素 nums.take(2) // => Array(1, 2) // 计算元素总数 nums.count() // => 3 // 合并集合元素 nums.reduce(_ + _) // => 6 // 将RDD写到HDFS中 nums.saveAsTextFile(“hdfs://nn:8020/output”) nums.saveAsSequenceFile(“hdfs://nn:8020/output”)
val pets = sc.parallelize( List((“cat”, 1), (“dog”, 1), (“cat”, 2))) pets.reduceByKey(_ + _) // => {(cat, 3), (dog, 1)} pets.groupByKey() // => {(cat, Seq(1, 2)), (dog, Seq(1)} pets.sortByKey() // => {(cat, 1), (cat, 2), (dog, 1)}
reduceByKey自动在map端进行本地combine
words.reduceByKey(_ + _, 5)
用户也能够经过修改spark.default.parallelism设置默认并行度 默认并行度为最初的RDD partition数目
留一个思考问题:那么这些操做都是怎么分布式执行的呢?
Sample()
从数据集采样
union()
合并多个RDD
cartesian
求笛卡尔积
共享变量:Accumulators和Broadcast Variables
通常来讲上述的操做都是对数据在远端worker node上拷贝的数据进行操做,对数据的效果并不会回传
Accumulator
(累加器,计数器)
相似于MapReduce中的counter,将数据从一个节点发送到其余各个节点上去;
import SparkContext._ val total_counter = sc.accumulator(0L, "total_counter") val counter0 = sc.accumulator(0L, "counter0") val counter1 = sc.accumulator(0L, "counter1") val count = sc.parallelize(1 to n, slices).map { i => total_counter += 1 val x = random * 2 - 1 val y = random * 2 – 1 if (x*x + y*y < 1) { counter1 += 1 } else { counter0 += 1 } if (x*x + y*y < 1) 1 else 0 }.reduce(_ + _)
广播机制 : 高效分发大对象,好比字典(map),集合(set)等,每一个executor一份, 而不是每一个task一份; 引用自Spark doc里的介绍:
Spark 的 action(动做)操做是经过一系列的 stage(阶段)进行执行的,这些 stage(阶段)是经过分布式的 “shuffle” 操做进行拆分的。Spark 会自动广播出每一个 stage(阶段)内任务所须要的公共数据。这种状况下广播的数据使用序列化的形式进行缓存,并在每一个任务运行前进行反序列化。这也就意味着,只有在跨越多个 stage(阶段)的多个任务会使用相同的数据,或者在使用反序列化形式的数据特别重要的状况下,使用广播变量会有比较好的效果。
包括HttpBroadcast和TorrentBroadcast两种
总结:若是一个变量很是大,每个task计算逻辑都要用到这个变量,则应该将其广播出去,更高效
容许将RDD缓存到内存中或磁盘上,以便于重用,若是想屡次使用某个 RDD,强烈推荐在该 RDD 上调用 persist 方法.
Spark 中一个很重要的能力是将数据 persisting 持久化(或称为 caching 缓存),在多个操做间均可以访问这些持久化的数据。当持久化一个 RDD 时,每一个节点的其它分区均可以使用 RDD 在内存中进行计算,在该数据上的其余 action 操做将直接使用内存中的数据。这样会让之后的 action 操做计算速度加快(一般运行速度会加速 10 倍)。缓存是迭代算法和快速的交互式使用的重要工具。
trade-off: Spark 的存储级别的选择,核心问题是在 memory 内存使用率和 CPU 效率之间进行权衡。
原项目能够直接在本机跑,默认是Spark的单机模式;
> docker pull sequenceiq/spark:1.5.1 > sudo docker run -it sequenceiq/spark:1.5.1 bash
遇到问题:在Docker中启动master的时候,ip是Docker 的地址,个人宿主机访问不到,缘由是启动Docker 的时候没有端口映射,因此从新run一遍镜像:
docker run -p 127.0.0.1:8081:8080 -it sequenceiq/spark:1.5.1 bash
将宿主机的8081端口映射到Docker的8080端口
cd /usr/local/spark cp conf/spark-env.sh.template conf/spark-env.sh vi conf/spark-env.sh
加入两行代码:
./sbin/start-master.sh ./sbin/start-slave.sh 172.17.0.109:7077
而后宿主机浏览器访问http://localhost:8081
就能够访问到Spark UI界面惹!
/** * 并行估算pi * Area1 = x * x , Area2 = Pi * (x / 2) * (x / 2) * Area1 / Area2 = 4 / pi * 4 / pi = x / y => pi = 4 * y / x */ object SparkPi { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Spark Pi").setMaster("local[1]"); val sc = new SparkContext(conf); val slices = if (args.length > 0) args(0).toInt else 2; val areaSqure = 100000 * slices; //并行估算areaCircle的值:也就是撒areaSqure这么多个点,求落在圆内的多少个点,就近似等于圆的面积 val areaCircle = sc.parallelize(1 to areaSqure, slices).map{i => val x = new Random().nextInt() * 2 - 1 val y = new Random().nextInt() * 2 - 1 if (x * x + y * y < 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * areaCircle / areaSqure) } }
log query
任务:如何统计每一个用户在每台机器(ip)上查询(query)的次数和返回结果累积大小(byte)?
/** * 日志查询任务:统计每一个用户在每台机器(ip)上查询(query)的次数和返回结果累积大小(byte) * 分析:key: 每一个用户在每台机器上的query ,value:次数和结果累积大小(byte) */ object LogQuery { val apacheLogRegex = """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-‐]\d{4})\] "(.+?)" (\d{3}) ([\d\-‐]+) "([^"]+)" "([^"]+)".*""".r def extractKey(line : String): (String, String, String) = { apacheLogRegex.findFirstIn(line) match { case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) => if (user != "\"-‐\"") (ip, user, query) else (null, null, null) case _ => (null, null, null) } } def extractStats(line: String): Stats = { apacheLogRegex.findFirstIn(line) match { case Some(apacheLogRegex(ip, _, user, dateTime, query, status, bytes, referer, ua)) => new Stats(1, bytes.toInt) case _ => new Stats(1, 0) } } class Stats(val count: Int, val numBytes: Int) extends Serializable { def merge(other: Stats) = new Stats(count + other.count, numBytes + other.numBytes) override def toString = "bytes=%s\tn=%s".format(numBytes, count) } def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("LogQuery").setMaster("local[1]") val sc = new SparkContext(conf) val dataset = sc.textFile(args(0)) dataset.map(line => (extractKey(line), extractStats(line))) .reduceByKey((a, b) => a.merge(b)) .collect().foreach { case (user, query) => println("%s\t%s".format(user, query)) } } }