Spark Configuration配置

Spark能够经过三种方式配置系统:html

  • 经过SparkConf对象, 或者Java系统属性配置Spark的应用参数
  • 经过每一个节点上的conf/spark-env.sh脚本为每台机器配置环境变量
  • 经过log4j.properties配置日志属性

Spark属性

Spark属性能够为每一个应用分别进行配置,这些属性能够直接经过SparkConf设定,也能够经过set方法设定相关属性。 
下面展现了在本地机使用两个线程并发执行的配置代码:java

val conf = new SparkConf() .setMaster("local[2]") .setAppName("CountingSheep") val sc = new SparkContext(conf)

对于部分时间参数须要制定单位,例如node

  • 时间单位:ms、s、m(min)、h、d、y分别表示毫秒、秒、分钟、小时、天和年。
  • 存储单位:
1b (bytes) 1k or 1kb (kibibytes = 1024 bytes) 1m or 1mb (mebibytes = 1024 kibibytes) 1g or 1gb (gibibytes = 1024 mebibytes) 1t or 1tb (tebibytes = 1024 gibibytes) 1p or 1pb (pebibytes = 1024 tebibytes)

动态加载Spark配置

有时为了不经过编码设定参数,能够经过建立空的SparkConf,并在调用脚本时制定相关参数python

./bin/spark-submit --name "My app" --master local[4] --conf spark.eventLog.enabled=false --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" myApp.jar

spark shell和spark-submit提供两种方式动态加载配置web

  • 命令行参数动态设定,例如–conf –master
  • 经过配置文件。spark-submit默认读取conf/spark-defaults.conf文件,每一行表明一个配置
spark.master spark://5.6.7.8:7077 spark.executor.memory 4g spark.eventLog.enabled true spark.serializer org.apache.spark.serializer.KryoSerializer

参数设置在执行时会进行合并,默认最高优先级是经过代码设置,其次是经过命令行参数,最后是默认的配置文件。shell

查看Spark配置

能够经过web界面http://:4040中的Environment查看Spark配置信息(仅显示spark-defaults.conf、SparkConf和命令行参数)。能够根据web页面肯定配置属性是否生效。apache

配置参数 Available Properties

大部分配置参数都有默认值,如下是经常使用配置缓存

Application Properties

属性 默认值 描述
spark.app.name (none) 应用程序的名称,会在日志和webUI显示
spark.driver.cores 1 driver程序占用的CPU核数,只在cluster模式下有小。
spark.driver.maxResultSize 1g 对Spark每一个action结果集大小的限制,最少是1M,若设为0则不限制大小。若Job结果超过限制则会异常退出,若结果集限制过大也可能形成OOM问题。
spark.driver.memory 1g driver进程可用的内存。注意:不能在代码中配置,由于此时driver已经启动,能够经过–driver-memory命令行参数或者配置文件进行配置。
spark.executor.memory 1g 每一个executor可用的内存数量 (e.g. 2g, 8g).
spark.extraListeners (none) 一系列实现SparkListener的类,spark监听总线会建立这些类的实例。
spark.local.dir /tmp 用于存储mpp输出文件和RDD缓存文件,常配置在SSD等存储设备上,能够经过逗号分隔指定多个目录。 注意: 在Spark 1.0 后续版本,会被SPARK_LOCAL_DIRS (Standalone, Mesos) or LOCAL_DIRS (YARN) 环境变量覆盖.
spark.logConf false 将SparkConf 的有效配置做为INFO进行记录
spark.master (none) 集群master节点

 

运行时环境

属性 默认值 描述
spark.driver.userClassPathFirst false 用户指定的jars优先于Spark的库。用于解决用户与环境的版本冲突
spark.executor.logs.rolling.maxRetainedFiles (none) 系统保留日志的最大数量,当超限时,旧的日志被删除,默认不启动
spark.executor.logs.rolling.time.interval daily 设置日志rolling时间间隔,默认rolling不启动
spark.executor.userClassPathFirst false executor执行时,用户指定的jars优先于Spark的库。用于解决用户与环境的版本冲突
spark.python.worker.memory 512m 每一个worker进程在汇集时的内存上限,若超限则输出到硬盘

 

 

 

 

 

Shuffle 行为

属性 默认值 描述
spark.reducer.maxSizeInFlight 48m 多个reduce任务从map输出获取结果的最大尺寸。因为每一个reducer须要建立缓存保留数据,除非内存很大,通常不要修改此参数
spark.shuffle.compress true 是否对map的输出结果进行压缩,压缩器为spark.io.compression.codec
spark.shuffle.file.buffer 32k 每一个shuffle文件输出流的内存缓存区大小。这些缓冲区减小了系统IO的调用次数
spark.shuffle.manager sort shuffle数据的实现方法,包括sort和hash两种。sort内存利用率更改,从1.2版本后sort做为默认实现方法
spark.shuffle.service.enabled false 激活外部shuffle服务。服务维护executor写的文件,于是executor能够被安全移除。须要设置spark.dynamicAllocation.enabled 为true,同事指定外部shuffle服务。
spark.shuffle.service.port 7337 默认的外部shuffle服务端口
spark.shuffle.sort.bypassMergeThreshold 200 用于设置在Reducer的partition数目少于多少的时候,Sort Based Shuffle内部不使用Merge Sort的方式处理数据,而是直接将每一个partition写入单独的文件。这个方式和Hash Based的方式是相似的,区别就是在最后这些文件仍是会合并成一个单独的文件,并经过一个index索引文件来标记不一样partition的位置信息。从Reducer看来,数据文件和索引文件的格式和内部是否作过Merge Sort是彻底相同的。这个能够看作SortBased Shuffle在Shuffle量比较小的时候对于Hash Based Shuffle的一种折衷。固然了它和Hash Based Shuffle同样,也存在同时打开文件过多致使内存占用增长的问题。所以若是GC比较严重或者内存比较紧张,能够适当的下降这个值。
spark.shuffle.spill.compress true 若为true,表明处理的中间结果在spill到本地硬盘时都会进行压缩,在将中间结果取回进行merge的时候,要进行解压。在Disk IO成为瓶颈的场景下,这个被设置为true可能比较合适;若是本地硬盘是SSD,那么这个设置为false可能比较合适。

 

Spark UI

属性 默认值 描述
spark.eventLog.compress false 是否压缩事务日志,当spark.eventLog.enabled为true时有效
spark.eventLog.dir file:///tmp/spark-events 记录event日志的目录,也能够是hdfs目录
spark.eventLog.enabled false 是否记录Spark events,用于在应用执行完后重建Web UI
spark.eventLog.enabled true 是否容许经过web UI kill掉stages和相关的job
spark.ui.port 4040 应用统计信息的端口
spark.ui.retainedJobs 1000 spark UI和status APIs在垃圾回收以前记录的任务数
spark.ui.retainedStages 1000 spark UI和status APIs在垃圾回收以前记录的Stage数
spark.worker.ui.retainedExecutors 1000 spark UI和status APIs在垃圾回收以前记录的executor数目
spark.worker.ui.retainedDrivers 1000 同上
spark.worker.ui.retainedExecutions 1000 同上
spark.worker.ui.retainedBatches 1000 同上

 

 

 

 

 

 

 

 

 

 

压缩和序列化

属性 默认值 描述
spark.broadcast.compress true 广播变量是否被压缩
spark.closure.serializer org.apache.spark.serializer.JavaSerializer 闭包的序列化类,目前只支持java序列化
spark.io.compression.codec snappy 内部数据RDD的压缩编码器,用于RDD、广播变量和shuffle数据压缩。支持三种编码器:lz四、lzf和snappy。
spark.io.compression.lz4.blockSize 32k 压缩块大小
spark.io.compression.snappy.blockSize 32k 压缩块大小
spark.kryo.classesToRegister (none) 若使用kryo序列化,本参数指定须要注册的自定义类
spark.kryo.referenceTracking true(false when using Spark SQL Thrift Server) 序列化时是否使用相同的对象,若对象图谱中包含同一对象的多个副本,会提升性能。若不存在该状况,关闭能够提升性能
spark.kryo.registrationRequired false 是否须要kry注册,若为true,在序列化未注册的类时kryo会抛出异常;若为false,对于未注册的类,kryo会在每一个对象写入类名,下降了性能。
spark.kryo.registrator (none) 指定自定义的kryo注册类
spark.kryoserializer.buffer.max 64m kryo序列化的缓冲区大小,须要比全部序列化对象大
spark.kryoserializer.buffer 64k 初始化的序列化缓冲区,能够根据须要增加到spark.kryoserializer.buffer.max
spark.rdd.compress false 是否序列化RDD分区,能经过耗费大量CPU下降存储空间
spark.serializer org.apache.spark.serializer.JavaSerializer 序列化对象的类,建议使用org.apache.spark.serializer.KryoSerializer
spark.serializer.objectStreamReset 100 当序列化对象时,为了减小IO会缓存大量数据,然而这会阻止垃圾回收,能够经过reset将刷新缓冲区。

 

内存管理

属性 默认值 描述
spark.memory.fraction 0.75 用于执行和存储的内存比例。值越小,计算内存越小,缓冲区数据被排除的可能越大。这个比例剩余的部分用于存储spark元数据、用户数据结构,最好使用默认值。
spark.memory.storageFraction 0.5 在存储和计算内存中,缓存所占的内存比例,值越大,计算可用内存越少。
spark.memory.offHeap.enabled false 若为true,则spark会尝试使用堆外内存,同时要求spark.memory.offHeap.size必须是正数
spark.memory.offHeap.size 0 堆外内存可用的字节数
spark.memory.useLegacyMode false 是否可使用传统内存管理。本参数为true时,如下参数(已废弃)才有效:spark.shuffle.memoryFraction spark.storage.memoryFraction spark.storage.unrollFraction

 

执行操做

属性 默认值 描述
spark.broadcast.blockSize 4m TorrentBroadcastFactory中每一个block的大小。若值太大会下降广播的并行度,若值过小则可能出现BlockManager瓶颈
spark.broadcast.factory org.apache.spark.broadcast.TorrentBroadcastFactory 广播的实现
spark.cleaner.ttl (infinite) spark记忆元数据的时间。若超时则清理,用于长时间运行例如spark stream应用,须要注意被缓存的RDD超时也会被清理。
spark.executor.cores 在Yarn是1;standalone中是全部可用的core 每一个executor可用的CPU核心数目,standalone模式下,每一个worker会每一个executor使用一个CPU核心
spark.default.parallelism 对于reduceByKey和join操做,是RDD中最大分区数;对于parallelize操做,分区数与集群管理相关:本地模式(CPU核心数做为分区数)、Mesos(8)、其余(全部执行器的核心数与2求最大值) 默认的并行数
spark.executor.heartbeatInterval 10s executor与driver的心跳间隔
spark.files.fetchTimeout 60s SparkContext.addFile的超时值
spark.files.useFetchCache true 若为true,同一应用的执行器间经过局部缓存优化;若为false则各个executor获取各自文件
spark.files.overwrite false 当目标文件存在时是否重写
spark.hadoop.cloneConf false 若为true,则为每一个task拷贝hadoop的配置对象;
spark.hadoop.validateOutputSpecs true 若设置为true,saveAsHadoopFile会验证输出目录是否存在。虽然设为false能够忽略文件存在的异常,但建议使用Hadoop文件系统的API手动删除输出目录。当经过Spark Streaming的StreamingContext时本参数会被忽略,由于当进行checkpoint恢复时会重写已经存在的文件。
spark.storage.memoryMapThreshold 2m 读取文件块时Spark内存map最小的大小。当map所占内存接近或小于操做系统page大小时,内存映射负载很大
spark.externalBlockStore.blockManager org.apache.spark.storage.TachyonBlockManager 存储RDDs的外部文件块管理器。文件系统的URL被设置为spark.externalBlockStore.url
spark.externalBlockStore.baseDir System.getProperty(“java.io.tmpdir”) 外部块存储RDD的目录。文件系统URL被设置为spark.externalBlockStore.url
spark.externalBlockStore.url tachyon://localhost:19998 for Tachyon 表明外部块文件系统的URL

 

网络

属性 默认值 描述
spark.akka.frameSize 128 最大消息大小(MB)。通常用于限制executor与driver之间的信息大小,若运行几千个map和reduce任务,能够适当调大参数
spark.akka.heartbeat.interval 1000s 能够调成很大,用于禁止Akka内部的传输失败检测。越大的时间间隔减小网络负载,越小的间隔容易进行Akka错误检测。
spark.akka.heartbeat.pauses 6000s 与spark.akka.heartbeat.interval相似
spark.akka.threads 4 actor用于传输的线程个数。当driver有较多CPU是能够调大该值
spark.akka.timeout 100s spark节点间沟通的超时时间
spark.blockManager.port random block 管理器的监听端口
spark.broadcast.port random driver的http广播监听端口
spark.driver.host (local hostname) driver监听的主机名或者IP地址。用于master和executor的信息传输
spark.driver.port random driver监听的接口
spark.executor.port random executor监听的端口
spark.fileserver.port random driver 文件服务监听的接口
spark.network.timeout 120s 默认全部网络交互的超时时间
spark.port.maxRetries 16 端口重试链接最大次数
spark.replClassServer.port random driver类服务监听的接口
spark.rpc.numRetries 3 RPC任务重试的次数
spark.rpc.retry.wait 3s RPC任务ask操做的延时
spark.rpc.askTimeout 120s RPC任务等待超时
spark.rpc.lookupTimeout 120s RPC远程lookup操做超时时间

 

 做业调度Scheduling

属性 默认值 描述
spark.cores.max (not set) spark应用可用最大CPU内核数,若未设置,stanalone集群使用 spark.deploy.defaultCores做为参数,Mesos可使用全部CPU。
spark.locality.wait 3s data-local或less-local任务启动任务超时时间。若任务时间长且数据再也不本地,则最后调大
spark.locality.wait.node spark.locality.wait Customize the locality wait for node locality. For example, you can set this to 0 to skip node locality and search immediately for rack locality (if your cluster has rack information).
spark.locality.wait.process spark.locality.wait Customize the locality wait for process locality. This affects tasks that attempt to access cached data in a particular executor process.
spark.locality.wait.rack spark.locality.wait Customize the locality wait for rack locality.
spark.scheduler.maxRegisteredResourcesWaitingTime 30s Maximum amount of time to wait for resources to register before scheduling begins.
spark.scheduler.mode FIFO 做业调度模式。能够设置为FAIR公平调度或者FIFO先进先出
spark.scheduler.revive.interval 1s The interval length for the scheduler to revive the worker resource offers to run tasks.
spark.speculation false 若设置为true,则会根据执行慢的stage屡次启动,以最早完成为准。
spark.speculation.interval 100ms speculate 的频率
spark.speculation.multiplier 1.5 当task比全部任务执行时间的中值长多少倍时启动speculate
spark.speculation.quantile 0.75 启动speculate前任务完成数据量所占比例值
spark.task.cpus 1 每一个task分配的cpu数量
spark.task.maxFailures 4 task失败的最屡次数,比重试次数多1

 

动态分配内存

属性 默认值 描述
spark.dynamicAllocation.enabled false 是否启动动态资源分配
spark.dynamicAllocation.executorIdleTimeout 60s 若动态分配设为true且executor处于idle状态的时间已超时,则移除executor
spark.dynamicAllocation.cachedExecutorIdleTimeout infinity 若executor缓存数据超时,且动态内存分配为true,则移除缓存
spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.minExecutors 若动态分配为true,执行器的初始数量
spark.dynamicAllocation.maxExecutor infinity 执行器最大数量
spark.dynamicAllocation.minExecutor 0 执行器最少数量
spark.dynamicAllocation.schedulerBacklogTimeout 1s If dynamic allocation is enabled and there have been pending tasks backlogged for more than this duration, new executors will be requested.
spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout Same as spark.dynamicAllocation.schedulerBacklogTimeout, but used only for subsequent executor requests.

 

安全

属性 默认值 描述
spark.acls.enable false Whether Spark acls should are enabled. If enabled, this checks to see if the user has access permissions to view or modify the job. Note this requires the user to be known, so if the user comes across as null no checks are done. Filters can be used with the UI to authenticate and set the user.
spark.admin.acls Empty Comma separated list of users/administrators that have view and modify access to all Spark jobs. This can be used if you run on a shared cluster and have a set of administrators or devs who help debug when things work. Putting a “*” in the list means any user can have the priviledge of admin.
spark.authenticate false Whether Spark authenticates its internal connections. See spark.authenticate.secret if not running on YARN.
spark.authenticate.secret None Set the secret key used for Spark to authenticate between components. This needs to be set if not running on YARN and authentication is enabled.
spark.authenticate.enableSaslEncryption false Enable encrypted communication when authentication is enabled. This option is currently only supported by the block transfer service.
spark.network.sasl.serverAlwaysEncrypt false Disable unencrypted connections for services that support SASL authentication. This is currently supported by the external shuffle service.
spark.core.connection.ack.wait.timeout 60s How long for the connection to wait for ack to occur before timing out and giving up. To avoid unwilling timeout caused by long pause like GC, you can set larger value.
spark.core.connection.auth.wait.timeout 30s How long for the connection to wait for authentication to occur before timing out and giving up.
spark.modify.acls Empty Comma separated list of users that have modify access to the Spark job. By default only the user that started the Spark job has access to modify it (kill it for example). Putting a “*” in the list means any user can have access to modify it.
spark.ui.filters None Comma separated list of filter class names to apply to the Spark web UI. The filter should be a standard javax servlet Filter. Parameters to each filter can also be specified by setting a java system property of:spark..params=’param1=value1,param2=value2’For example:-Dspark.ui.filters=com.test.filter1 -Dspark.com.test.filter1.params=’param1=foo,param2=testing’
spark.ui.view.acls Empty Comma separated list of users that have view access to the Spark web ui. By default only the user that started the Spark job has view access. Putting a “*” in the list means any user can have view access to this Spark job.

 

加密Encryption

属性 默认值 描述
spark.ssl.enabled false 是否在全部协议中支持SSL链接
spark.ssl.enabledAlgorithms Empty 一些列的密码运算,指定的cipher须要被JVM支持
spark.ssl.keyPassword None 私钥密码
spark.ssl.keyStore None key存储文件,能够是组件启动的相对路径也能够是绝对路径
spark.ssl.keyStorePassword None A password to the key-store
spark.ssl.protocol None A protocol name. The protocol must be supported by JVM. The reference list of protocols one can find on this page.
spark.ssl.trustStore None A path to a trust-store file. The path can be absolute or relative to the directory where the component is started in.
spark.ssl.trustStorePassword None A password to the trust-store.

 

 

 

 

 

 

 

Spark Streaming

