Spark 编程入门

一,编程环境

如下为Mac系统上单机版Spark练习编程环境的配置方法。
注意:仅配置练习环境无需安装Hadoop,无需安装Scala。html


1,安装Java8
java

注意避免安装其它版本的jdk,不然会有不兼容问题。python

https://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

2,下载spark并解压
http://spark.apache.org/downloads.html

解压到如下路径:
Users/yourname/ProgramFiles/spark-2.4.3-bin-hadoop2.7

3,配置spark环境
vim ~/.bashrc
插入下面两条语句
web



export SPARK_HOME=/Users/yourname/ProgramFiles/spark-2.4.3-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin



4,配置jupyter支持
若未有安装jupyter能够下载Anaconda安装之。使用toree能够安装jupyter环境下的Apache Toree-Scala内核,以便在jupyter环境下运行Spark。
算法



pip install toree
jupyter toree install --spark_home=Users/yourname/ProgramFiles/spark-2.4.3-bin-hadoop2.7



二,运行Spark
sql


Spark能够经过如下一些方式运行。
shell


1,经过spark-shell进入Spark交互式环境,使用Scala语言。

2,经过spark-submit提交Spark应用程序进行批处理。
这种方式能够提交Scala或Java语言编写的代码编译后生成的jar包,也能够直接提交Python脚本。

3,经过pyspark进入pyspark交互式环境,使用Python语言。
这种方式能够指定jupyter或者ipython为交互环境。

4,经过zepplin notebook交互式执行。
zepplin是jupyter notebook的apache对应产品。

5,安装Apache Toree-Scala内核。
能够在jupyter 中运行spark-shell。apache


使用spark-shell运行时,还能够添加两个经常使用的两个参数。
一个是master指定使用何种分布类型。
第二个是jars指定依赖的jar包。编程



#local本地模式运行,默认使用4个逻辑CPU内核
spark-shell

#local本地模式运行,使用所有内核,添加 code.jar到classpath
spark-shell  --master local[*] --jars code.jar 

#local本地模式运行,使用4个内核
spark-shell  --master local[4]

#standalone模式链接集群,指定url和端口号
spark-shell  --master spark://master:7077

#客户端模式链接YARN集群,Driver运行在本地,方便查看日志,调试时推荐使用。
spark-shell  --master yarn-client

#集群模式链接YARN集群,Driver运行在集群,本地机器计算和通讯压力小,批量任务时推荐使用。
spark-shell  --master yarn-cluster




#提交scala写的任务
./bin/spark-submit --class org.apache.spark.examples.SparkPi \
 --master yarn \
 --deploy-mode cluster \
 --driver-memory 4g \
 --executor-memory 2g \
 --executor-cores 1 \
 --queue thequeue \
 examples/jars/spark-examples*.jar 10




#提交python写的任务
spark-submit --master yarn \
--executor-memory 6G \
--driver-memory 6G \
--deploy-mode cluster \
--num-executors 600 \
--conf spark.yarn.maxAppAttempts=1 \
--executor-cores 1 \
--conf spark.default.parallelism=2000 \
--conf spark.task.maxFailures=10 \
--conf spark.stage.maxConsecutiveAttempts=10 \
test.py


三,建立RDD

建立RDD的基本方式有两种,第一种是使用textFile加载本地或者集群文件系统中的数据。第二种是使用parallelize方法将Driver中的数据结构并行化成RDD。vim


1,textFile




2,parallelize(或makeRDD)


四,经常使用Action操做

Action操做将触发基于RDD依赖关系的计算。


1,collect



2,take



3,takeSample




4,first



5,count



6,reduce


7,foreach



8,coutByKey



9,saveAsFile


五,经常使用Transformation操做

Transformation转换操做具备懒惰执行的特性,它只指定新的RDD和其父RDD的依赖关系,只有当Action操做触发到该依赖的时候,它才被计算。

1,map


2,filter



3,flatMap



4,sample



5,distinct



6,subtract



7,union


8,intersection



9,cartesian



10,sortBy




11,pipe



六,经常使用PairRDD转换操做

PairRDD指的是数据为Tuple2数据类型的RDD,其每一个数据的第一个元素被当作key,第二个元素被当作value。

1,reduceByKey


2,groupByKey


3,sortByKey

4,join

5,leftOuterJoin

6,rightOuterJoin


7,cogroup



8,subtractByKey



9,foldByKey


七,持久化操做

若是一个RDD被多个任务用做中间量,那么对其进行cache,缓存到内存中会对加快计算很是有帮助。

声明对一个RDD进行cache后,该RDD不会被当即缓存,而是等到它第一次由于某个Action操做触发后被计算出来时才进行缓存。

可使用persist明确指定存储级别,经常使用的存储级别是MEMORY_ONLY和MEMORY_AND_DISK。


1,cache




2,persist


八,共享变量

当Spark集群在许多节点上运行一个函数时,默认状况下会把这个函数涉及到的对象在每一个节点生成一个副本。可是,有时候须要在不一样节点或者节点和Driver之间共享变量。



Spark提供两种类型的共享变量,广播变量和累加器。


广播变量是不可变变量,实如今不一样节点不一样任务之间共享数据。广播变量在每一个节点上缓存一个只读的变量,而不是为每一个task生成一个副本,能够减小数据的传输。


累加器主要用于不一样节点和Driver之间共享变量,只能实现计数或者累加功能。累加器的值只有在Driver上是可读的,在节点上只能执行add操做。


1,broadcast



2,Accumulator



九,分区操做

分区操做包括改变分区方式,以及和分区相关的一些转换操做。

1,coalesce

2,repartition


3,partitionBy



4,mapPartitions



5,mapPartitionsWithIndex



6,foreachPartitions



7,aggregate



8,aggregateByKey



本文分享自微信公众号 - Python与算法社区(alg-channel)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。

相关文章
相关标签/搜索