1 节点说明
IP
Role
192.168.1.111
ActiveNameNode
192.168.1.112
StandbyNameNode,Master,Worker
192.168.1.113
DataNode,Master,Worker
192.168.1.114
DataNode,Worker
HDFS集群和Spark集群之间节点共用。
2 安装HDFS
见HDFS2.X和Hive的安装部署文档:http://www.cnblogs.com/Scott007/p/3614960.html
3 Spark部署
Spark经常使用的安装部署模式有Spark On Yarn和Standalone,能够同时使用。
3.1 Spark on Yarn
这种模式,借助Yarn资源分配的功能,使用Spark客户端来向Yarn提交任务运行。只需将Spark的部署包放置到Yarn集群的某个节点上便可(或者是Yarn的客户端,能读取到Yarn集群的配置文件便可)。Spark自己的Worker节点、Master节点不须要启动。
可是,Spark的部署包须是基于对应的Yarn版本正确编译后的,不然会出现Spark和Yarn的兼容性问题。
on Yarn的两种运行方式,其运行结束后的日志不能在Yarn的Application管理界面看到,目前只能在客户端经过:
yarn logs -applicationId <applicationId>
命令查看每一个Application的日志。
3.1.1 配置
部署这种模式,须要修改conf目录下的spark-env.sh文件。在其中新增以下配置选项:
export HADOOP_HOME= /home/hadoop/hadoop-2.0.0-cdh4.5.0
export HADOOP_CONF_DIR= $HADOOP_HOME/etc/hadoop
SPARK_EXECUTOR_INSTANCES=2
SPARK_EXECUTOR_CORES=1
SPARK_EXECUTOR_MEMORY=400M
SPARK_DRIVER_MEMORY=400M
SPARK_YARN_APP_NAME="Spark 1.0.0"
其中:
(1) HADOOP_HOME:当前节点中HDFS的部署路径,由于Spark须要和HDFS中的节点在一块儿;
(2) HADOOP_CONF_DIR:HDFS节点中的conf配置文件路径,正常状况下此目录为$HADOOP_HOME/etc/hadoop;
(3) SPARK_EXECUTOR_INSTANCES:在Yarn集群中启动的Worker的数目,默认为2个;
(4) SPARK_EXECUTOR_CORES:每一个Worker所占用的CPU核的数目;
(5) SPARK_EXECUTOR_MEMORY:每一个Worker所占用的内存大小;
(6) SPARK_DRIVER_MEMORY:Spark应用程序Application所占的内存大小,这里的Driver对应Yarn中的ApplicationMaster;
(7) SPARK_YARN_APP_NAME:Spark Application在Yarn中的名字;
配置完成后,将Spark部署文件放置到Yarn的节点中便可。这里,将spark-1.0.0整个目录放到Yarn集群的一个节点192.168.1.112的/home/hadoop(设为spark的安装路径的父目录)路径下。
3.1.2 测试
在Spark的部署路径的bin路径下,执行spark-submit脚原本运行spark-examples包中的例子。执行以下:
./bin/spark-submit --master yarn \
--class org.apache.spark.examples.JavaWordCount \
--executor-memory 400M \
--driver-memory 400M \
/home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar ./hdfs-site.xml
这个例子是计算WordCount的,例子被打包在/home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar包中,对应的Class为org.apache.spark.examples.JavaWordCount,./hdfs-site.xml是HDFS中指定路径下的一个文件,WordCount就是针对它来作的。而--master yarn就是指定运行在Yarn集群中,以yarn模式运行。
Spark On Yarn有两种运行模式,一种是Yarn Cluster方式,一种是Yarn Client方式。
(1) Yarn Cluster: Spark Driver程序将做为一个ApplicationMaster在YARN集群中先启动,而后再由ApplicationMaster向RM申请资源启动 executor以运行Task。由于Driver程序在Yarn中运行,因此程序的运行结果不能在客户端显示,因此最好将结果保存在HDFS上,客户端 的终端显示的是做为Yarn的job的运行状况。
(2) Yarn Client: Spark Driver程序在客户端上运行,而后向Yarn申请运行exeutor以运行Task,本地程序负责最后的结果汇总等。客户端的Driver将应用提交 给Yarn后,Yarn会前后启动ApplicationMaster和executor,另外ApplicationMaster和executor都 是装载在container里运行,container默认的内存是1G,ApplicationMaster分配的内存是driver- memory,executor分配的内存是executor-memory。同时,由于Driver在客户端,因此程序的运行结果能够在客户端显 示,Driver以进程名为SparkSubmit的形式存在。
上面命令中的提交方式“yarn”就是默认按照“Yarn Client”方式运行。用户可自定义运行方式,经过“--master”指定程序以yarn、yarn-cluster或者yarn-client中的一种方式运行。
须要重点说明的是最后文件的路径,是至关于HDFS中的/user/hadoop而言,hadoop是当前命令的用户。“./hdfs-site.xml”在HDFS中的全路径为“hdfs://namespace/user/hadoop/hdfs-site.xml”,其中hadoop是当前的用户,namespace是HDFS的命名空间;若是写成“/hdfs-site.xml”则在HDFS中指的是“hdfs://namespace/hdfs-site.xml”;固然也能够直接传入“hdfs://namespace/user/hadoop/hdfs-site.xml”用于指定在HDFS中的要进行WordCount计算的文件。
另外,Spark应用程序须要的CPU Core数目和内存,须要根据当前Yarn的NodeManager的硬件条件相应设置,不能超过NodeManager的硬件条件。
./bin/spark-submit --master yarn \
--class org.apache.spark.examples.JavaWordCount \
--executor-memory 400M \
--driver-memory 400M \
/home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar hdfs://namespace/user/hadoop/hdfs-site.xml
在Yarn的ResourceManager对应的Web界面中查看启动的Application。
Running:
Success:
同时能够在启动脚本的客户端看到WordCount的运行结果:
3.2 Spark Standalone
这种模式,就是把Spark单独做为一个集群来进行部署。集群中有两种节点,一种是Master,另外一种是Worker节点。Master负责分配任务给Worker节点来执行,并负责最后的结果合并,Worker节点负责具体的任务执行。
3.2.1 配置
所需修改的配置文件除了spark-env.sh文件之外,还有slave文件,都位于conf目录中。
slave文件中保存的是worker节点host或者IP,此处的配置为:
192.168.1.112
192.168.1.113
192.168.1.114
至于spark-env.sh文件,能够配置以下属性:
(1) SPARK_MASTER_PORT:Master服务端口,默认为7077;
(2) SPARK_WORKER_CORES:每一个Worker进程所须要的CPU核的数目;
(3) SPARK_WORKER_MEMORY:每一个Worker进程所须要的内存大小;
(4) SPARK_WORKER_INSTANCES:每一个Worker节点上运行Worker进程的数目;
(5) SPARK_MASTER_WEBUI_PORT:Master节点对应Web服务的端口;
(6)export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.117:2181,192.168.1.118:2181,192.168.1.119:2181 -Dspark.deploy.zookeeper.dir=/spark":用于指定Master的HA,依赖于zookeeper集群;
(7) export SPARK_JAVA_OPTS="-Dspark.cores.max=4":用于限定每一个提交的Spark Application的使用的CPU核的数目,由于缺省状况下提交的Application会使用全部集群中剩余的CPU Core。
注意在Worker进程的CPU个数和内存大小的时候,要结合机器的实际硬件条件,若是一个Worker节点上的全部Worker进程须要的CPU总数目或者内存大小超过当前Worker节点的硬件条件,则Worker进程会启动失败。
将配置好的Spark文件拷贝至每一个Spark集群的节点上的相同路径中。为方便使用spark-shell,能够在环境变量中配置上SPARK_HOME。
3.2.2 启动
配置结束后,就该启动集群了。这里使用Master的HA方式,选取192.168.1.1十二、192.168.1.113节点做为Master,192.168.1.1十二、192.168.1.11三、192.168.1.114节点上运行两个Worker进程。
首先在192.168.1.113节点上作此操做:
启动以后,能够查看当前节点的进程:
另外,为了保证Master的HA,在192.168.1.112节点上只启动Master:
192.168.1.112节点的进程为:
启动事后,经过Web页面查看集群的状况,这里访问的是:
http://192.168.1.113:8090/
再看standby节点192.168.1.112的web界面http://192.168.1.112:8090/
3.2.3 测试
Spark的bin子目录中的spark-submit脚本是用于提交程序到集群中运行的工具,咱们使用此工具作一个关于pi的计算。命令以下:
./bin/spark-submit --master spark://spark113:7077 \ --class org.apache.spark.examples.SparkPi \ --name Spark-Pi --executor-memory 400M \ --driver-memory 512M \ /home/hadoop/spark-1.0.0/examples/target/scala-2.10/spark-examples-1.0.0-hadoop2.0.0-cdh4.5.0.jar
其中--master参数用于指定Master节点的URI,可是这里填的是Host,不是IP!
任务启动以后,在Spark的Master的Web界面能够看到运行中的Application。
任务运行结束以后,在Web界面中Completed Applications表格中会看到对应的结果。
同时,命令行中会打印出来运行的结果,以下所示:
4 spark-submit工具
上面测试程序的提交都是使用的spark-submit脚本,其位于$SPARK_HOME/bin目录中,执行时须要传入的参数说明以下:
Usage: spark-submit [options] <app jar | python file> [app options]
参数名称
含义
--master MASTER_URL
能够是spark://host:port, mesos://host:port, yarn, yarn-cluster,yarn-client, local
--deploy-mode DEPLOY_MODE
Driver程序运行的地方,client或者cluster
--class CLASS_NAME
主类名称,含包名
--name NAME
Application名称
--jars JARS
Driver依赖的第三方jar包
--py-files PY_FILES
用逗号隔开的放置在Python应用程序PYTHONPATH上的.zip, .egg, .py文件列表
--files FILES
用逗号隔开的要放置在每一个executor工做目录的文件列表
--properties-file FILE
设置应用程序属性的文件路径,默认是conf/spark-defaults.conf
--driver-memory MEM
Driver程序使用内存大小
--driver-java-options
--driver-library-path
Driver程序的库路径
--driver-class-path
Driver程序的类路径
--executor-memory MEM
executor内存大小,默认1G
--driver-cores NUM
Driver程序的使用CPU个数,仅限于Spark Alone模式
--supervise
失败后是否重启Driver,仅限于Spark Alone模式
--total-executor-cores NUM
executor使用的总核数,仅限于Spark Alone、Spark on Mesos模式
--executor-cores NUM
每一个executor使用的内核数,默认为1,仅限于Spark on Yarn模式
--queue QUEUE_NAME
提交应用程序给哪一个YARN的队列,默认是default队列,仅限于Spark on Yarn模式
--num-executors NUM
启动的executor数量,默认是2个,仅限于Spark on Yarn模式
--archives ARCHIVES
仅限于Spark on Yarn模式
另外,在执行spark-submit.sh工具进行提交应用以前,可使用以下方式提早定义好当前Spark Application所使用的CPU Core数目和内存大小:
SPARK_JAVA_OPTS="-Dspark.cores.max=2 -Dspark.executor.memory=600m" \
./bin/spark-submit --master spark://update113:7077 \ --class org.apache.spark.examples.SparkPi \
…
…
5 Spark HistoryServer
相似于Mapreduce的JobHistoryServer,Spark也有一个服务能够保存历史Application的运行记录。
修改$SPARK_HOME/conf下的spark-defaults.conf文件(注意,修改后的配置文件在每一个节点都要有),其中可修改的配置属性为:
属性名称
默认值
含义
spark.history.updateInterval
10
以秒为单位,更新日志相关信息的时间间隔
spark.history.retainedApplications
250
保存Application历史记录的个数,若是超过这个值,旧的应用程序信息将被删除
spark.history.ui.port
18080
HistoryServer的web端口
spark.history.kerberos.enabled
False
是否使用kerberos方式登陆访问HistoryServer,对于持久层位于安全集群的HDFS上是有用的,若是设置为true,就要配置下面的两个属性
spark.history.kerberos.principal
用于HistoryServer的kerberos主体名称
spark.history.kerberos.keytab
用于HistoryServer的kerberos keytab文件位置
spark.history.ui.acls.enable
False
受权用户查看应用程序信息的时候是否检查acl。若是启用,只有应用程序全部者和spark.ui.view.acls指定的用户能够查看应用程序信息;不然,不作任何检查
spark.eventLog.enabled
False
是否记录Spark事件
spark.eventLog.dir
保存日志相关信息的路径,能够是hdfs://开头的HDFS路径,也能够是file://开头的本地路径,都须要提早建立
spark.yarn.historyServer.address
Server端的URL:Ip:port 或者host:port
此处的设置以下:
spark.eventLog.enabled true spark.eventLog.dir hdfs://yh/user/hadoop/sparklogs spark.yarn.historyServer.address update113:18080
设置完文件以后,进入sbin目录启动服务:
运行完成的Application历史记录能够经过访问上面指定的HistoryServer地址查看,这里是http://192.168.1.113:18080/。
不管运行时是本地模式,仍是yarn-client、yarn-cluster,运行记录都可在此页面查看。
而且程序运行时的环境变量、系统参数、各个阶段的耗时都可在此查看,很强大!
6 Spark可配置参数
Spark参数的配置可经过三种方式:SparkConf方式 > 命令行参数方式 >文件配置方式。
6.1 应用属性
属性名
默认值
含义
spark.app.name
应用程序名称
spark.master
要链接的Spark集群Master的URL
spark.executor.memory
512 m
每一个executor使用的内存大小
spark.serializer
org.apache.spark
.serializer.JavaSerializer
序列化方式,官方建议使用org.apache.spark.serializer.KryoSerializer,固然也能够任意是定义为org.apache.spark.Serializer子类的序化器
spark.kryo.registrator
若是要使用 Kryo序化器,须要建立一个继承KryoRegistrator的类并设置系统属性spark.kryo.registrator指向该类
spark.local.dir
/tmp
用于保存map输出文件或者转储RDD。能够多个目录,之间以逗号分隔。在Spark 1.0 及更高版本此属性会被环境变量 SPARK_LOCAL_DIRS (Standalone、Mesos) 或 LOCAL_DIRS (YARN) 代替
spark.logConf
False
SparkContext 启动时是否记录有效 SparkConf信息
6.2 运行环境变量
属性名
默认值
含义
spark.executor.extraJavaOptions
传递给executor的额外JVM 选项,可是不能使用它来设置Spark属性或堆空间大小
spark.executor.extraClassPath
追加到executor类路径中的附加类路径
spark.executor.extraLibraryPath
启动executor JVM 时要用到的特殊库路径
spark.files.userClassPathFirst
False
executor在加载类的时候是否优先使用用户自定义的JAR包,而不是Spark带有的JAR包,目前,该属性只是一项试验功能
6.3 Shuffle操做相关属性
属性名
默认值
含义
spark.shuffle.consolidateFiles
False
若是为true,在shuffle时就合并中间文件,对于有大量Reduce任务的shuffle来讲,合并文件可 以提升文件系统性能,若是使用的是ext4 或 xfs 文件系统,建议设置为true;对于ext3,因为文件系统的限制,设置为true反而会使内核>8的机器下降性能
spark.shuffle.spill
True
若是为true,在shuffle期间经过溢出数据到磁盘来下降了内存使用总量,溢出阈值是由spark.shuffle.memoryFraction指定的
spark.shuffle.spill.compress
True
是否压缩在shuffle期间溢出的数据,若是压缩将使用spark.io.compression.codec。
spark.shuffle.compress
True
是否压缩map输出文件,压缩将使用spark.io.compression.codec。
spark.shuffle.file.buffer.kb
100
每一个shuffle的文件输出流内存缓冲区的大小,以KB为单位。这些缓冲区能够减小磁盘寻道的次数,也减小建立shuffle中间文件时的系统调用
spark.reducer.maxMbInFlight
48
每一个reduce任务同时获取map输出的最大大小 (以兆字节为单位)。因为每一个map输出都须要一个缓冲区来接收它,这表明着每一个 reduce 任务有固定的内存开销,因此要设置小点,除非有很大内存
6.4 SparkUI相关属性
属性名
默认值
含义
spark.ui.port
4040
应用程序webUI的端口
spark.ui.retainedStages
1000
在GC以前保留的stage数量
spark.ui.killEnabled
True
容许在webUI将stage和相应的job杀死
spark.eventLog.enabled
False
是否记录Spark事件,用于应用程序在完成后重构webUI
spark.eventLog.compress
False
是否压缩记录Spark事件,前提spark.eventLog.enabled为true
spark.eventLog.dir
file:///tmp/spark-events
若是spark.eventLog.enabled为 true,该属性为记录spark事件的根目录。在此根目录中,Spark为每一个应用程序建立分目录,并将应用程序的事件记录到在此目录中。能够将此属性 设置为HDFS目录,以便history server读取历史记录文件
6.5 压缩和序列化相关属性
属性名
默认值
含义
spark.broadcast.compress
True
是否在发送以前压缩广播变量
spark.rdd.compress
False
是否压缩RDD分区
spark.io.compression.codec
org.apache.spark.io.
LZFCompressionCodec
用于压缩内部数据如 RDD分区和shuffle输出的编码解码器, org.apache.spark.io.LZFCompressionCodec和 org.apache.spark.io.SnappyCompressionCodec。其中,Snappy提供更快速的压缩和解压缩,而LZF提供了 更好的压缩比
spark.io.compression.snappy
.block.size
32768
使用Snappy编码解码器时,编码解码器使用的块大小 (以字节为单位)
spark.closure.serializer
org.apache.spark.serializer.
JavaSerializer
用于闭包的序化器,目前只有支持Java序化器
spark.serializer.
objectStreamReset
10000
org.apache.spark.serializer.JavaSerializer序列化时,会缓存对象以防 止写入冗余数据,此时会中止这些对象的垃圾收集。经过调用重置序化器,刷新该信息就能够收集旧对象。若要关闭这重按期重置功能将其设置为< = 0 。默认状况下每10000个对象将重置序化器
spark.kryo.referenceTracking
True
当使用Kryo序化数据时,是否跟踪对同一对象的引用。若是你的对象图有回路或者同一对象有多个副本,有必要设置为true;其余状况下能够禁用以提升性能
spark.kryoserializer.buffer.mb
2
在Kryo 里容许的最大对象大小(Kryo会建立一个缓冲区,至少和序化的最大单个对象同样大)。每一个worker的每一个core只有一个缓冲区
6.6 执行时相关属性
属性名
默认值
含义
spark.default.parallelism
本地模式:机器核数
Mesos:8
其余:max(executor的core,2)
若是用户不设置,系统使用集群中运行shuffle操做的默认任务数(groupByKey、 reduceByKey等)
spark.broadcast.factory
org.apache.spark.broadcast.
HttpBroadcastFactory
广播的实现类
spark.broadcast.blockSize
4096
TorrentBroadcastFactory块大小(以kb为单位)。过大会下降广播速度;太小会使印象BlockManager性能
spark.files.overwrite
Fale
经过 SparkContext.addFile() 添加的文件在目标中已经存在而且内容不匹配时,是否覆盖目标文件
spark.files.fetchTimeout
False
在获取由driver经过SparkContext.addFile() 添加的文件时,是否使用通讯时间超时
spark.storage.memoryFraction
0.6
Java堆用于cache的比例
spark.tachyonStore.baseDir
System.getProperty("java.io.tmpdir")
用于存储RDD的techyon目录,tachyon文件系统的URL由spark.tachyonStore.url设置,也能够是逗号分隔的多个techyon目录
spark.storage.
memoryMapThreshold
8192
以字节为单位的块大小,用于磁盘读取一个块大小时进行内存映射。这能够防止Spark在内存映射时使用很小块,通常状况下,对块进行内存映射的开销接近或低于操做系统的页大小
spark.tachyonStore.url
tachyon://localhost:19998
基于techyon文件的URL
spark.cleaner.ttl
spark记录任何元数据(stages生成、task生成等)的持续时间。按期清理能够确保将超期的元数据丢弃,这在运行长时间任务是颇有用的,如运行7*24的sparkstreaming任务。RDD持久化在内存中的超期数据也会被清理
6.7 网络相关属性
属性名
默认值
含义
spark.driver.host
运行driver的主机名或 IP 地址
spark.driver.port
随机
driver侦听的端口
spark.akka.frameSize
10
以MB为单位的driver和executor之间通讯信息的大小,设置值越大,driver能够接受更大的计算结果
spark.akka.threads
4
用于通讯的actor线程数,在大型集群中拥有更多CPU内核的driver能够增长actor线程数
spark.akka.timeout
100
以秒为单位的Spark节点之间超时时间
spark.akka.heartbeat.pauses
600
下面3个参数是用于设置Akka自带的故障探测器。启用的话,以秒为单位设置以下这三个参数,有助于对恶意的executor的定位,而对于因为GC暂停或网络滞后引发的状况下,不须要开启故障探测器;另外故障探测器的开启会致使因为心跳信息的频繁交换而引发的网络泛滥。
本参数是设置可接受的心跳停顿时间
spark.akka.failure-detector.threshold
300.0
对应Akka的akka.remote.transport-failure-detector.threshold
spark.akka.heartbeat.interval
1000
心跳间隔时间
6.8 调度相关属性
属性名
默认值
含义
spark.task.cpus
1
为每一个任务分配的内核数
spark.task.maxFailures
4
Task的最大重试次数
spark.scheduler.mode
FIFO
Spark的任务调度模式,还有一种Fair模式
spark.cores.max
当应用程序运行在Standalone集群或者粗粒度共享模式Mesos集群时,应用程序向集群请求的最大CPU内 核总数(不是指每台机器,而是整个集群)。若是不设置,对于Standalone集群将使用spark.deploy.defaultCores中数值, 而Mesos将使用集群中可用的内核
spark.mesos.coarse
False
若是设置为true,在Mesos集群中运行时使用粗粒度共享模式
spark.speculation
False
如下几个参数是关于Spark推测执行机制的相关参数。此参数设定是否使用推测执行机制,若是设置为true则spark使用推测执行机制,对于Stage中拖后腿的Task在其余节点中从新启动,并将最早完成的Task的计算结果最为最终结果
spark.speculation.interval
100
Spark多长时间进行检查task运行状态用以推测,以毫秒为单位
spark.speculation.quantile
0.75
推测启动前,Stage必需要完成总Task的百分比
spark.speculation.multiplier
1.5
比已完成Task的运行速度中位数慢多少倍才启用推测
spark.locality.wait
3000
如下几个参数是关于Spark数据本地性的。本参数是以毫秒为单位启动本地数据task的等待时间,若是超出就启动 下一本地优先级别的task。该设置一样能够应用到各优先级别的本地性之间(本地进程 -> 本地节点 -> 本地机架 -> 任意节点 ),固然,也能够经过spark.locality.wait.node等参数设置不一样优先级别的本地性
spark.locality.wait.process
spark.locality.wait
本地进程级别的本地等待时间
spark.locality.wait.node
spark.locality.wait
本地节点级别的本地等待时间
spark.locality.wait.rack
spark.locality.wait
本地机架级别的本地等待时间
spark.scheduler.revive.interval
1000
复活从新获取资源的Task的最长时间间隔(毫秒),发生在Task由于本地资源不足而将资源分配给其余Task运行后进入等待时间,若是这个等待时间内从新获取足够的资源就继续计算
6.9 安全相关属性
属性名
默认值
含义
spark.authenticate
False
是否启用内部身份验证
spark.authenticate.secret
设置组件之间进行身份验证的密钥。若是不是YARN上运行而且spark.authenticate为true时,须要设置密钥
spark.core.connection. auth.wait.timeout
30
进行身份认证的超时时间
spark.ui.filters
Spark web UI 要使用的以逗号分隔的筛选器名称列表。筛选器要符合javax servlet Filter标准,每一个筛选器的参数能够经过设置java系统属性来指定:
spark.<class name of filter>.params='param1=value1,param2=value2'
例如:
-Dspark.ui.filters=com.test.filter1
-Dspark.com.test.filter1.params='param1=foo,param2=testing'
spark.ui.acls.enable
False
Spark webUI存取权限是否启用。若是启用,在用户浏览web界面的时候会检查用户是否有访问权限
spark.ui.view.acls
以逗号分隔Spark webUI访问用户的列表。默认状况下只有启动Spark job的用户才有访问权限
6.10 SparkStreaming相关属性
属性名
默认值
含义
spark.streaming.blockInterval
200
Spark Streaming接收器将接收数据合并成数据块并存储在Spark里的时间间隔,毫秒
spark.streaming.unpersist
True
若是设置为true,强迫将SparkStreaming持久化的RDD数据从Spark内存中清理,一样 的,SparkStreaming接收的原始输入数据也会自动被清理;若是设置为false,则容许原始输入数据和持久化的RDD数据可被外部的 Streaming应用程序访问,由于这些数据不会自动清理
6.11 Standalone模式特有属性
能够在文件conf/spark-env.sh中来设置此模式的特有相关属性:
(1)SPARK_MASTER_OPTS:配置master使用的属性
(2)SPARK_WORKER_OPTS:配置worker使用的属性
(3)SPARK_DAEMON_JAVA_OPTS:配置master和work都使用的属性
配置的时候,使用相似的语句:
export SPARK_MASTER_OPTS="-Dx1=y1 -Dx2=y2"
其中x表明属性,y表明属性值。
SPARK_MASTER_OPTS所支持的属性有:
属性名
默认值
含义
spark.deploy.spreadOut
True
Standalone集群管理器是否自由选择节点仍是固定到尽量少的节点,前者会有更好的数据本地性,后者对于计算密集型工做负载更有效
spark.worker.timeout
60
master由于没有收到心跳信息而认为worker丢失的时间(秒)
spark.deploy.defaultCores
若是没有设置spark.cores.max,该参数设置Standalone集群分配给应用程序的最大内核数,若是不设置,应用程序获取全部的有效内核。注意在一个共享的集群中,设置一个低值防止攫取了全部的内核,影响他人的使用
SPARK_WORKER_OPTS所支持的属性有
属性名
默认值
含义
spark.worker.cleanup.enabled
False
是否认期清理worker的应用程序工做目录,只适用于Standalone模式,清理的时候将无视应用程序是否在运行
spark.worker.cleanup.interval
1800
清理worker本地过时的应用程序工做目录的时间间隔(秒)
spark.worker.cleanup.appDataTtl
7*24*3600
worker保留应用程序工做目录的有效时间。该时间由磁盘空间、应用程序日志、应用程序的jar包以及应用程序的提交频率来设定
SPARK_DAEMON_JAVA_OPTS所支持的属性有:
属性名
含义
spark.deploy.recoveryMode
下面3个参数是用于配置zookeeper模式的master HA。设置为ZOOKEEPER表示启用master备用恢复模式,默认为NONE
spark.deploy.zookeeper.url
zookeeper集群URL
spark.deploy.zookeeper.dir
zooKeeper保存恢复状态的目录,缺省为/spark
spark.deploy.recoveryMode
设成FILESYSTEM启用master单节点恢复模式,缺省值为NONE
spark.deploy.recoveryDirectory
Spark保存恢复状态的目录
6.12 Spark on Yarn特有属性
属性名
默认值
含义
spark.yarn.applicationMaster.waitTries
10
RM等待Spark AppMaster启动重试次数,也就是SparkContext初始化次数。超过这个数值,启动失败
spark.yarn.submit.file.replication
3
应用程序上传到HDFS的文件的副本数
spark.yarn.preserve.staging.files
False
若为true,在job结束后,将stage相关的文件保留而不是删除
spark.yarn.scheduler.heartbeat.interval-ms
5000
Spark AppMaster发送心跳信息给YARN RM的时间间隔
spark.yarn.max.executor.failures
2倍于executor数
致使应用程序宣告失败的最大executor失败次数
spark.yarn.historyServer.address
Spark history server的地址(不要加http://)。这个地址会在Spark应用程序完成后提交给YARN RM,而后RM将信息从RM UI写到history server UI上。
7 示例配置
主要的配置文件均位于$SPARK_HOME/conf中,包括slave、spark-env.sh、spark-defaults.conf文件等。
7.1 slave文件
192.168.1.112 192.168.1.113 192.168.1.114
7.2 spark-env.sh文件
export JAVA_HOME="/export/servers/jdk1.6.0_25" #yarn
export HADOOP_HOME=/home/hadoop/hadoop-2.0.0-cdh4.5.0 export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
SPARK_EXECUTOR_INSTANCES=2 SPARK_EXECUTOR_CORES=1 SPARK_EXECUTOR_MEMORY=400M
SPARK_DRIVER_MEMORY=400M
SPARK_YARN_APP_NAME="Spark 1.0.0" #alone
SPARK_MASTER_WEBUI_PORT=8090 SPARK_WORKER_MEMORY=400M
SPARK_WORKER_CORES=1 SPARK_WORKER_INSTANCES=2 #Master HA
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=192.168.1.117:2181,192.168.1.118:2181,192.168.1.119:2181 -Dspark.deploy.zookeeper.dir=/spark"
7.3 spark-defaults.conf文件
#history server
spark.eventLog.enabled true spark.eventLog.dir hdfs://namespace/user/hadoop/sparklogs spark.yarn.historyServer.address spark113:18080 #shuffle
spark.shuffle.consolidateFiles true #task
spark.task.cpus 1 spark.task.maxFailures 3 #scheduler type
spark.scheduler.mode FAIR
#security
park.authenticate true spark.authenticate.secret hadoop
spark.core.connection.auth.wait.timeout 1500 spark.ui.acls.enable true spark.ui.view.acls root,hadoop
#each executor used max memory
spark.executor.memory 400m
#spark on yarn
spark.yarn.applicationMaster.waitTries 5 spark.yarn.submit.file.replication 3 spark.yarn.preserve.staging.files false spark.yarn.scheduler.heartbeat.interval-ms 5000 #park standalone and on mesos
spark.cores.max 4
8 Spark SQL
Spark支持Scala、Python等语言写的脚本直接在Spark环境执行,更重要的是支持对Hive语句进行包装后在Spark上运行。这就是Spark SQL。
8.1 相关配置
配置的步骤比较简单,把Hive的配置文件hive-site.xml直接放置到$SPARK_HOME的conf路径下便可。若是是想在Spark集群本地执行SQL的话,每一个对应的节点都要作一样的配置。
8.2 运行SQL
启动bin目录下的spark-shell脚本,依次执行以下语句:
val sc: SparkContext
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
import hiveContext._
hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
hql("LOAD DATA LOCAL INPATH '/examples /data.txt' INTO TABLE src")
hql("FROM src SELECT key, value").collect().foreach(println)
上面的命令,分别是声明SparkContext对象,利用hql方法执行Hive的SQL语句,在执行SQL语句的过程当中,能够经过Hive的Cli客户端进行查看相应操做的结果。
8.3 on yarn模式
因为spark-shell脚本是在本地执行的,若是想放到Yarn上去执行的话,可使用上面第4节中的spark-submit工具,这时候须要对须要输入的sql语句进行包装,将包装类打包成jar文件,再提交。
包装类的代码以下:
1 package spark; 2 3 import java.util.List; 4 5 import org.apache.spark.SparkConf; 6 import org.apache.spark.api.java.JavaSparkContext; 7 import org.apache.spark.sql.api.java.Row; 8 import org.apache.spark.sql.hive.api.java.JavaHiveContext; 9 10 /** 11 * Description: 12 * Author: ITScott@163.com 13 * Date: 2014/7/15 14 */ 15 public class SparkSQL { 16 17 public static void main(String[] args) { 18 if(args.length != 2){ 19 System.out.println("usage: <applicationName> <sql statments>"); 20 System.exit(1); 21 } 22 23 String applicationName = args[0]; 24 String sql = args[1]; 25 26 SparkConf conf = new SparkConf().setAppName(applicationName); 27 JavaSparkContext sc = new JavaSparkContext(conf); 28 JavaHiveContext hiveContext = new JavaHiveContext(sc); 29 List<Row> results = hiveContext.hql(sql).collect(); 30 31 System.out.println("Sql is:" + sql + ", has been executed over."); 32 System.out.println("The result size is " + results.size() + ", they are:"); 33 for(int i=0; i<results.size(); i++){ 34 System.out.println(results.get(i).toString()); 35 } 36 37 System.out.println("Execute over ..."); 38 sc.stop(); 39 System.out.println("Stop over ..."); 40 } 41 42 }
将其打包成jar文件spark-0.0.1-SNAPSHOT.jar,再使用spark-submit工具进行任务的提交,命令以下:
./spark-submit \ --class spark.SparkSQL \ --master yarn-cluster \ --num-executors 3 \ --driver-memory 400m --executor-memory 400m --executor-cores 1 \ --jars /home/hadoop/spark-1.0.0/examples/libs/spark-core_2.10-1.0.0.jar,/home/hadoop/spark-1.0.0/examples/libs/spark-hive_2.10-1.0.0.jar,/home/hadoop/spark-1.0.0/lib_managed/jars/datanucleus-api-jdo-3.2.1.jar,/home/hadoop/spark-1.0.0/lib_managed/jars/datanucleus-core-3.2.2.jar,/home/hadoop/spark-1.0.0/lib_managed/jars/datanucleus-rdbms-3.2.1.jar,/home/hadoop/hive-0.12.0/lib/mysql-connector-java-5.1.27-bin.jar --files /home/hadoop/spark-1.0.0/conf/hive-site.xml \ /home/hadoop/spark-1.0.0/examples/libs/spark-0.0.1-SNAPSHOT.jar "hiveTest" "CREATE TABLE IF NOT EXISTS test4 (key INT, value STRING)"
其中,--master参数指定的是yarn-cluster模式,固然也可使用yarn-client模式,至于区别,已经在上文说了;--class指定的是咱们包装类的主类,见上文源码;--jars是依赖的四个jar包;--files是指定的hive-site.xml配置文件,提交到Yarn中的Application在执行的时候,须要把此配置文件分发到每一个Executor上;最后的两个参数,一个是Application的名称,一个是运行的SQL语句。
运行结束后,能够到Spark HistoryServer中查看运行结果。html