属性 默认值 描述
spark.streaming.backpressure.enabled false Enables or disables Spark Streaming’s internal backpressure mechanism (since 1.5). This enables the Spark Streaming to control the receiving rate based on the current batch scheduling delays and processing times so that the system receives only as fast as the system can process. Internally, this dynamically sets the maximum receiving rate of receivers. This rate is upper bounded by the values spark.streaming.receiver.maxRate and spark.streaming.kafka.maxRatePerPartition if they are set (see below).
spark.streaming.blockInterval 200ms Interval at which data received by Spark Streaming receivers is chunked into blocks of data before storing them in Spark. Minimum recommended - 50 ms. See the performance tuning section in the Spark Streaming programing guide for more details.
spark.streaming.receiver.maxRate not set Maximum rate (number of records per second) at which each receiver will receive data. Effectively, each stream will consume at most this number of records per second. Setting this configuration to 0 or a negative number will put no limit on the rate. See the deployment guide in the Spark Streaming programing guide for mode details.
spark.streaming.receiver.writeAheadLog.enable false Enable write ahead logs for receivers. All the input data received through receivers will be saved to write ahead logs that will allow it to be recovered after driver failures. See the deployment guide in the Spark Streaming programing guide for more details.
spark.streaming.unpersist true Force RDDs generated and persisted by Spark Streaming to be automatically unpersisted from Spark’s memory. The raw input data received by Spark Streaming is also automatically cleared. Setting this to false will allow the raw data and persisted RDDs to be accessible outside the streaming application as they will not be cleared automatically. But it comes at the cost of higher memory usage in Spark.
spark.streaming.stopGracefullyOnShutdown false If true, Spark shuts down the StreamingContext gracefully on JVM shutdown rather than immediately.
spark.streaming.kafka.maxRatePerPartition not set Maximum rate (number of records per second) at which data will be read from each Kafka partition when using the new Kafka direct stream API. See the Kafka Integration guide for more details.
spark.streaming.kafka.maxRetries 1 Maximum number of consecutive retries the driver will make in order to find the latest offsets on the leader of each partition (a default value of 1 means that the driver will make a maximum of 2 attempts). Only applies to the new Kafka direct stream API.
spark.streaming.ui.retainedBatches 1000 How many batches the Spark Streaming UI and status APIs remember before garbage collecting.
spark.streaming.driver.writeAheadLog.closeFileAfterWrite false Whether to close the file after writing a write ahead log record on the driver. Set this to ‘true’ when you want to use S3 (or any file system that does not support flushing) for the metadata WAL on the driver.
spark.streaming.receiver.writeAheadLog.closeFileAfterWrite false Whether to close the file after writing a write ahead log record on the receivers. Set this to ‘true’ when you want to use S3 (or any file system that does not support flushing) for the data WAL on the receivers.

 

SparkR

属性 默认值 描述
spark.r.numRBackendThreads 2 RBackend维护的RPC句柄个数
spark.r.command Rscript Executable for executing R scripts in cluster modes for both driver and workers.
spark.r.driver.command spark.r.command Executable for executing R scripts in client modes for driver. Ignored in cluster modes

 

 

 

 

其余参数具体参见https://spark.apache.org/docs/latest/configuration.html安全

环境变量

部分Spark设置能够经过配置环境变量(在conf/spark-env.sh中设置)实现。在standalone和Mesos模式中,这个文件能够设定机器特定的信息,例如主机名。因为spark-env.sh安装后并不存在,能够拷贝spark-env.sh.template,并确保它有执行权限。 
如下是spark-env.sh的经常使用参数:网络

环境变量 描述
JAVA_HOME java安装目录
PYSPARK_PYTHON 运行pyspark的python可执行文件,默认是python2.7
SPARK_DRIVER_R SparkR shell的R可执行文件,默认是R
SPARK_LOCAL_IP 机器绑定的IP地址
SPARK_PUBLIC_DNS Spark程序向外广播的主机名

 

 

 

 

 

 

除此以外还有一些spark standalone集群设置的参数,例如每一个机器运行的最大内存、CPU核数等。

配置日志

Spark使用log4j记录日志。能够经过配置conf/log4j.properties文件配置日志。

覆盖配置目录

经过指定SPARK_CONF_DIR变量,能够覆盖默认的SAPRK_HOME/conf下面的配置,例如spark-defaults.conf, spark-env.sh, log4j.properties等待。

集成Hadoop集群配置

若想经过spark读写HDFS,须要将如下两个配置文件拷贝到spark classpath目录下 
+ hdfs-size.xml :提供HDFS客户端默认的操做 
+ core-site.xml :设置默认的文件系统名称

虽然不一样发行版本配置文件不一样,但通常都在/etc/hadoop/conf目录下。为了使得spark能够找到这些配置文件,在spark-env.sh文件中配置HADOOP_CONF_DIR变量。

相关文章
相关标签/搜索