The goals of this blog post are:
1. A detailed look at the Executor's WAL (write-ahead log) mechanism
2. Message replay with Kafka
Data safety considerations:
Executor fault-tolerance options:
1. The simplest option is replication, relying on the underlying BlockManager's replica mechanism; this is the default.
2. Keep no replica of the received data, but support data replay, i.e. the ability to re-read the data from the source.
BlockManager replication:
/**
* :: DeveloperApi ::
 * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
 * or ExternalBlockStore, whether to drop the RDD to disk if it falls out of memory or
 * ExternalBlockStore, whether to keep the data in memory in a serialized format, and whether
 * to replicate the RDD partitions on multiple nodes.
 *
 * The [[org.apache.spark.storage.StorageLevel$]] singleton object contains some static constants
 * for commonly useful storage levels. To create your own storage level object, use the
 * factory method of the singleton object (`StorageLevel(...)`).
 */
@DeveloperApi
class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)
  extends Externalizable {
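As a small illustration (not taken from the post itself): the familiar constants are just preset combinations of these five flags, and the companion object's factory method builds custom levels.

import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK_SER_2 = useDisk, useMemory, no off-heap, serialized, replication = 2;
// the second replica is what gives receiver data its default fault tolerance.
val twoCopies = StorageLevel.MEMORY_AND_DISK_SER_2
println(twoCopies.replication) // 2

// A custom level via the factory method: serialized, memory + disk, three replicas.
val threeCopies = StorageLevel(true, true, false, false, 3)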
2. The receivedBlockHandler source is as follows:
private val receivedBlockHandler: ReceivedBlockHandler = {
  // Enabling the WAL requires a checkpoint directory to be set.
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
          "See documentation for more details.")
    }
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    // The storageLevel here is the one passed in when the Receiver was constructed.
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
3. The WAL mechanism is not enabled by default.
/** A helper class with utility functions related to the WriteAheadLog interface */
private[streaming] object WriteAheadLogUtils extends Logging {
  val RECEIVER_WAL_ENABLE_CONF_KEY = "spark.streaming.receiver.writeAheadLog.enable"
  val RECEIVER_WAL_CLASS_CONF_KEY = "spark.streaming.receiver.writeAheadLog.class"
  val RECEIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
    "spark.streaming.receiver.writeAheadLog.rollingIntervalSecs"
  val RECEIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.receiver.writeAheadLog.maxFailures"
  val RECEIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
    "spark.streaming.receiver.writeAheadLog.closeFileAfterWrite"

  val DRIVER_WAL_CLASS_CONF_KEY = "spark.streaming.driver.writeAheadLog.class"
  val DRIVER_WAL_ROLLING_INTERVAL_CONF_KEY =
    "spark.streaming.driver.writeAheadLog.rollingIntervalSecs"
  val DRIVER_WAL_MAX_FAILURES_CONF_KEY = "spark.streaming.driver.writeAheadLog.maxFailures"
  val DRIVER_WAL_BATCHING_CONF_KEY = "spark.streaming.driver.writeAheadLog.allowBatching"
  val DRIVER_WAL_BATCHING_TIMEOUT_CONF_KEY = "spark.streaming.driver.writeAheadLog.batchingTimeout"
  val DRIVER_WAL_CLOSE_AFTER_WRITE_CONF_KEY =
    "spark.streaming.driver.writeAheadLog.closeFileAfterWrite"

  val DEFAULT_ROLLING_INTERVAL_SECS = 60
  val DEFAULT_MAX_FAILURES = 3

  def enableReceiverLog(conf: SparkConf): Boolean = {
    conf.getBoolean(RECEIVER_WAL_ENABLE_CONF_KEY, false)
  }
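Since enableReceiverLog defaults to false, the WAL has to be switched on explicitly and a checkpoint directory has to be set. A minimal sketch (the app name, batch interval and HDFS path are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("WALDemo")
  // Turn on the receiver-side write-ahead log; it is off by default.
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")

val ssc = new StreamingContext(conf, Seconds(5))
// The WAL files are written under the checkpoint directory, so it must be set.
ssc.checkpoint("hdfs://namenode:8020/spark/checkpoint")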
4. For example, the socketTextStream source is as follows:
/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
 * lines.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
    hostname: String,
    port: Int,
    // The default storageLevel is initialized here.
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
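As a usage sketch (hostname and port are placeholders, and ssc is the StreamingContext from the earlier sketch): with the WAL enabled, a single serialized copy is enough instead of the default two-replica level.

import org.apache.spark.storage.StorageLevel

// MEMORY_AND_DISK_SER keeps one serialized copy; durability comes from the WAL instead of
// the second replica that the default MEMORY_AND_DISK_SER_2 would keep.
val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()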
5. The BlockManagerBasedBlockHandler source is as follows:
/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks into a block manager with the specified storage level.
 */
private[streaming] class BlockManagerBasedBlockHandler(
    blockManager: BlockManager, storageLevel: StorageLevel)
  extends ReceivedBlockHandler with Logging {

  def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

    var numRecords = None: Option[Long]

    val putResult: Seq[(BlockId, BlockStatus)] = block match {
      case ArrayBufferBlock(arrayBuffer) =>
        numRecords = Some(arrayBuffer.size.toLong)
        blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel,
          tellMaster = true)
      case IteratorBlock(iterator) =>
        val countIterator = new CountingIterator(iterator)
        val putResult = blockManager.putIterator(blockId, countIterator, storageLevel,
          tellMaster = true)
        numRecords = countIterator.count
        putResult
      case ByteBufferBlock(byteBuffer) =>
        blockManager.putBytes(blockId, byteBuffer, storageLevel, tellMaster = true)
      case o =>
        throw new SparkException(
          s"Could not store $blockId to block manager, unexpected block type ${o.getClass.getName}")
    }
    if (!putResult.map { _._1 }.contains(blockId)) {
      throw new SparkException(
        s"Could not store $blockId to block manager with storage level $storageLevel")
    }
    BlockManagerBasedStoreResult(blockId, numRecords)
  }

  def cleanupOldBlocks(threshTime: Long) {
    // this is not used as blocks inserted into the BlockManager are cleared by DStream's clearing
    // of BlockRDDs.
  }
}
6. The actual storage is done via putIterator.
def putIterator(
    blockId: BlockId,
    values: Iterator[Any],
    level: StorageLevel,
    tellMaster: Boolean = true,
    effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {
  require(values != null, "Values is null")
  doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
}
7. The doPut source is as follows:
// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = data match {
  case b: ByteBufferValues if putLevel.replication > 1 =>
    // Duplicate doesn't copy the bytes, but just creates a wrapper
    val bufferView = b.buffer.duplicate()
    Future {
      // This is a blocking action and should run in futureExecutionContext which is a cached
      // thread pool.
      // replicate() copies the data to other nodes.
      replicate(blockId, bufferView, putLevel)
    }(futureExecutionContext)
  case _ => null
}
8. The replicate source is as follows; it copies the data to another node.
/**
 * Replicate block to another node. Note that this is a blocking call that returns after
 * the block has been replicated.
 */
private def replicate(blockId: BlockId, data: ByteBuffer, level: StorageLevel): Unit = {
  val maxReplicationFailures = conf.getInt("spark.storage.maxReplicationFailures", 1)
  val numPeersToReplicateTo = level.replication - 1
  val peersForReplication = new ArrayBuffer[BlockManagerId]
  val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
  val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
  val tLevel = StorageLevel(
    level.useDisk, level.useMemory, level.useOffHeap, level.deserialized, 1)
  val startTime = System.currentTimeMillis
  val random = new Random(blockId.hashCode)
The WAL approach
1. Before doing anything else, the received data is written to a log under the checkpoint directory. If the job fails, it can be recovered from this log.
private val receivedBlockHandler: ReceivedBlockHandler = {
  if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
    if (checkpointDirOption.isEmpty) {
      throw new SparkException(
        "Cannot enable receiver write-ahead log without checkpoint directory set. " +
          "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
          "See documentation for more details.")
    }
    // There may be several receivers, so the streamId is needed here.
    new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId,
      receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
  } else {
    // The BlockManager path relies on RDD-level fault tolerance, so the streamId is not needed.
    new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
  }
}
2. WriteAheadLogBasedBlockHandler, which implements ReceivedBlockHandler, is shown below:
/**
 * Implementation of a [[org.apache.spark.streaming.receiver.ReceivedBlockHandler]] which
 * stores the received blocks in both, a write ahead log and a block manager.
 */
private[streaming] class WriteAheadLogBasedBlockHandler(
    blockManager: BlockManager,
    streamId: Int,
    storageLevel: StorageLevel,
    conf: SparkConf,
    hadoopConf: Configuration,
    checkpointDir: String,
    clock: Clock = new SystemClock
  ) extends ReceivedBlockHandler with Logging {
3. With the WAL there is no need to raise replication to 2. The WAL is written to the checkpoint directory, which lives on HDFS, and HDFS already keeps 3 replicas by default.
private val effectiveStorageLevel = {
  if (storageLevel.deserialized) {
    logWarning(s"Storage level serialization ${storageLevel.deserialized} is not supported when" +
      s" write ahead log is enabled, change to serialization false")
  }
  if (storageLevel.replication > 1) {
    logWarning(s"Storage level replication ${storageLevel.replication} is unnecessary when " +
      s"write ahead log is enabled, change to replication 1")
  }
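  // The excerpt above is truncated. Judging from the two warnings, the value this block
  // evaluates to is a serialized, single-replica level -- roughly the following
  // (a reconstruction; verify against the exact Spark release you are reading):
  StorageLevel(storageLevel.useDisk, storageLevel.useMemory, storageLevel.useOffHeap,
    deserialized = false, replication = 1)
}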
4. When a block is stored, the data is written to both the WAL and the BlockManager in parallel.
/**
 * This implementation stores the block into the block manager as well as a write ahead log.
 * It does this in parallel, using Scala Futures, and returns only after the block has
 * been stored in both places.
 */
def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = {

  var numRecords = None: Option[Long]
  // Serialize the block so that it can be inserted into both
  val serializedBlock = block match {
    case ArrayBufferBlock(arrayBuffer) =>
      numRecords = Some(arrayBuffer.size.toLong)
      blockManager.dataSerialize(blockId, arrayBuffer.iterator)
    case IteratorBlock(iterator) =>
      val countIterator = new CountingIterator(iterator)
      val serializedBlock = blockManager.dataSerialize(blockId, countIterator)
      numRecords = countIterator.count
      serializedBlock
    case ByteBufferBlock(byteBuffer) =>
      byteBuffer
    case _ =>
      throw new Exception(s"Could not push $blockId to block manager, unexpected block type")
  }
5. The serialized block is then stored into the BlockManager:
// Store the block in block manager
val storeInBlockManagerFuture = Future {
  val putResult =
    blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true)
  if (!putResult.map { _._1 }.contains(blockId)) {
    throw new SparkException(
      s"Could not store $blockId to block manager with storage level $storageLevel")
  }
}
6. The write method appends the block to the write-ahead log:
// Store the block in write ahead log
val storeInWriteAheadLogFuture = Future {
  // The block itself must be serializable; here it is already a serialized ByteBuffer.
  writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}
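// For completeness: storeBlock then combines the two futures and waits for both writes to
// finish before returning -- roughly as follows (a reconstruction from memory; the exact
// names may differ between Spark versions):
val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle)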
7. The WAL is written sequentially and the data is never modified, so a read only needs to follow the record handle (where the record sits and how long it is). This is why the WAL is very fast.
/**
 * :: DeveloperApi ::
 *
 * This abstract class represents a write ahead log (aka journal) that is used by Spark Streaming
 * to save the received data (by receivers) and associated metadata to a reliable storage, so that
 * they can be recovered after driver failures. See the Spark documentation for more information
 * on how to plug in your own custom implementation of a write ahead log.
 */
@org.apache.spark.annotation.DeveloperApi
public abstract class WriteAheadLog {

  // The record handle contains all the information needed to read a written record back;
  // the time is used as an index.
  /**
   * Write the record to the log and return a record handle, which contains all the information
   * necessary to read back the written record. The time is used to the index the record,
   * such that it can be cleaned later. Note that implementations of this abstract class must
   * ensure that the written data is durable and readable (using the record handle) by the
   * time this function returns.
   */
  // The returned WriteAheadLogRecordHandle is later used to read the data back.
  abstract public WriteAheadLogRecordHandle write(ByteBuffer record, long time);

  /**
   * Read a written record based on the given record handle.
   */
  abstract public ByteBuffer read(WriteAheadLogRecordHandle handle);

  /**
   * Read and return an iterator of all the records that have been written but not yet cleaned up.
   */
  abstract public Iterator<ByteBuffer> readAll();

  /**
   * Clean all the records that are older than the threshold time. It can wait for
   * the completion of the deletion.
   */
  // Removes records/files older than the threshold time.
  abstract public void clean(long threshTime, boolean waitForCompletion);

  /**
   * Close this log and release any resources.
   */
  abstract public void close();
}
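Because this is a public developer API, a custom log can be plugged in. Purely to illustrate the contract, here is a minimal in-memory toy (not durable; every name in it is hypothetical). To actually use such a class you would point Spark at it with the spark.streaming.receiver.writeAheadLog.class key listed in WriteAheadLogUtils above, and check there how the class is expected to be constructed.

import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle}

// Hypothetical handle: an id into the in-memory map plus the write time.
case class InMemoryRecordHandle(id: Long, time: Long) extends WriteAheadLogRecordHandle

// Toy, non-durable WAL used only to illustrate the WriteAheadLog contract.
class InMemoryWriteAheadLog extends WriteAheadLog {

  private val records = mutable.LinkedHashMap[Long, (Long, ByteBuffer)]()
  private var nextId = 0L

  override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = synchronized {
    val id = nextId
    nextId += 1
    records(id) = (time, record.duplicate()) // duplicate() so later position changes don't leak
    InMemoryRecordHandle(id, time)
  }

  override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = synchronized {
    records(handle.asInstanceOf[InMemoryRecordHandle].id)._2.duplicate()
  }

  override def readAll(): JIterator[ByteBuffer] = synchronized {
    records.values.map(_._2.duplicate()).toList.iterator.asJava
  }

  override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = synchronized {
    // Drop everything written before the threshold, mirroring the file-based cleanup.
    val stale = records.collect { case (id, (time, _)) if time < threshTime => id }
    stale.foreach(records.remove)
  }

  override def close(): Unit = synchronized { records.clear() }
}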
8. The implementation of WriteAheadLogRecordHandle is FileBasedWriteAheadLogSegment.
9. path: which log file the data lives in; offset: where the record starts; length: how many bytes it occupies. With these three values the record can be located directly.
/** Class for representing a segment of data in a write ahead log file */
private[streaming] case class FileBasedWriteAheadLogSegment(path: String, offset: Long, length: Int)
  extends WriteAheadLogRecordHandle
10. The implementation of WriteAheadLog is shown below.
11. FileBasedWriteAheadLog manages the WAL files:
/**
* This class manages write ahead log files.
*
* - Writes records (bytebuffers) to periodically rotating log files.
 * - Recovers the log files and reads the recovered records upon failures.
 * - Cleans up old log files.
 *
 * Uses [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]] to write
 * and [[org.apache.spark.streaming.util.FileBasedWriteAheadLogReader]] to read.
 *
 * @param logDirectory Directory where rotating log files will be created.
 * @param hadoopConf Hadoop configuration for reading/writing log files.
 */
private[streaming] class FileBasedWriteAheadLog(
12. The data is written directly to the checkpoint directory on HDFS:
/**
 * Write a byte buffer to the log file. This method synchronously writes the data in the
 * ByteBuffer to HDFS. When this method returns, the data is guaranteed to have been flushed
 * to HDFS, and will be available for readers to read.
 */
def write(byteBuffer: ByteBuffer, time: Long): FileBasedWriteAheadLogSegment = synchronized {
  var fileSegment: FileBasedWriteAheadLogSegment = null
  var failures = 0
  var lastException: Exception = null
  var succeeded = false
  while (!succeeded && failures < maxFailures) {
    try {
      // getLogWriter returns the current writer, rolling to a new log file when necessary.
      fileSegment = getLogWriter(time).write(byteBuffer)
      if (closeFileAfterWrite) {
        resetWriter()
      }
      succeeded = true
    } catch {
      case ex: Exception =>
        lastException = ex
        logWarning("Failed to write to write ahead log")
        resetWriter()
        failures += 1
    }
  }
  if (fileSegment == null) {
    logError(s"Failed to write to write ahead log after $failures failures")
    throw lastException
  }
  fileSegment
}
13. Data written at different times, and under different conditions, goes into different files, so many small files accumulate.
/** Get the current log writer while taking care of rotation */
private def getLogWriter(currentTime: Long): FileBasedWriteAheadLogWriter = synchronized {
  if (currentLogWriter == null || currentTime > currentLogWriterStopTime) {
    resetWriter()
    currentLogPath.foreach {
      pastLogs += LogInfo(currentLogWriterStartTime, currentLogWriterStopTime, _)
    }
    currentLogWriterStartTime = currentTime
    currentLogWriterStopTime = currentTime + (rollingIntervalSecs * 1000)
    val newLogPath = new Path(logDirectory,
      timeToLogFile(currentLogWriterStartTime, currentLogWriterStopTime))
    currentLogPath = Some(newLogPath.toString)
    currentLogWriter = new FileBasedWriteAheadLogWriter(currentLogPath.get, hadoopConf)
  }
  currentLogWriter
}
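The rotation above is driven by the rollingIntervalSecs setting listed in WriteAheadLogUtils earlier (DEFAULT_ROLLING_INTERVAL_SECS = 60). If the number of small files becomes a problem, the interval can be raised; a minimal sketch:

import org.apache.spark.SparkConf

// Roll a new receiver-side WAL file every 5 minutes instead of the default 60 seconds,
// trading recovery granularity for fewer small files in the checkpoint directory.
val tunedConf = new SparkConf()
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
  .set("spark.streaming.receiver.writeAheadLog.rollingIntervalSecs", "300")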
14. The read path:
/**
 * A random access reader for reading write ahead log files written using
 * [[org.apache.spark.streaming.util.FileBasedWriteAheadLogWriter]]. Given the file segment info,
 * this reads the record (ByteBuffer) from the log file.
 */
private[streaming] class FileBasedWriteAheadLogRandomReader(path: String, conf: Configuration)
  extends Closeable {

  private val instream = HdfsUtils.getInputStream(path, conf)
  private var closed = (instream == null) // the file may be deleted as we're opening the stream

  def read(segment: FileBasedWriteAheadLogSegment): ByteBuffer = synchronized {
    assertOpen()
    // Seek to the offset recorded in the handle first, then read the length-prefixed record.
    instream.seek(segment.offset)
    val nextLength = instream.readInt()
    HdfsUtils.checkState(nextLength == segment.length,
      s"Expected message length to be ${segment.length}, but was $nextLength")
    val buffer = new Array[Byte](nextLength)
    instream.readFully(buffer)
    ByteBuffer.wrap(buffer)
  }
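To make the framing concrete, here is a small self-contained sketch using plain local file I/O (not the Spark classes) that writes length-prefixed records and reads one back from its (offset, length) handle, which is essentially what the random reader above does on top of HDFS:

import java.io.{DataInputStream, DataOutputStream, FileInputStream, FileOutputStream}

// Append length-prefixed records and remember the (offset, length) "handle" of each one.
val out = new DataOutputStream(new FileOutputStream("/tmp/wal-demo.log"))
def append(payload: Array[Byte]): (Long, Int) = {
  val offset = out.size().toLong   // bytes written so far = start offset of this record
  out.writeInt(payload.length)     // length prefix
  out.write(payload)               // payload bytes
  (offset, payload.length)
}
append("record-1".getBytes("UTF-8"))
val (offset, length) = append("record-2".getBytes("UTF-8"))
out.close()

// Random read: seek to the offset, read the length prefix, then read exactly that many bytes.
val in = new FileInputStream("/tmp/wal-demo.log")
in.skip(offset)
val din = new DataInputStream(in)
val recordLength = din.readInt()
assert(recordLength == length)
val buffer = new Array[Byte](recordLength)
din.readFully(buffer)
println(new String(buffer, "UTF-8")) // prints "record-2"
din.close()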
Data replay. In real-world development Kafka is typically used directly: the data is retained in Kafka itself, so no Spark-side fault tolerance or extra replicas are needed.
Kafka can be consumed in two ways: the Receiver approach and the Direct approach.
Receiver approach: the consumption state, i.e. the offsets, is managed by ZooKeeper. After a failure, Kafka re-reads from the recorded offset: if the job crashes midway through processing, no ACK is sent to ZooKeeper, so ZooKeeper considers the data not yet consumed. In practice, however, the Direct approach is used more and more, because it operates on the offsets directly and manages them itself.
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
  val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
  // Either.fold would confuse @tailrec, do it manually
  if (o.isLeft) {
    val err = o.left.get.toString
    if (retries <= 0) {
      throw new SparkException(err)
    } else {
      log.error(err)
      Thread.sleep(kc.config.refreshLeaderBackoffMs)
      latestLeaderOffsets(retries - 1)
    }
  } else {
    o.right.get
  }
}
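As a usage sketch of the Direct approach (this assumes the Spark 1.x spark-streaming-kafka artifact; broker addresses and the topic name are placeholders, and ssc is a StreamingContext as in the earlier sketch), the offsets of each batch can be read via HasOffsetRanges and persisted wherever you choose:

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils}

val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topics = Set("logs")

// Direct stream: no receiver and no WAL needed -- each batch is a deterministic offset range
// that can simply be read again from Kafka on failure.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

stream.foreachRDD { rdd =>
  // The offset ranges describe exactly which Kafka data this batch covers.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { o =>
    println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
  }
  // ... process rdd, then store offsetRanges yourself if you want to manage offsets manually ...
}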
These course notes are based on: