Spark实战(5)_Spark Core核心编程

Spark版本

cdh5.9.0集成的spark的版本1.6.0,集成的hadoop版本2.6.0。查看的网址:javascript

http://archive.cloudera.com/cdh5/redhat/6/x86_64/cdh/5.9.0/css

若是用cdh5.9.0 parcels离线安装自带的spark(on yarn),启动时提示缺乏包,须要修改spark-env.sh的配置SPARK_DIST_CLASSPATH,里面默认的配置为在线用rpm方式安装的配置,修改成/opt/clouderra/parcels/CDH/lib。java

Spark运行模式

Spark 的执行模式有 local、Yarn、Standalone、Mesos四类。node

开发和测试用 local 模式,其实就是用多线程模似分布式执行。nginx

若是业务部门较少且不须要对部门或组之间的资源作划分和优先级调度的话,可使用 Standalone 模式来部署。sql

当若是有多个部门或组,且但愿每一个组织能够限制固定运行的最大资源,另外组或者任务须要有优先级执行的话,能够选择 Yarn 或 Mesos。apache

Standalone模式,即独立模式,master/slave(worker),自带完整的服务,可单独部署到一个集群中,无需依赖任何其余资源管理系统。须要启动Master和Worker守护进程,即服务端进程,就比如Mapreduce的JobTracker和TaskTracker。缓存

Spark on Yarn: 把Spark做业的调度和资源分配交给Yarn,Yarn至关于Spark集群的Master,Spark无本身的守护进程,仅仅做为客户端存在。微信

MR(Hive)、Storm、Tez、Spark,指望这些做业有统一的调度和资源分配的角色,Yarn(MR2)。markdown

Spark组件

Cluster Manager:在Standalone模式中即为Master(主节点),控制整个集群,监控Worker。在YARN模式中为资源管理器。

Worker:从节点,负责控制计算节点,启动Executor或Driver。在YARN模式中为NodeManager,负责计算节点的控制。

Driver:运行Application的main()函数并建立SparkContext。

Executor:执行器,在worker node上执行任务的组件、用于启动线程池运行任务。

SparkContext:整个应用的上下文,控制应用的生命周期,提交做业的入口。

RDD:Spark的基本计算单元,一组RDD可造成执行的有向无环图RDD Graph。

DAG Scheduler:实现将Spark做业分解成一到多个Stage,每一个Stage根据RDD的Partition个数决定Task的个数,而后生成相应的Task set放到TaskScheduler(NodeManager)中。

TaskScheduler:将任务(Task)分发给Executor执行。

Stage:一个Spark做业通常包含一到多个Stage。

Task:一个Stage包含一到多个Task,经过多个Task实现并行运行的功能。

Spark集群部署

配置免密钥登陆

CDH中用CM进行集群管理,集群直接互联是经过ssh协议,但咱们不须要配置ssh免密匙访问,由于CM中配置了经过相同账户密码访问。用Apache Spark的话,必须配置ssh免密匙访问。

安装Scala和Spark

修改环境变量

1vi /etc/profile
2export HADOOP_HOME=/opt/cloudera/parcels/CDH/lib/hadoop
3export SPARK_HOME=/opt/soft/spark2.0/spark-2.2.0-bin-hadoop2.6
4export SCALA_HOME=/opt/soft/spark2.0/scala-2.11.8
5export JAVA_HOME=/opt/soft/jdk1.8.0_131
6export HADOOP_CONF_DIR=/etc/hadoop/conf

修改$SPARK_HOME/conf
mv slaves.template slaves,slaves里配置工做节点主机名列表。

mv spark-env.sh.template spark-env.sh,spark-env.sh配置一些环境变量,因为咱们用Yarn模式,这里面不用配置。若是是standalone模式呢?

分布式模式运行测试,

1spark-submit --class org.apache.spark.examples.SparkPi \
2--master yarn \
3--num-executors 1 \
4--driver-memory 1g \
5--executor-memory 1g \
6--executor-cores 1 \
7--conf "spark.app.name=SparkPi" \
8/export/servers/spark-2.0.2-bin-hadoop2.6/examples/jars/spark-examples_2.11-2.0.2.jar

报错信息,设置HADOOP_CONF_DIR环境变量便可。

1Exception in thread "main" java.lang.Exception: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
2        at org.apache.spark.deploy.SparkSubmitArguments.validateSubmitArguments(SparkSubmitArguments.scala:256)
3        at org.apache.spark.deploy.SparkSubmitArguments.validateArguments(SparkSubmitArguments.scala:233)
4        at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:110)
5        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:117)
6        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

若是内存不足,报错的话,在cm里进行yarn的配置,以下2个设置为2g:
yarn.scheduler.maximum-allocation-mbyarn.nodemanager.resource.memory-mb
保存后,部署客户端配置,把cm界面里修改过的参数同步到每一个节点的xml配置文件里,重启Yarn服务。

RDD

Spark 对数据的核心抽象——弹性分布式数据集(Resilient Distributed Dataset,简称RDD)。在Spark 中,对数据的全部操做不外乎建立RDD和操做RDD 。

对数据(Seq)的操做,好比List:转换(Transform),map、filter,返回List类型,数据转换/加工过程。Action:head、tail、count ,返回不一样类型,即咱们须要的结果。

RDD就是相似集合类(Iterable),具备和集合类几乎彻底相同的操做(Transform和Action)。而在这一切背后,Spark 会自动将RDD 中的数据分发到集群上,并将操做并行化执行。

Spark中的RDD就是一个不可变的分布式对象集合。每一个RDD 都被分为多个分区,这些分区运行在集群中的不一样节点上。RDD 能够包含Python、Java、Scala 中任意类型的对象,甚至能够包含用户自定义的对象。

RDD是相似Iterable的数据结构。

idea中进行spark core开发,须要在工程中新建一个lib目录,把spark的包复制进去,在Project Structure中的Libraries中把包加进来。

建立RDD

用户可使用两种方法建立RDD:

  • 用SparkContext的parallelize(Seq)把Seq转为RDD。该方式经常使用于学习和实验。
  • 读外部数据,一般是读HDFS、消息队列等。
1def parallelize[T](seq: Seq[T], numSlices: Int = defaultParallelism) : RDD[T]


numSlices是并行度,具备初始值因此调用时能够只给一个参数。好比能够parallelize(seq),能够parallelize(seq,10),并行度为10意味着Spark把数据分割为10份,放在集群上运行。defaultParallelism是机器CPU个数。

1# 查看CPU的个数
2cat /proc/cpuinfo| grep "processor"| wc -l

RDD的操做

RDD有两种类型的操做:Transform操做和Action操做。注:就是Iterable类中的函数,Transform返回Iterable自己类型,Action返回新类型。

Iterable: Seq、Map

对应到

RDD:单元素RDD、PairRDD

Transform操做会由一个RDD生成一个新的RDD,这个过程当中不进行实质计算,只有当第一次Action操做时才会真正计算。称做Lazy计算,惰性计算。

Action操做会对RDD计算出一个结果,能够把结果返回,或把结果存储到外部存储系统(如HDFS)中。

RDD是相似Iterable的数据结构,也具备Iterable类的Map()、filter()、flatMap()等高阶函数。

Action操做

collect():把数据返回驱动器程序中最简单、最多见的操做, 一般在单元测试中使用,数据量不能太大,由于放在内存中,数据量大会内存溢出。

reduce():相似sum(),如:val sum = rdd.reduce((x, y) => x + y),结果同sum。

fold():和reduce()相似,多一个“初始值”,当初始值为零时效果同reduce()。fold(0) = reduce()

take(n):返回RDD中的n个元素,而且尝试只访问尽可能少的分区。

top(n):从RDD中获取前几个元素。

count():用来返回元素的个数。

countByValue():返回一个从各值到值对应的计数的映射表。

sum():返回汇总。

fold(n)的执行原理:每一个分区里进行这样的计算:初始值+sum(元素),最后进行:初始值+sum(分区sum值),初始值累加次数为分区数+1次。

1// 集群模式执行的话,一般用数据以前须要调rdd.collect()
2rdd.collect().foreach(println)
3// 集群模式用它可能拿不全
4rdd.foreach(println)

持久化函数persist()

Spark提供rdd的persist()函数来解决这个重复计算的问题,persist()把须要重复使用的rdd存起来,这样仅第一个Action操做才会计算,其余Action操做不须要再计算。

当咱们执行rdd的persist()时,计算出RDD的节点会分别保存它们所求出的分区数据。若是一个有持久化数据的节点发生故障,Spark 会在须要用到缓存的数据时重算丢失的数据分区。

rdd的persist()有5种持久化级别,分别是:来自org.apache.spark.storage.StorageLevel的定义。

级别 使用的空间 CPU时间 是否在内存中 是否在磁盘上 备注
MEMORY_ONLY  
MEMORY_ONLY_SER  
MEMORY_AND_DISK 中等 部分 部分 若是数据在内存中放不下,则溢写到磁盘上
MEMORY_AND_DISK_SER 部分 部分 若是数据在内存中放不下,则溢写到磁盘上,在内存中存放序列化后的数据
DISK_ONLY  
1val rdd1 = rdd.map(x => x+1)
2rdd1.persist(StorageLevel.DISK_ONLY)
3println(rdd1.first())
4println(rdd1.count())
5println(rdd1.sum())
6println(rdd1.collect().mkString(","))
7rdd1.unpersist()  //释放缓存,必须手工释放

若是以为数据过于重要,怕存一份有风险,则能够存2份:

1rdd1.persist(StorageLevel.MEMORY_ONLY_2)

注意

若是要缓存的数据太多,内存中放不下,Spark会自动利用最近最少使用(LRU)的缓存策略把最老的分区从内存中移除。可是对于仅把数据存放在内存中的缓存级别,下一次要用到已经被移除的分区时,这些分区就须要从新计算。没必要担忧你的做业由于缓存了太多数据而被打断。

若是MEMORY_ONLY内存不足的时候,Spark会自动用硬盘来承载。

WordCount案例

 1package com.padluo.spark.core
 2import org.apache.spark.{SparkConf, SparkContext}
 3object WordCount {
 4  def main(args: Array[String]): Unit = {
 5    val conf = new SparkConf()
 6      .setMaster("local")
 7      .setAppName("WordCount")
 8      .set("spark.testing.memory", "2147480000")
 9    val sc = new SparkContext(conf)
10    //    val lines = sc.textFile("hdfs://spark01:9000/user/spark/spark.txt", minPartitions = 1)
11    //    val lines = sc.textFile("/home/hadoop/spark.txt", minPartitions = 1)
12    //    val lines = sc.textFile("C:/Users/Administrator/Desktop/spark.txt", minPartitions = 1)
13    val lines = sc.textFile("file:///D:/Java/idea/IdeaProjects/spark-study/spark-core/resources/spark.txt", minPartitions = 1)
14    val words = lines.flatMap(_.split(" "))
15    val pairs = words.map((_, 1))
16    val wordCount = pairs.reduceByKey(_ + _)
17    wordCount.foreach(wordCount => println(wordCount._1 + " appeared " + wordCount._2 + " times."))
18    println(lines.count())
19    println(lines.first())
20  }
21}

List方式建立RDD,

从本地文件/HDFS文件读取文件建立RDD,注意本地文件路径和HDFS文件路径的写法。

1val rdd = sc.textFile("file:///c:/spark.txt")
2val rdd = sc.textFile("hdfs://ip:8020/...")

PairRDD

一般多列RDD会转为PairRDD进行操做,这样就能够用PairRDD的Transform和Action操做。

PairRDD的建立

  • 能够经过sc.parallelize建立
  • 程序中其余RDD转的

pairRDD的元素不是Map,而是Tuple2。

PairRDD的操做

reduceByKey(func),合并具备相同键的值。

groupByKey(),对具备相同键的值进行分组。

mapValues(func),对pariRDD中的每一个值应用一个函数而不改变键。mapValues(func)函数,功能相似于map{case (x, y): (x,func(y))}

1// pariRDD使用map和filter,应结合case
2pairRdd.map{
3  case (k, v) => (k, v + 1)
4}
5// key保持不变,value加1
6pairRdd.mapValues(v => v + 1)
7// key进行分组,value两两相加
8pairRdd.reduceByKey(_ + _)
 1package com.padluo.spark.core
 2import org.apache.spark.{SparkConf, SparkContext}
 3object PairRddTest {
 4  def main(args: Array[String]): Unit = {
 5    val conf = new SparkConf()
 6      .setAppName("PairRddTest")
 7      .setMaster("local")
 8      .set("spark.testing.memory", "471859200")
 9    val sc = new SparkContext(conf) //Driver类
10    val rdd = sc.parallelize(List((2, 3), (4, 5), (3, 2), (2, 1)))
11    val rdd2 = sc.parallelize(List(2, 3, 5)).map(i => (i, i + 2)) // 建立pairRdd主要方式
12    rdd2.map {
13      case (k, v) => (k, v + 1)
14    } // i=>i+1 返回单元素,rdd2执行map函数须要返回和rdd2相同(PairRDD)类型
15    rdd2.filter {
16      case (k, v) => v % 2 == 0
17    }
18    // (2,3),(4,5),(3,2),(2,1) -》(2,4),(4,6),(3,3),(2,2)
19    rdd.mapValues(i => i + 1).foreach(println _)
20    rdd.reduceByKey(_ + _).foreach(println _) // (2,3),(4,5),(3,2),(2,1) ->(2,4),(4,5),(3,2)
21    rdd.groupByKey().foreach(println)
22    println(rdd.groupByKey().getClass)
23  }
24}

案例:计算每一个键对应的平均值

 1package com.padluo.spark.core
 2import org.apache.spark.{SparkConf, SparkContext}
 3object AvgTest {
 4  def main(args: Array[String]): Unit = {
 5    val conf = new SparkConf()
 6      .setAppName("PairRddTest")
 7      .setMaster("local")
 8      .set("spark.testing.memory", "471859200")
 9    val sc = new SparkContext(conf) //Driver类
10    val rdd = sc.parallelize(List((3, 4), (2, 4), (3, 2), (2, 6)))
11    rdd.mapValues(i => (i, 1)) // (3, (4, 1)
12      .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
13      .mapValues(i => i._1 / i._2)
14      .foreach(println)
15  }
16}

Join案例

join,对两个RDD进行内链接。

leftOuterJoin,对两个RDD进行左链接。

rightOuterJoin,对两个RDD进行右链接。

效果同sql里,能够两个rdd想象成2张数据表(包含2个字段)。

 1package com.padluo.spark.core
 2import org.apache.spark.{SparkConf, SparkContext}
 3object JoinTest {
 4  def main(args: Array[String]): Unit = {
 5    val conf = new SparkConf()
 6      .setAppName("PairRddTest")
 7      .setMaster("local")
 8      .set("spark.testing.memory", "471859200")
 9    val sc = new SparkContext(conf) //Driver类
10    val table1 = sc.parallelize(List(("k1", 1), ("k2", 2), ("k3", 3))) // 看作两个字段
11    val table2 = sc.parallelize(List(("k1", 10), ("k2", 20), ("k4", 30)))
12    table1.join(table2).foreach(println) //("k1",(1,10)),("k2",(2,20))
13    println("-----")
14    table1.leftOuterJoin(table2).foreach(println)
15    println("-----")
16    table1.rightOuterJoin(table2).foreach(println)
17  }
18}

电商流量统计案例实战

注意点,

  • cache()和persist()的区别。
  • 分区key的设置后后面RDD操做key的对应关系。
  • 结果存储到HDFS。
  • 缓存的释放。
 1package com.padluo.spark.sql
 2import org.apache.spark.storage.StorageLevel
 3import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
 4object VisitCount {
 5  def main(args: Array[String]): Unit = {
 6    val conf = new SparkConf()
 7      .setAppName("VisitCount")
 8      .setMaster("local")
 9      .set("spark.testing.memory", "471859200")
10    val sc = new SparkContext(conf)
11    val linesRDD = sc.textFile("D:\\Java\\idea\\IdeaProjects\\spark-study\\spark-sql\\src\\main\\datasource\\2015082818")
12      .cache() // cache和persist的区别
13      .filter(line => line.length > 0)
14      .map { line =>
15        val arr = line.split("\t") // 须要转义吗?"\t"仍是"\\t"
16      val date = arr(17).substring(0, 10)
17        val provinceId = arr(23)
18        val url = arr(1)
19        val guid = arr(5)
20        (date + "_" + provinceId, (guid, url))
21      }.filter(line => line._2._2.length > 5) // length(url) > 5
22      // 把数据经过key【date + "_" + provinceId】【date_provinceId_guid】???进行分区
23      // 分区字段要注意,和后面groupByKey/reduceByKey的key对应
24      // 相同key【date + "_" + provinceId】的数据会存储在同一个节点上
25      .partitionBy(new HashPartitioner(10))
26      .persist(StorageLevel.MEMORY_ONLY)
27    linesRDD.map(line => (line._1 + "_" + line._2._1, 1)) // (date_provinceId_guid, 1)
28      .reduceByKey(_ + _) // (date_provinceId_guid, pv)
29      .map { i =>
30      val arr = i._1.split("_")
31      val dateProvinceId = arr(0) + "_" + arr(1)
32      val guidCount = 1
33      val pv = i._2
34      (dateProvinceId, (guidCount, pv))
35    }.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
36      .map { i =>
37        val arr = i._1.split("_")
38        val date = if (arr.length >= 1) arr(0) else "Nil"
39        val provinceId = if (arr.length >= 2) arr(1) else "Nil"
40        val pv = i._2._2
41        val uv = i._2._1
42        (date, provinceId, pv, uv)
43      }.foreach(println)
44    // .saveAsTextFile("hdfs://master:8020/user/root/VisitCount")
45    linesRDD.unpersist() // 释放缓存
46    sc.stop()
47  }
48}

