SparkStreaming性能调优大全!web
1、日志已满:shell
spark.executor.logs.rolling.maxSize apache
下面三个日志rolling参数记得设置: 编程
spark.executor.logs.rolling.strategy size api
spark.executor.logs.rolling.maxSize 134217728 #default byte session
spark.executor.logs.rolling.maxRetainedFilesapp
下面是spark1.6的源码:分布式
[spark] RollingFileAppender { = = = = = = (* ).toString = =
2、Spark Streamingz对Kafka的Offset进行管理ide
zookeeper.session.timeout.ms性能
通常跳大3~5倍。
http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/
http://www.tuicool.com/articles/vaUzquJ
[spark] SparkCuratorUtil Logging { = = = = ( conf: SparkConfzkUrlConf: = ): CuratorFramework = { ZK_URL = conf.get(zkUrlConf) zk = CuratorFrameworkFactory.newClient(ZK_URLExponentialBackoffRetry()) zk.start() zk }
3、 spark.task.maxFailures
默认4,调整10左右
TaskSetManagerSuite SparkFunSuite LocalSparkContext Logging { TaskLocality.{} = SparkConf = .getTimeAsMs() = () { .beforeEach() FakeRackUtil.cleanUp() } test() { sc = SparkContext() sched = FakeTaskScheduler(sc()) taskSet = FakeTask.createTaskSet() clock = ManualClock manager = TaskSetManager(schedtaskSetclock)
4、spark.streaming.kafka.maxRetries
默认1,调成3或者5
5、Spark Streaming链接Kafka用Direct方式。
6、怎么调优?入口在哪?
答案就是Spark配置参数的地方:
1. $SPARK_HOME/conf/spark-env.sh 脚本上配置。 配置格式以下:
export SPARK_DAEMON_MEMORY=1024m
2. 编程的方式(程序中在建立SparkContext以前,使用System.setProperty(“xx”,“xxx”)语句设置相应系统属性值),
val conf = new SparkConf()
.setMaster("local")
.setAppName("CountingSheep")
.set("spark.executor.memory", "1g")
val sc = new SparkContext(conf)
三、即在spark-shell下和spark-submit下配置
如:Scala> System.setProperty("spark.akka.frameSize","10240m")
System.setProperty("spark.rpc.askTimeout","800")
./bin/spark-submit --name "My app"
--master local[4]
--conf spark.shuffle.spill=false
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails
-XX:+PrintGCTimeStamps"
myApp.jar
spark-submit也会从默认配置文件conf/spark-defaults.conf里选取配置项,格式以下:
spark.master spark://iteblog.com:7077
spark.executor.memory 512m
spark.eventLog.enabled true
spark.serializer org.apache.spark.serializer.KryoSerializer
(一)环境变量spark-env.sh配置项
SCALA_HOME #指向你的scala安装路径
MESOS_NATIVE_LIBRARY #若是你要在Mesos上运行集群的话
SPARK_WORKER_MEMORY #做业可以使用的内存容量,默认格式1000M或者 2G (默认: 全部RAM去掉给操做系统用的1 GB);每一个做业独立的内存空间由SPARK_MEM决定。
SPARK_JAVA_OPTS #添加JVM选项。你能够经过-D来获取任何系统属性
eg: SPARK_JAVA_OPTS+="-Dspark.kryoserializer.buffer.mb=1024"
SPARK_MEM #设置每一个节点所能使用的内存总量。他们应该和JVM‘s -Xmx选项的格式保持一致(e.g.300m或1g)。注意:这个选项将很快被弃用支持系统属性spark.executor.memory,因此咱们推荐将它使用在新代码中。
SPARK_DAEMON_MEMORY #分配给Spark master和worker守护进程的内存空间(默认512M)
SPARK_DAEMON_JAVA_OPTS #Spark master和worker守护进程的JVM选项(默认:none)
(二)System Properties
spark.akka.frameSize: 控制Spark中通讯消息的最大容量 (如 task 的输出结果),默认为10M。当处理大数据时,task 的输出可能会大于这个值,须要根据实际数据设置一个更高的值。若是是这个值不够大而产生的错误,能够从 worker的日志 中进行排查。一般 worker 上的任务失败后,master 的运行日志上出现”Lost TID: “的提示,可经过查看失败的 worker 的日志文件($SPARK_HOME/worker/下面的log文件) 中记录的任务的 Serialized size of result 是否超过10M来肯定。
spark.default.parallelism: 控制Spark中的分布式shuffle过程默认使用的task数量,默认为8个。若是不作调整,数据量大时,就容易运行时间很长,甚至是出Exception,由于8个task没法handle那么多的数据。 注意这个值也不是说设置得越大越好。
spark.local.dir:Spark 运行时的临时目录,例如 map 的输出文件,保存在磁盘的 RDD 等都保存在这里。默认是 /tmp 这个目录,而一开始咱们搭建的小集群上 /tmp 这个目录的空间只有2G,大数据量跑起来就出 Exception (”No space left on device”)了。
如何如何查看已配置好并生效的参数?
经过webui来进行查看,http://master:4040/