Spark Shuffle Core Parameter Tuning: An In-Depth Source-Code Analysis — Spark in Production

This blog series distills and shares cases drawn from real production environments and offers practical guidance for commercial Spark applications; please keep following the series. Copyright notice: this Spark-in-production series belongs to its author (Qin Kaixin). Reproduction is prohibited; you are welcome to learn from it.

1 Spark Runtime Resource Configuration

./bin/spark-submit \
    --master yarn-cluster \
    --num-executors 100 \
    --executor-memory 6G \
    --executor-cores 4 \
    --driver-memory 1G \
    --conf spark.default.parallelism=1000 \
    --conf spark.storage.memoryFraction=0.5 \
    --conf spark.shuffle.memoryFraction=0.3
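The same settings can also be supplied programmatically. The following is a minimal sketch (the application name and the use of SparkConf instead of command-line flags are illustrative assumptions; spark.storage.memoryFraction and spark.shuffle.memoryFraction only take effect under the legacy static memory manager, i.e. with spark.memory.useLegacyMode=true in Spark 1.6+):

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch: programmatic equivalent of the spark-submit flags above.
    val conf = new SparkConf()
      .setAppName("shuffle-tuning-demo")              // assumed application name
      .set("spark.executor.instances", "100")         // --num-executors
      .set("spark.executor.memory", "6g")             // --executor-memory
      .set("spark.executor.cores", "4")               // --executor-cores
      .set("spark.driver.memory", "1g")               // --driver-memory (normally must be set before the driver JVM starts)
      .set("spark.default.parallelism", "1000")
      .set("spark.storage.memoryFraction", "0.5")     // legacy static memory manager only
      .set("spark.shuffle.memoryFraction", "0.3")     // legacy static memory manager only

    val sc = new SparkContext(conf)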

2 Spark Shuffle Core Parameter Tuning


  • spark.reducer.maxSizeInFlight

  • Default: 48m

  • What it does: sets the buffer size of each shuffle read task; this buffer determines how much data can be fetched at a time.

  • Tuning advice: if the job has plenty of spare memory, increase this value (for example to 96m) to reduce the number of fetches, and therefore the number of network round trips, which improves performance. In practice, tuning this parameter sensibly yields roughly a 1%~5% improvement.

    /**
     * Fetches and reads the partitions in range [startPartition, endPartition) from a shuffle by
     * requesting them from other nodes' block stores.
     */
      private[spark] class BlockStoreShuffleReader[K, C](
          handle: BaseShuffleHandle[K, _, C],
          startPartition: Int,
          endPartition: Int,
          context: TaskContext,
          serializerManager: SerializerManager = SparkEnv.get.serializerManager,
          blockManager: BlockManager = SparkEnv.get.blockManager,
          mapOutputTracker: MapOutputTracker = SparkEnv.get.mapOutputTracker)
        extends ShuffleReader[K, C] with Logging {

        private val dep = handle.dependency

        /** Read the combined key-values for this reduce task */
        override def read(): Iterator[Product2[K, C]] = {

          val wrappedStreams = new ShuffleBlockFetcherIterator(
            context,
            blockManager.shuffleClient,
            blockManager,
            mapOutputTracker.getMapSizesByExecutorId(handle.shuffleId, startPartition, endPartition),
            serializerManager.wrapStream,
            // Note: we use getSizeAsMb when no suffix is provided for backwards compatibility

            SparkEnv.get.conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024,       <= key line

            SparkEnv.get.conf.getInt("spark.reducer.maxReqsInFlight", Int.MaxValue),
            SparkEnv.get.conf.get(config.REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS),
            SparkEnv.get.conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM),
            SparkEnv.get.conf.getBoolean("spark.shuffle.detectCorrupt", true))
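The tuning advice above amounts to a one-line configuration change. A minimal sketch, assuming the job has spare memory for larger fetch buffers (the printed value simply mirrors the byte conversion done in the reader above):

    import org.apache.spark.SparkConf

    // Sketch: raise the reduce-side fetch buffer from the 48m default to 96m.
    val conf = new SparkConf().set("spark.reducer.maxSizeInFlight", "96m")

    // Same conversion as in BlockStoreShuffleReader: size in MiB -> bytes.
    val maxBytesInFlight = conf.getSizeAsMb("spark.reducer.maxSizeInFlight", "48m") * 1024 * 1024
    println(s"max bytes in flight per reduce task: $maxBytesInFlight")   // 100663296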

  • spark.shuffle.io.maxRetries

  • Default: 3

  • What it does: when a shuffle read task fetches its data from the node that ran the shuffle write task and the fetch fails because of a network fault, it is retried automatically. This parameter is the maximum number of retries. If the fetch still has not succeeded within that many attempts, the job may fail.

  • Tuning advice: for jobs that include particularly long shuffle operations, raise the maximum retry count (for example to 60) to avoid fetch failures caused by JVM full GC pauses or an unstable network. In practice, for shuffles over very large data sets (billions to tens of billions of records), tuning this parameter improves stability considerably.

    public TransportConf(String module, ConfigProvider conf) {
      this.module = module;
      this.conf = conf;
      SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
      SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
      SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
      SPARK_NETWORK_IO_BACKLOG_KEY = getConfKey("io.backLog");
      SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY =  getConfKey("io.numConnectionsPerPeer");
      SPARK_NETWORK_IO_SERVERTHREADS_KEY = getConfKey("io.serverThreads");
      SPARK_NETWORK_IO_CLIENTTHREADS_KEY = getConfKey("io.clientThreads");
      SPARK_NETWORK_IO_RECEIVEBUFFER_KEY = getConfKey("io.receiveBuffer");
      SPARK_NETWORK_IO_SENDBUFFER_KEY = getConfKey("io.sendBuffer");
      SPARK_NETWORK_SASL_TIMEOUT_KEY = getConfKey("sasl.timeout");
      SPARK_NETWORK_IO_MAXRETRIES_KEY = getConfKey("io.maxRetries");
      SPARK_NETWORK_IO_RETRYWAIT_KEY = getConfKey("io.retryWait");
      SPARK_NETWORK_IO_LAZYFD_KEY = getConfKey("io.lazyFD");
      SPARK_NETWORK_VERBOSE_METRICS = getConfKey("io.enableVerboseMetrics");
    }

  • spark.shuffle.io.retryWait
  • Default: 5s
  • What it does: when a shuffle read task's fetch from a shuffle write node fails because of a network fault, it is retried automatically; this parameter is the wait interval between retries, 5s by default.
  • Tuning advice: increase the interval (for example to 60s) to make shuffle operations more stable; see the sketch below.
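A minimal sketch combining the two retry-related settings for a long-running, shuffle-heavy job (the exact values are the illustrative ones from the tuning advice, not universal recommendations):

    import org.apache.spark.SparkConf

    // Sketch: tolerate longer GC pauses / network hiccups during shuffle fetches.
    val conf = new SparkConf()
      .set("spark.shuffle.io.maxRetries", "60")   // default 3
      .set("spark.shuffle.io.retryWait", "60s")   // default 5s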

  • spark.shuffle.memoryFraction
  • Default: 0.2
  • What it does: the fraction of executor memory allocated to shuffle read tasks for aggregation, 20% by default (legacy static memory management).
  • Tuning advice: this was also covered under resource tuning. If memory is plentiful and persistence is rarely used, raise this fraction so that shuffle read aggregation gets more memory and avoids repeatedly spilling to disk when memory runs short. In practice, tuning this parameter sensibly improves performance by around 10%.

Here is where the memory managers put on a proper show:

(1) StaticMemoryManager: static memory allocation

    private def getMaxStorageMemory(conf: SparkConf): Long = {
      val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
      val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)          <= key line
      val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)          <= key line
      (systemMaxMemory * memoryFraction * safetyFraction).toLong                        <= key line
    }

    private def getMaxExecutionMemory(conf: SparkConf): Long = {
      val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)

      if (systemMaxMemory < MIN_MEMORY_BYTES) {
        throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
          s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
          s"option or spark.driver.memory in Spark configuration.")
      }
      if (conf.contains("spark.executor.memory")) {
        val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
        if (executorMemory < MIN_MEMORY_BYTES) {
          throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
            s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
            s"--executor-memory option or spark.executor.memory in Spark configuration.")
        }
      }
      val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)              <= key line
      val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)              <= key line
      (systemMaxMemory * memoryFraction * safetyFraction).toLong                            <= key line
    }
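Plugging the fractions from the spark-submit example in section 1 into the two methods above gives a rough feel for how a 6 GB executor heap is carved up under the legacy static model. A back-of-the-envelope sketch (the real systemMaxMemory is Runtime.getRuntime.maxMemory, which is slightly below the configured heap):

    // Sketch: static memory sizing for a ~6 GB executor heap.
    val systemMaxMemory = 6L * 1024 * 1024 * 1024

    // spark.storage.memoryFraction = 0.5, spark.storage.safetyFraction = 0.9
    val maxStorageMemory = (systemMaxMemory * 0.5 * 0.9).toLong      // ~2.7 GiB for caching

    // spark.shuffle.memoryFraction = 0.3, spark.shuffle.safetyFraction = 0.8
    val maxExecutionMemory = (systemMaxMemory * 0.3 * 0.8).toLong    // ~1.44 GiB for shuffle aggregation

    println(f"storage: ${maxStorageMemory / math.pow(1024, 3)}%.2f GiB, " +
      f"execution: ${maxExecutionMemory / math.pow(1024, 3)}%.2f GiB")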




(2) UnifiedMemoryManager: unified memory allocation

      private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024                              <= key line

      /**
       * Return the total amount of memory shared between execution and storage, in bytes.
       */
      private def getMaxMemory(conf: SparkConf): Long = {

        val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)    <= key line
        val reservedMemory = conf.getLong("spark.testing.reservedMemory",                        <= key line
          if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
        val minSystemMemory = (reservedMemory * 1.5).ceil.toLong

        if (systemMemory < minSystemMemory) {
          throw new IllegalArgumentException(s"System memory $systemMemory must " +
            s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
            s"option or spark.driver.memory in Spark configuration.")
        }
        // SPARK-12759 Check executor memory to fail fast if memory is insufficient
        if (conf.contains("spark.executor.memory")) {
          val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
          if (executorMemory < minSystemMemory) {
            throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
              s"$minSystemMemory. Please increase executor memory using the " +
              s"--executor-memory option or spark.executor.memory in Spark configuration.")
          }
        }
        val usableMemory = systemMemory - reservedMemory

        val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)                       <= key line
        (usableMemory * memoryFraction).toLong
      }


      def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
        val maxMemory = getMaxMemory(conf)
        new UnifiedMemoryManager(
          conf,
          maxHeapMemory = maxMemory,
          onHeapStorageRegionSize =
            (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,          <= key line
          numCores = numCores)
      }
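The same 6 GB executor heap under the unified model works out roughly as follows (again a sketch using the defaults shown above). Unlike the static model, execution and storage borrow from each other inside this pool; spark.memory.storageFraction only marks the portion of storage that is protected from eviction.

    // Sketch: unified memory sizing for a ~6 GB executor heap with default fractions.
    val systemMemory   = 6L * 1024 * 1024 * 1024
    val reservedMemory = 300L * 1024 * 1024                  // RESERVED_SYSTEM_MEMORY_BYTES
    val usableMemory   = systemMemory - reservedMemory

    val maxMemory      = (usableMemory * 0.6).toLong         // spark.memory.fraction = 0.6
    val storageRegion  = (maxMemory * 0.5).toLong            // spark.memory.storageFraction = 0.5

    println(f"unified pool: ${maxMemory / math.pow(1024, 3)}%.2f GiB, " +
      f"initial storage region: ${storageRegion / math.pow(1024, 3)}%.2f GiB")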

  • spark.shuffle.manager

  • Default: sort

  • What it does: selects the ShuffleManager implementation. Since Spark 1.5 there have been three options: hash, sort and tungsten-sort. HashShuffleManager was the default before Spark 1.2; from Spark 1.2 onwards the default is SortShuffleManager. tungsten-sort is similar to sort but uses the off-heap memory management of the Tungsten project, which makes memory use more efficient.

  • Tuning advice: SortShuffleManager sorts data by default, so if your business logic needs that ordering, the default SortShuffleManager is fine. If your logic does not need sorted data, look at the parameters discussed below and avoid the sort via the bypass mechanism or the optimized HashShuffleManager, while keeping good disk read/write performance. Note that tungsten-sort should be used with caution, since bugs have been found in it.

  • Configuration key: spark.shuffle.manager, default sort.

    val shortShuffleMgrNames = Map(
        "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
        "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
      val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")

  • spark.shuffle.sort.bypassMergeThreshold
  • Default: 200
  • What it does: when the ShuffleManager is SortShuffleManager and the number of shuffle read tasks is below this threshold (200 by default), the shuffle write phase skips sorting and writes data the way the unoptimized HashShuffleManager would, except that at the end all the temporary disk files produced by each task are merged into one file with a separate index file.
  • Tuning advice: when using SortShuffleManager and sorting is genuinely not needed, raise this value above the number of shuffle read tasks. The bypass mechanism then kicks in automatically and the map side no longer sorts, saving that overhead. This path still produces many intermediate disk files, though, so shuffle write performance is not ideal. A minimal configuration sketch follows below.
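A minimal sketch of enabling the bypass path, assuming a stage with roughly 500 reduce-side partitions and no map-side combine (the snippet only sets the threshold; whether bypass actually triggers is decided by SortShuffleWriter.shouldBypassMergeSort, quoted later in this article):

    import org.apache.spark.SparkConf

    // Sketch: raise the threshold above the number of reduce partitions (assumed ~500 here)
    // so that sorting is skipped for aggregation-free shuffles.
    val conf = new SparkConf().set("spark.shuffle.sort.bypassMergeThreshold", "600")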

  • spark.shuffle.consolidateFiles
  • Default: false
  • What it does: only effective with HashShuffleManager. When set to true, the consolidate mechanism is enabled and shuffle write output files are heavily merged; when the number of shuffle read tasks is very large, this greatly reduces disk IO and improves performance.
  • Tuning advice: if you truly do not need SortShuffleManager's sorting, then besides the bypass mechanism you can also set spark.shuffle.manager to hash, using HashShuffleManager with consolidation enabled (sketch below). In practice this has been measured to outperform SortShuffleManager with bypass enabled by 10%~30%.
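A minimal sketch of the hash + consolidate combination described above. Note that this only applies to Spark 1.x clusters: HashShuffleManager and spark.shuffle.consolidateFiles were removed in Spark 2.0, as the shortShuffleMgrNames map quoted earlier suggests.

    import org.apache.spark.SparkConf

    // Sketch (Spark 1.x only): use HashShuffleManager and merge its shuffle output files.
    val conf = new SparkConf()
      .set("spark.shuffle.manager", "hash")
      .set("spark.shuffle.consolidateFiles", "true")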

  • spark.shuffle.blockTransferService
  • Default: netty
  • Implements the transfer of shuffle blocks between executors; netty and nio are the two available implementations.

  • spark.shuffle.compress

  • Default: true

  • Controls whether the map-side output is compressed, i.e. every shuffle compresses the mapper output. For example, when thousands or tens of thousands of machines take part in an aggregation, the data volume and network traffic are enormous, causing heavy memory, disk I/O and network I/O consumption. Compressing on the map side reduces the network cost of the next stage fetching data from the previous stage.

       /**
        * Merge zero or more spill files together, choosing the fastest merging strategy based on the
        * number of spills and the IO compression codec.
        * @return the partition lengths in the merged file.
        */
       private long[] mergeSpills(SpillInfo[] spills, File outputFile) throws IOException {

         final boolean compressionEnabled = sparkConf.getBoolean("spark.shuffle.compress", true);   <= key line

         final CompressionCodec compressionCodec = CompressionCodec$.MODULE$.createCodec(sparkConf);

         final boolean fastMergeEnabled =
           sparkConf.getBoolean("spark.shuffle.unsafe.fastMergeEnabled", true);                    <= key line

         final boolean fastMergeIsSupported = !compressionEnabled ||
           CompressionCodec$.MODULE$.supportsConcatenationOfSerializedStreams(compressionCodec);    <= key line
           
         final boolean encryptionEnabled = blockManager.serializerManager().encryptionEnabled();
         try {
           if (spills.length == 0) {
             new FileOutputStream(outputFile).close(); // Create an empty file
             return new long[partitioner.numPartitions()];
           } else if (spills.length == 1) {
             // Here, we don't need to perform any metrics updates because the bytes written to this
             // output file would have already been counted as shuffle bytes written.
             Files.move(spills[0].file, outputFile);
             return spills[0].partitionLengths;
           } else {
             final long[] partitionLengths;
             // There are multiple spills to merge, so none of these spill files' lengths were counted
             // towards our shuffle write count or shuffle write time. If we use the slow merge path,
             // then the final output file's size won't necessarily be equal to the sum of the spill
             // files' sizes. To guard against this case, we look at the output file's actual size when
             // computing shuffle bytes written.
             //
             // We allow the individual merge methods to report their own IO times since different merge
             // strategies use different IO techniques.  We count IO during merge towards the shuffle
             // shuffle write time, which appears to be consistent with the "not bypassing merge-sort"
             // branch in ExternalSorter.
             
             if (fastMergeEnabled && fastMergeIsSupported) {
             
               // Compression is disabled or we are using an IO compression codec that supports
               // decompression of concatenated compressed streams, so we can perform a fast spill merge
               // that doesn't need to interpret the spilled bytes.
               if (transferToEnabled && !encryptionEnabled) {
                 logger.debug("Using transferTo-based fast merge");
                 partitionLengths = mergeSpillsWithTransferTo(spills, outputFile);
               } else {
                 logger.debug("Using fileStream-based fast merge");
                 partitionLengths = mergeSpillsWithFileStream(spills, outputFile, null);
               }
             } else {
               logger.debug("Using slow merge");
               partitionLengths = mergeSpillsWithFileStream(spills, outputFile, compressionCodec);
             }
             // When closing an UnsafeShuffleExternalSorter that has already spilled once but also has
             // in-memory records, we write out the in-memory records to a file but do not count that
             // final write as bytes spilled (instead, it's accounted as shuffle write). The merge needs
             // to be counted as shuffle write, but this will lead to double-counting of the final
             // SpillInfo's bytes.
             writeMetrics.decBytesWritten(spills[spills.length - 1].file.length());
             writeMetrics.incBytesWritten(outputFile.length());
             return partitionLengths;
           }
         } catch (IOException e) {
           if (outputFile.exists() && !outputFile.delete()) {
             logger.error("Unable to delete output file {}", outputFile.getPath());
           }
           throw e;
         }
       }

  • spark.io.compression.codec

  • What it does: selects the codec used to compress internal data such as RDD partitions, broadcast variables and shuffle output. The available codecs include lz4, lzf and snappy (and zstd in newer releases). Compared with snappy, lzf achieves a better compression ratio, so for shuffle-heavy jobs lzf is worth considering.

  • Default: snappy in older releases (recent Spark versions default to lz4).

    private[spark] object CompressionCodec {
        private val configKey = "spark.io.compression.codec"
        private[spark] def supportsConcatenationOfSerializedStreams(codec: CompressionCodec): Boolean = {
          (codec.isInstanceOf[SnappyCompressionCodec] || codec.isInstanceOf[LZFCompressionCodec]
            || codec.isInstanceOf[LZ4CompressionCodec] || codec.isInstanceOf[ZStdCompressionCodec])
        }
      
        private val shortCompressionCodecNames = Map(
          "lz4" -> classOf[LZ4CompressionCodec].getName,
          "lzf" -> classOf[LZFCompressionCodec].getName,
          "snappy" -> classOf[SnappyCompressionCodec].getName,
          "zstd" -> classOf[ZStdCompressionCodec].getName)
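A minimal sketch of switching the codec, under the assumption that a better compression ratio is worth some extra CPU for a shuffle-heavy job:

    import org.apache.spark.SparkConf

    // Sketch: use lzf instead of the default codec for better compression of shuffle data.
    val conf = new SparkConf().set("spark.io.compression.codec", "lzf")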

  • spark.shuffle.file.buffer

  • Default: 32k (chosen so it works even on minimal hardware)

  • What it does: sets the buffer size of the BufferedOutputStream used by each shuffle write task. Data is written into this buffer first and only spilled to the disk file once the buffer fills up.

  • Tuning advice: if the job has plenty of spare memory, increase this value (for example to 64k) to reduce the number of spills to disk during shuffle write, hence fewer disk IO operations and better performance. In practice, tuning this parameter sensibly yields roughly a 1%~5% improvement. A configuration sketch follows the source excerpt below.

    private[spark] val SHUFFLE_FILE_BUFFER_SIZE =
          ConfigBuilder("spark.shuffle.file.buffer")
            .doc("Size of the in-memory buffer for each shuffle file output stream, in KiB unless " +
              "otherwise specified. These buffers reduce the number of disk seeks and system calls " +
              "made in creating intermediate shuffle files.")
            .bytesConf(ByteUnit.KiB)
            .checkValue(v => v > 0 && v <= Int.MaxValue / 1024,
              s"The file buffer size must be greater than 0 and less than ${Int.MaxValue / 1024}.")
            .createWithDefaultString("32k")
    
    
    
      final class ShuffleExternalSorter extends MemoryConsumer {
      
        private static final Logger logger = LoggerFactory.getLogger(ShuffleExternalSorter.class);
      
        @VisibleForTesting
        static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
      
        private final int numPartitions;
        private final TaskMemoryManager taskMemoryManager;
        private final BlockManager blockManager;
        private final TaskContext taskContext;
        private final ShuffleWriteMetrics writeMetrics;
      
        /**
         * Force this sorter to spill when there are this many elements in memory.
         */
        private final int numElementsForSpillThreshold;
      
        /** The buffer size to use when writing spills using DiskBlockObjectWriter */
        private final int fileBufferSizeBytes;
      
        /** The buffer size to use when writing the sorted records to an on-disk file */
        private final int diskWriteBufferSize;
      
        /**
         * Memory pages that hold the records being sorted. The pages in this list are freed when
         * spilling, although in principle we could recycle these pages across spills (on the other hand,
         * this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
         * itself).
         */
        private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<>();
      
        private final LinkedList<SpillInfo> spills = new LinkedList<>();
      
        /** Peak memory used by this sorter so far, in bytes. **/
        private long peakMemoryUsedBytes;
      
        // These variables are reset after spilling:
        @Nullable private ShuffleInMemorySorter inMemSorter;
        @Nullable private MemoryBlock currentPage = null;
        private long pageCursor = -1;
      
        ShuffleExternalSorter(
            TaskMemoryManager memoryManager,
            BlockManager blockManager,
            TaskContext taskContext,
            int initialSize,
            int numPartitions,
            SparkConf conf,
            ShuffleWriteMetrics writeMetrics) {
          super(memoryManager,
            (int) Math.min(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES, memoryManager.pageSizeBytes()),
            memoryManager.getTungstenMemoryMode());
          this.taskMemoryManager = memoryManager;
          this.blockManager = blockManager;
          this.taskContext = taskContext;
          this.numPartitions = numPartitions;
          
          // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
          this.fileBufferSizeBytes =
              (int) (long) conf.get(package$.MODULE$.SHUFFLE_FILE_BUFFER_SIZE()) * 1024;            <= key line
              
          this.numElementsForSpillThreshold =
              (int) conf.get(package$.MODULE$.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD());
              
          this.writeMetrics = writeMetrics;
          
          this.inMemSorter = new ShuffleInMemorySorter(
            this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", true));
            
          this.peakMemoryUsedBytes = getMemoryUsage();
          this.diskWriteBufferSize =
              (int) (long) conf.get(package$.MODULE$.SHUFFLE_DISK_WRITE_BUFFER_SIZE());
        }
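As promised above, a minimal sketch of the shuffle-write buffer tuning (64k is the illustrative value from the advice, not a universal recommendation):

    import org.apache.spark.SparkConf

    // Sketch: double the shuffle write buffer to halve the number of buffer flushes.
    val conf = new SparkConf().set("spark.shuffle.file.buffer", "64k")   // default 32k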

  • spark.shuffle.io.numConnectionsPerPeer

  • Netty only: reuses connections between hosts to reduce connection setup on large clusters.

  • Default: 1

    // TransportConf: number of concurrent connections between two nodes for fetching data.

    public int numConnectionsPerPeer() {
      return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1);
    }

  • spark.shuffle.io.preferDirectBufs

  • Netty only: off-heap buffers effectively reduce garbage collection and buffer copies. If off-heap memory is tight, consider disabling this option, which forces Netty to allocate all of its memory on the heap. Default: true.

    // TransportConf:

    /** If true, we will prefer allocating off-heap byte buffers within Netty. */
    public boolean preferDirectBufs() {
      return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true);
    }
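A minimal sketch combining the two Netty transport settings above, for a large cluster where connection setup is costly and off-heap memory is plentiful (the values are illustrative):

    import org.apache.spark.SparkConf

    // Sketch: more connections per peer, keep Netty's preferred off-heap buffers.
    val conf = new SparkConf()
      .set("spark.shuffle.io.numConnectionsPerPeer", "2")   // default 1
      .set("spark.shuffle.io.preferDirectBufs", "true")     // default true; set false if off-heap is tight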

  • spark.shuffle.service.enabled

  • Default: false. When set to true, the BlockManager reads the port configured by spark.shuffle.service.port when it is created; note that in this case the BlockManager's shuffle client is no longer the default BlockTransferService instance but an ExternalShuffleClient.

  • Enables the external shuffle service: a long-running auxiliary task inside each NodeManager that improves shuffle performance.

    private[spark] class BlockManager(
          executorId: String,
          rpcEnv: RpcEnv,
          val master: BlockManagerMaster,
          val serializerManager: SerializerManager,
          val conf: SparkConf,
          memoryManager: MemoryManager,
          mapOutputTracker: MapOutputTracker,
          shuffleManager: ShuffleManager,
          val blockTransferService: BlockTransferService,
          securityManager: SecurityManager,
          numUsableCores: Int)
        extends BlockDataManager with BlockEvictionHandler with Logging {
      
        private[spark] val externalShuffleServiceEnabled =
          conf.getBoolean("spark.shuffle.service.enabled", false)
    
      // Port used by the external shuffle service. In Yarn mode, this may be already be
        // set through the Hadoop configuration as the server is launched in the Yarn NM.
        private val externalShuffleServicePort = {
          val tmpPort = Utils.getSparkOrYarnConfig(conf, "spark.shuffle.service.port", "7337").toInt
          if (tmpPort == 0) {
            // for testing, we set "spark.shuffle.service.port" to 0 in the yarn config, so yarn finds
            // an open port.  But we still need to tell our spark apps the right port to use.  So
            // only if the yarn config has the port set to 0, we prefer the value in the spark config
            conf.get("spark.shuffle.service.port").toInt
          } else {
            tmpPort
          }
        }
    
        // Client to read other executors' shuffle files. This is either an external service, or just the
        // standard BlockTransferService to directly connect to other Executors.
        private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
          val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
          new ExternalShuffleClient(transConf, securityManager,
            securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
        } else {
          blockTransferService
        }

Dynamic resource allocation on YARN is configured as follows:

First the YARN NodeManagers must be configured to support Spark's shuffle service.
(1) Edit yarn-site.xml on every NodeManager:
    ## modify
    <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle,spark_shuffle</value>
    </property>
    ## add
    <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>
    <property>
    <name>spark.shuffle.service.port</name>
    <value>7337</value>
    </property>

(2) Copy $SPARK_HOME/lib/spark-1.5.0-yarn-shuffle.jar to ${HADOOP_HOME}/share/hadoop/yarn/lib/ on every NodeManager.
(3) Restart all NodeManagers.
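On the Spark side, the external shuffle service configured above is typically paired with dynamic allocation. A minimal sketch (the property names are standard Spark configuration keys; the port must match the yarn-site.xml value):

    import org.apache.spark.SparkConf

    // Sketch: enable the external shuffle service and dynamic executor allocation.
    val conf = new SparkConf()
      .set("spark.shuffle.service.enabled", "true")
      .set("spark.shuffle.service.port", "7337")            // must match yarn-site.xml
      .set("spark.dynamicAllocation.enabled", "true")       // requires the external shuffle service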

  • spark.shuffle.sort.bypassMergeThreshold

  • Default: 200

  • Scenario: if the number of shuffle read tasks is below this threshold (200 by default), the shuffle write phase skips sorting and writes data the way the unoptimized HashShuffleManager would; at the end, all the temporary disk files produced by each task are still merged into one file with a separate index.

    private[spark] object SortShuffleWriter {
        def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
          // We cannot bypass sorting if we need to do map-side aggregation.
          if (dep.mapSideCombine) {
            require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
            false
          } else {
            val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
            dep.partitioner.numPartitions <= bypassMergeThreshold
          }
        }

  • spark.shuffle.spill

  • Default: true

  • Allows spilling to disk. As the excerpt below shows, since Spark 1.6 this flag is ignored and Spark always spills to disk when necessary.

    private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
        if (!conf.getBoolean("spark.shuffle.spill", true)) {
          logWarning(
            "spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+." +
              " Shuffle will continue to spill to disk when necessary.")
        }

  • spark.shuffle.spill.compress

  • Keeping it at true is generally sensible, because disk and network bandwidth are usually easier to saturate than CPU.

  • Recommendation: weigh the actual CPU, disk and network capacity of the cluster.

      /**
       * Component which configures serialization, compression and encryption for various Spark
       * components, including automatic selection of which [[Serializer]] to use for shuffles.
       */
      private[spark] class SerializerManager(
          defaultSerializer: Serializer,
          conf: SparkConf,
          encryptionKey: Option[Array[Byte]]) {

        // Whether to compress broadcast variables that are stored
        private[this] val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true)
        // Whether to compress shuffle output that are stored
        private[this] val compressShuffle = conf.getBoolean("spark.shuffle.compress", true)
        // Whether to compress RDD partitions that are stored serialized
        private[this] val compressRdds = conf.getBoolean("spark.rdd.compress", false)
        // Whether to compress shuffle output temporarily spilled to disk
        private[this] val compressShuffleSpill = conf.getBoolean("spark.shuffle.spill.compress", true)

3 Summary

This article brings together Spark's core shuffle parameters. A great deal of time went into reading the source code to pin down exactly where each parameter is read and under which conditions it takes effect. A good write-up does not come easily: reproduction is prohibited, but you are welcome to learn from it.

Qin Kaixin, Shenzhen