Spark优化

数据分区

在分布式集群里,网络通讯的代价很大,减小网络传输能够极大提高性能。

Mapreduce框架的性能开支主要在哪里?

  • IO:大量读写文件
  • 网络传输:压缩(大文件变小文件从而减小网络传输,可是增长CPU计算负载)。

网络传输主要在shuffle阶段,shuffle的缘由是相同的key存在不一样的节点上,按key进行聚合的时候不得不进行shuffle。

Spark把RDD进行分片(分区),放在集群上并行计算。同一个RDD,分片100个,集群有10个节点,平均一个节点10个分区。

对于sum型的计算:先进行每一个分区的sum,而后把sum值shuffle传输到主程序进行全局sum,因此此时shuffle过程当中只须要传输分区sum,网络开销很小。

但对于Join类的计算,须要把数据自己进行shuffle,网络开销很大。

Spark是如何优化这个问题的?Spark把key-value RDD经过key的hashcode进行分区,且保证相同的key存储在同一个节点上。key的分布不均衡决定了有的分区大,有的分区小。这样对该RDD进行key聚合时,不须要shuffle过程。

Join操做时,一般把用的频繁的大表事先进行分区,如:

 1val linesRDD = sc.textFile("D:\\Java\\idea\\IdeaProjects\\spark-study\\spark-sql\\src\\main\\datasource\\2015082818")
 2      .cache() // cache和persist的区别
 3      .filter(line => line.length > 0)
 4      .map { line =>
 5        val arr = line.split("\t") // 须要转义吗?"\t"仍是"\\t"
 6      val date = arr(17).substring(0, 10)
 7        val provinceId = arr(23)
 8        val url = arr(1)
 9        val guid = arr(5)
10        (date + "_" + provinceId, (guid, url))
11      }.filter(line => line._2._2.length > 5) // length(url) > 5
12      // 把数据经过keydate + "_" + provinceId】【date_provinceId_guid】???进行分区
13      // 分区字段要注意,和后面groupByKey/reduceByKey的key对应
14      // 相同keydate + "_" + provinceId】的数据会存储在同一个节点上
15      .partitionBy(new HashPartitioner(10))
16      .persist(StorageLevel.MEMORY_ONLY)

进行join时,仅须要对另外一个小数据量的表进行shuffle过程。

可以从数据分区中获益的操做有cogroup()、groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combineByKey()以及lookup(),基于key的操做都会获益。

而对于诸如join()这样的二元操做,预先进行数据分区会让其中至少一个RDD(使用已知分区器的那个RDD)不发生数据shuffle。若是两个RDD使用一样的分区方式,而且它们还缓存在一样的机器上(好比一个RDD是经过mapValues()从另外一个RDD中建立出来的,这两个RDD就会拥有相同的键和分区方式),跨节点的数据shuffle就不会发生了。

参数优化

进行spark-submit时,会给每一个做业分配资源。处理的数据量越大,须要分配的资源越多。


本文首发于steem,感谢阅读,转载请注明。

https://steemit.com/@padluo


微信公众号「padluo」,分享数据科学家的自我修养,既然碰见,不如一块儿成长。

数据分析数据分析

读者交流电报群

https://t.me/sspadluo


知识星球交流群

知识星球读者交流群知识星球读者交流群