1、Spark Shell on Client
scala> var rdd =sc.parallelize(1 to 100 ,3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.count
res0: Long = 100
scala> val rdd2=rdd.map(_ + 1)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:26
scala> rdd2.take(3)
res1: Array[Int] = Array(2, 3, 4)
scala> val rdd1=sc.textFile("file://home/hadoop/apps/sparkwc" )
rdd1: org.apache.spark.rdd.RDD[String] = file://home/hadoop/apps/sparkwc MapPartitionsRDD[3] at textFile at <console>:24
cala> val rdd1=sc.textFile("file:///home/hadoop/apps/sparkwc" )
rdd1: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/apps/sparkwc MapPartitionsRDD[9] at textFile at <console>:24
scala> val rdd2=rdd
rdd rdd1 rdd2 rdd3 rddToDatasetHolder
scala> val rdd2=rdd1.flatMap(_.split("\t" ))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[10] at flatMap at <console>:26
scala> val rdd3=rdd2.map((_,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[11] at map at <console>:28
scala> val rdd4=rdd3.reduceByKey(_ + _)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[12] at reduceByKey at <console>:30
scala> rdd4.collect
res2: Array[(String, Int)] = Array((spark,1), (hadoop,1), (hello,3), (world,1))
scala> rdd4.collect
res2: Array[(String, Int)] = Array((spark,1), (hadoop,1), (hello,3), (world,1))
scala> rdd4.saveAsTextFile("file:///home/hadoop/apps/out1" )
[hadoop@hadoop01 apps]$ cd out1/
[hadoop@hadoop01 out1]$ ls
part-00000 _SUCCESS
[hadoop@hadoop01 out1]$ cat part-00000
(spark,1)
(hadoop,1)
(hello,3)
(world,1)
[hadoop@hadoop01 out1]$ pwd
/home/hadoop/apps/out1
复制代码
WebUI 地址:http://192.168.43.20:4040/jobs/ node
2、Spark Shuffle
Shuffle Write:将Task中间结果数据写入到本地磁盘
Shuffle Read:从Shuffle Write阶段拉取数据到内存中并行计算
3、Shuffle Write(hash-based)
Shuffle Write阶段产生的总文件数=MapTaskNum * ReduceTaskNum
TotalBufferSize=CoreNum * ReducceTaskNum*FileBufferSize
产生大量小文件,占用更多的内存缓冲区,形成没必要要的内存开销,增长 了磁盘IO和网络开销
4、Shuffle Write(hash-based优化)
Shuffle Write阶段产生的总文件数=CoreNum * ReduceTaskNum
TotalBufferSize=CoreNum * ReducceTaskNum*FileBufferSize 减小了小文件产生的个数,可是占用内存缓冲区的大小没变
设置方法
conf.set("spark.shuffle.manager", "hash")
在conf/spark-default.conf 配置文件中添加spark.shuffle.manager=hash
5、Shuffle Write(hash-based优化)Shuffle Write(sort-based)
Shuffle Write阶段产生的总文件数= MapTaskNum * 2
优势: 顺序读写可以大幅提升磁盘IO性能,不会产生过多小文件,下降文件缓存占用内存空间大小,提升内存使用率。
缺点:多了一次粗粒度的排序。
设置方法
代码中设置:conf.set("spark.shuffle.manager", "sort")
在conf/spark-default.conf 配置文件中添加spark.shuffle.manager=sort
6、Shuffle Read
hase-based和sort-based使用相同的shuffle read实现
7、Spark History Server配置
spark history server查看运行完成的做业信息和日志
配置Hadoop的yarn-site.xml文件,全部节点配置文件同步,重启yarn
<property>
<name>yarn.log.server.url</name>
<value>http://node02:19888/jobhistory/logs</value>
<description> Yarn JobHistoryServer访问地址 </description>
</property>
复制代码
修改spark安装包conf目录下的spark-defaults.conf(若是没有该文件, 经过spark-defaults.conf.template模板复制一个),spark history server 在192.168.183.100节点启动,spark_logs这个目录须要在HDFS上提早建立
spark.yarn.historyServer.address=192.168.183.100:18080 spark.history.ui.port=18080
spark.eventLog.enabled=true spark.eventLog.dir=hdfs:///spark_logs
spark.history.fs.logDirectory=hdfs:///spark_logs
复制代码
1.Spark History Server启动
sbin/start-history-server.sh
复制代码
httpL://192.168.183.100:18080
复制代码
7、Spark运行环境优化
将spark系统jar包上传到HDFS上,直接使用HDFS上的文件
在spark安装目录下运行:jar cv0f spark-libs.jar -C jars/ .
将spark安装目录下生成的spark-libs.jar上传到HDFS上的 /system/spark(须要手动建立)目录下
hadoop fs -put spark-libs.jar /system/spark
复制代码
修改spark安装包conf目录下spark-defaults.conf配置文件添加spark- libs.jar在HDFS上的路径数据库
spark.yarn.archive=hdfs:///system/spark/spark-libs.jar
复制代码
8、Spark编程模型
建立SparkContext
建立RDD
在RDD上进行transformation和action
spark提供了丰富的transformation和action算子
返回结果
1.提交Spark程序到Yarn上
2.Spark RDD算子分类
Transformation转换操做,惰性执行,不触发app执行
针对Value数据类型,如map、filter
针对Key-Value数据类型,如groupByKey、reduceByKey
Action执行操做,触发app执行
3.建立RDD
parallelize从集合建立RDD
参数1:Seq集合,必须
参数2:分区数
建立RDD:val rdd = sc. parallelize(List(1,2,3,4,5,6,7),3)
查看RDD分区数:rdd.partitions.size
textFile从外部数据源(本地文件或者HDFS数据集)建立RDD
参数1:外部数据源路径,必须
参数2:最小分区数
从本地文件建立RDD:val rdd = sc.textFile("file:///home/hadoop/apps/in")
从HDFS数据集建立RDD:val rdd = sc.textFile("hdfs:///data/wc/in",1)
4.Value数据类型Transformation
map
输入是一个RDD,将一个RDD中的每一个数据项,经过map中的函数映射输出一个新的RDD,输入分区与输出分区一一对应
flatMap
distinct
coalesce
对RDD进行重分区
第一个参数为重分区的数目
第二个为是否进行shuffle,默认为false,若是重分区以后分区数目大于 原RDD的分区数,则必须设置为true
repartition
对RDD进行重分区, 等价于coalesce第二个参数设置为true
union
mapPartitions
针对RDD的每一个分区进行操做,接收一个可以处理迭代器的函数做为参数
若是RDD处理的过程当中,须要频繁的建立额外对象,使用mapPartitions要比使用map的性能高不少,如:建立数据库链接
mapPartitionsWithIndex
与mapPartitions功能相似,接收一个第一个参数是分区索引,第二个参数是分区迭代器的函数
zip
拉链操做,将两个RDD组合成Key-Value形式的RDD,保证两个RDD的partition数量和元素个数要相同,不然会抛出异常
mapValues
groupByKdy
将RDD[K,V]中每一个K对应的V值,合并到一个集合Iterable[V]中
reduceByKey
将RDD[K,V]中每一个K对应的V值根据传入的映射函数计算
join -返回两个RDD根据K能够关联上的结果,join只能用于两个RDD之间的关联,若是要多个RDD关联,须要关联屡次
5.RDD Action
collect
saveAsTextFile
take
count
6.Spark优化-Cache应用
7.Accumulator计数器
accumulator累加器,计数器
accumulator累加器,计数器
一般用于监控,调试,记录关键数据处理的数目等
分布式计数器,在Driver端汇总
val total_counter = sc.accumulator(0L,"total_counter" )
val resultRdd = rowRdd.flatMap(_.split("\t" )).map(x=>{ total_counter += 1
(x,1)
}).reduceByKey(_ + _)
复制代码
经过Spark Web UI查看 apache