使用分布式receiver来获取数据
使用 WAL 来实现 At least once 操做:
conf.set("spark.streaming.receiver.writeAheadLog.enable","true") // 开启 WAL
// 一、At most once - 每条数据最多被处理一次(0次或1次),这种语义下会出现数据丢失的问题;
// 二、At least once - 每条数据最少被处理一次 (1次或更多),这个不会出现数据丢失,可是会出现数据重复;
// 三、Exactly once - 每条数据只会被处理一次,没有数据会丢失,而且没有数据会被屡次处理,这种语义是你们最想要的,可是也是最难实现的。html
若是不作容错,将会带来数据丢失,由于Receiver一直在接收数据,在其没有处理的时候(已通知zk数据接收到),Executor忽然挂掉(或是driver挂掉通知executor关闭),缓存在内存中的数据就会丢失。由于这个问题,Spark1.2开始加入了WAL(Write ahead log)开启 WAL,将receiver获取数据的存储级别修改成StorageLevel. MEMORY_AND_DISK_SER_2java
1 // 缺点,不能本身维护消费 topic partition 的 offset 2 // 优势,开启 WAL,来确保 exactly-once 语义 3 val stream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream[String,String,StringDecoder,StringDecoder]( 4 ssc,kafkaParams,map,StorageLevel.MEMORY_AND_DISK_SER_2)
org.apache.spark.streaming.StreamingContext#start中启动了 JobScheduler实例node
1 // private[streaming] val scheduler = new JobScheduler(this) 2 3 // Start the streaming scheduler in a new thread, so that thread local properties 4 // like call sites and job groups can be reset without affecting those of the 5 // current thread. 6 ThreadUtils.runInNewThread("streaming-start") { // 单独的一个daemon线程运行函数题 7 sparkContext.setCallSite(startSite.get) 8 sparkContext.clearJobGroup() 9 sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false") 10 // 执行start 方法 11 scheduler.start() 12 } 13 state = StreamingContextState.ACTIVE
org.apache.spark.streaming.scheduler.JobScheduler#start 源码以下:apache
1 def start(): Unit = synchronized { 2 if (eventLoop != null) return // scheduler has already been started 3 4 logDebug("Starting JobScheduler") 5 eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") { 6 override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event) 7 8 override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e) 9 } 10 eventLoop.start() 11 12 // attach rate controllers of input streams to receive batch completion updates 13 for { 14 inputDStream <- ssc.graph.getInputStreams 15 rateController <- inputDStream.rateController 16 } ssc.addStreamingListener(rateController) 17 18 listenerBus.start(ssc.sparkContext) 19 receiverTracker = new ReceiverTracker(ssc) 20 inputInfoTracker = new InputInfoTracker(ssc) 21 receiverTracker.start() 22 jobGenerator.start() 23 logInfo("Started JobScheduler") 24 }
ReceiverTracker 的类声明以下:api
1 This class manages the execution of the receivers of ReceiverInputDStreams. Instance of this class must be created after all input streams have been added and StreamingContext.start() has been called because it needs the final set of input streams at the time of instantiation. 2 此类负责执行ReceiverInputDStreams的receiver。必须在添加全部输入流并调用StreamingContext.start()以后建立此类的实例,由于它在实例化时须要最终的输入流集。
其 start 方法以下:数组
1 /** Start the endpoint and receiver execution thread. */ 2 def start(): Unit = synchronized { 3 if (isTrackerStarted) { 4 throw new SparkException("ReceiverTracker already started") 5 } 6 7 if (!receiverInputStreams.isEmpty) { 8 // 创建rpc endpoint 9 endpoint = ssc.env.rpcEnv.setupEndpoint( // 注意,这是一个driver的 endpoint 10 "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) 11 // driver节点上发送启动 receiver 命令 12 if (!skipReceiverLaunch) launchReceivers() 13 logInfo("ReceiverTracker started") 14 trackerState = Started 15 } 16 } 17 18 /** 19 * Get the receivers from the ReceiverInputDStreams, distributes them to the 20 * worker nodes as a parallel collection, and runs them. 21 */ 22 // 从ReceiverInputDStreams 获取到 receivers,而后将它们分配到不一样的 worker 节点并运行它们。 23 private def launchReceivers(): Unit = { 24 val receivers = receiverInputStreams.map(nis => { 25 // 未启用WAL 是KafkaReceiver,启动WAL后是ReliableKafkaReceiver 26 val rcvr = nis.getReceiver() 27 rcvr.setReceiverId(nis.id) 28 rcvr 29 }) 30 // 运行一个简单的应用来确保全部的salve node都已经启动起来,避免全部的 receiver 任务都在同一个local node上 31 runDummySparkJob() 32 33 logInfo("Starting " + receivers.length + " receivers") 34 endpoint.send(StartAllReceivers(receivers)) // 发送请求driver 转发 启动 receiver 的命令 35 }
Driver 端StartAllReceivers 的处理代码以下:缓存
1 override def receive: PartialFunction[Any, Unit] = { 2 // Local messages 3 case StartAllReceivers(receivers) => 4 // schduleReceiver 5 val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors) 6 for (receiver <- receivers) { 7 val executors = scheduledLocations(receiver.streamId) 8 updateReceiverScheduledExecutors(receiver.streamId, executors) 9 receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation 10 startReceiver(receiver, executors) 11 } 12 …… 13 }
getExecutors源码以下:session
1 /** 2 * Get the list of executors excluding driver 3 */ 4 // 若是是 local 模式,返回 本地线程; 若是是 yarn 模式,返回 非driver 节点上的 excutors 5 private def getExecutors: Seq[ExecutorCacheTaskLocation] = { 6 if (ssc.sc.isLocal) { // 若是在 local 模式下运行 7 val blockManagerId = ssc.sparkContext.env.blockManager.blockManagerId 8 Seq(ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId)) 9 } else { // 在 yarn 模式下,过滤掉 driver 的 executor 10 ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) => 11 blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location 12 }.map { case (blockManagerId, _) => 13 ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId) 14 }.toSeq 15 } 16 }
org.apache.spark.streaming.scheduler.ReceiverSchedulingPolicy#scheduleReceivers的解释以下:app
1 Try our best to schedule receivers with evenly distributed. However, if the preferredLocations of receivers are not even, we may not be able to schedule them evenly because we have to respect them. Here is the approach to schedule executors: 2 First, schedule all the receivers with preferred locations (hosts), evenly among the executors running on those host. 3 Then, schedule all other receivers evenly among all the executors such that overall distribution over all the receivers is even. 4 This method is called when we start to launch receivers at the first time. 5 该方法就是确保receiver 可以在worker node 上均匀分布的。遵循如下两个原则: 6 1.使用 preferred location 分配 receiver 到这些node 上 7 2.将其余的未分配的receiver均匀分布均匀分布到 每个 worker node 上
org.apache.spark.streaming.scheduler.ReceiverTracker#updateReceiverScheduledExecutors 负责更新receiverid 和 receiver info 的映射关系,源码以下:负载均衡
1 private def updateReceiverScheduledExecutors( 2 receiverId: Int, scheduledLocations: Seq[TaskLocation]): Unit = { 3 val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match { 4 case Some(oldInfo) => 5 oldInfo.copy(state = ReceiverState.SCHEDULED, 6 scheduledLocations = Some(scheduledLocations)) 7 case None => 8 ReceiverTrackingInfo( 9 receiverId, 10 ReceiverState.SCHEDULED, 11 Some(scheduledLocations), 12 runningExecutor = None) 13 } 14 receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo) 15 }
startReceiver 负责启动 receiver,源码以下:
1 /** 2 * Start a receiver along with its scheduled executors 3 */ 4 private def startReceiver( 5 receiver: Receiver[_], 6 scheduledLocations: Seq[TaskLocation]): Unit = { 7 def shouldStartReceiver: Boolean = { 8 // It's okay to start when trackerState is Initialized or Started 9 !(isTrackerStopping || isTrackerStopped) 10 } 11 12 val receiverId = receiver.streamId 13 if (!shouldStartReceiver) { 14 onReceiverJobFinish(receiverId) 15 return 16 } 17 18 val checkpointDirOption = Option(ssc.checkpointDir) 19 val serializableHadoopConf = 20 new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration) 21 22 // 在 worker node 上启动 receiver 的方法 23 val startReceiverFunc: Iterator[Receiver[_]] => Unit = 24 (iterator: Iterator[Receiver[_]]) => { 25 if (!iterator.hasNext) { 26 throw new SparkException( 27 "Could not start receiver as object not found.") 28 } 29 if (TaskContext.get().attemptNumber() == 0) { 30 val receiver = iterator.next() 31 assert(iterator.hasNext == false) 32 val supervisor = new ReceiverSupervisorImpl( 33 receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption) 34 supervisor.start() 35 supervisor.awaitTermination() 36 } else { 37 // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it. 38 } 39 } 40 41 // Create the RDD using the scheduledLocations to run the receiver in a Spark job 42 val receiverRDD: RDD[Receiver[_]] = 43 if (scheduledLocations.isEmpty) { 44 ssc.sc.makeRDD(Seq(receiver), 1) 45 } else { 46 val preferredLocations = scheduledLocations.map(_.toString).distinct 47 ssc.sc.makeRDD(Seq(receiver -> preferredLocations)) 48 } 49 receiverRDD.setName(s"Receiver $receiverId") 50 ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId") 51 ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite())) 52 // 提交分布式receiver 启动任务 53 val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit]( 54 receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ()) 55 // We will keep restarting the receiver job until ReceiverTracker is stopped 56 future.onComplete { 57 case Success(_) => 58 if (!shouldStartReceiver) { 59 onReceiverJobFinish(receiverId) 60 } else { 61 logInfo(s"Restarting Receiver $receiverId") 62 self.send(RestartReceiver(receiver)) 63 } 64 case Failure(e) => 65 if (!shouldStartReceiver) { 66 onReceiverJobFinish(receiverId) 67 } else { 68 logError("Receiver has been stopped. Try to restart it.", e) 69 logInfo(s"Restarting Receiver $receiverId") 70 self.send(RestartReceiver(receiver)) 71 } 72 }(submitJobThreadPool) 73 logInfo(s"Receiver ${receiver.streamId} started") 74 }
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#ReceiverSupervisorImpl 的 start 方法以下:
1 /** Start the supervisor */ 2 def start() { 3 onStart() 4 startReceiver() 5 } 6 override protected def onStart() { // 启动 BlockGenerator 服务 7 registeredBlockGenerators.foreach { _.start() } 8 } 9 // startReceiver 方法以下: 10 /** Start receiver */ 11 def startReceiver(): Unit = synchronized { 12 try { 13 if (onReceiverStart()) { // 注册receiver 成功 14 logInfo("Starting receiver") 15 receiverState = Started 16 receiver.onStart() // 启动 receiver 17 logInfo("Called receiver onStart") 18 } else { 19 // The driver refused us 20 stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None) 21 } 22 } catch { 23 case NonFatal(t) => 24 stop("Error starting receiver " + streamId, Some(t)) 25 } 26 }
1 override protected def onReceiverStart(): Boolean = { 2 val msg = RegisterReceiver( 3 streamId, receiver.getClass.getSimpleName, host, executorId, endpoint) 4 trackerEndpoint.askWithRetry[Boolean](msg) 5 }
简单描述一下driver 端作的事情,主要负责将其归入到org.apache.spark.streaming.scheduler.ReceiverTracker 的管理中来,具体streamid 和 ReceiverTrackingInfo 的映射关系保存在receiverTrackingInfos中。
org.apache.spark.streaming.scheduler.ReceiverTracker#registerReceiver关键代码以下:
1 val name = s"${typ}-${streamId}" 2 val receiverTrackingInfo = ReceiverTrackingInfo( 3 streamId, 4 ReceiverState.ACTIVE, 5 scheduledLocations = None, 6 runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)), 7 name = Some(name), 8 endpoint = Some(receiverEndpoint)) 9 receiverTrackingInfos.put(streamId, receiverTrackingInfo) 10 listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
因为咱们启用了 WAL, 因此 这里的receiver 是ReliableKafkaReceiver 的实例
receiver.onStart 即 org.apache.spark.streaming.kafka.ReliableKafkaReceiver#onStart, 源码以下:
1 override def onStart(): Unit = { 2 logInfo(s"Starting Kafka Consumer Stream with group: $groupId") 3 4 // Initialize the topic-partition / offset hash map. 5 // 1. 负责维护消费的 topic-partition 和 offset 的映射关系 6 topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long] 7 8 // Initialize the stream block id / offset snapshot hash map. 9 // 2. 负责维护 block-id 和 partition-offset 之间的映射关系 10 blockOffsetMap = new ConcurrentHashMap[StreamBlockId, Map[TopicAndPartition, Long]]() 11 12 // Initialize the block generator for storing Kafka message. 13 // 3. 负责保存 kafka message 的 block generator,入参是GeneratedBlockHandler 实例,这是一个负责监听 block generator事件的一个监听器 14 // Generates batches of objects received by a org.apache.spark.streaming.receiver.Receiver and puts them into appropriately named blocks at regular intervals. This class starts two threads, one to periodically start a new batch and prepare the previous batch of as a block, the other to push the blocks into the block manager. 15 blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler) 16 // 4. 关闭consumer 自动提交 offset 选项 17 // auto_offset_commit 应该是 false 18 if (kafkaParams.contains(AUTO_OFFSET_COMMIT) && kafkaParams(AUTO_OFFSET_COMMIT) == "true") { 19 logWarning(s"$AUTO_OFFSET_COMMIT should be set to false in ReliableKafkaReceiver, " + 20 "otherwise we will manually set it to false to turn off auto offset commit in Kafka") 21 } 22 23 val props = new Properties() 24 kafkaParams.foreach(param => props.put(param._1, param._2)) 25 // Manually set "auto.commit.enable" to "false" no matter user explicitly set it to true, 26 // we have to make sure this property is set to false to turn off auto commit mechanism in Kafka. 27 props.setProperty(AUTO_OFFSET_COMMIT, "false") 28 29 val consumerConfig = new ConsumerConfig(props) 30 31 assert(!consumerConfig.autoCommitEnable) 32 33 logInfo(s"Connecting to Zookeeper: ${consumerConfig.zkConnect}") 34 // 5. 初始化 consumer 对象 35 // consumerConnector 是ZookeeperConsumerConnector的实例 36 consumerConnector = Consumer.create(consumerConfig) 37 logInfo(s"Connected to Zookeeper: ${consumerConfig.zkConnect}") 38 // 6. 初始化zookeeper 的客户端 39 zkClient = new ZkClient(consumerConfig.zkConnect, consumerConfig.zkSessionTimeoutMs, 40 consumerConfig.zkConnectionTimeoutMs, ZKStringSerializer) 41 // 7. 建立线程池来处理消息流,池的大小是固定的,为partition 的总数,并指定线程池中每个线程的name 的前缀,内部使用ThreadPoolExecutor,而且 建立线程的 factory类是guava 工具包提供的。 42 messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool( 43 topics.values.sum, "KafkaMessageHandler") 44 // 8. 启动 BlockGenerator内的两个线程 45 blockGenerator.start() 46 47 // 9. 建立MessageStream对象 48 val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) 49 .newInstance(consumerConfig.props) 50 .asInstanceOf[Decoder[K]] 51 52 val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) 53 .newInstance(consumerConfig.props) 54 .asInstanceOf[Decoder[V]] 55 56 val topicMessageStreams = consumerConnector.createMessageStreams( 57 topics, keyDecoder, valueDecoder) 58 // 10. 将待处理的MessageHandler 放入 线程池中,等待执行 59 topicMessageStreams.values.foreach { streams => 60 streams.foreach { stream => 61 messageHandlerThreadPool.submit(new MessageHandler(stream)) 62 } 63 } 64 }
其中, 第9 步,建立MessageStream对象,
kafka.consumer.ZookeeperConsumerConnector#createMessageStreams 方法以下:
1 def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V]) 2 : Map[String, List[KafkaStream[K,V]]] = { 3 if (messageStreamCreated.getAndSet(true)) 4 throw new MessageStreamsExistException(this.getClass.getSimpleName + 5 " can create message streams at most once",null) 6 consume(topicCountMap, keyDecoder, valueDecoder) 7 }
其调用了 consume 方法,源码以下:
def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V]) : Map[String,List[KafkaStream[K,V]]] = { debug("entering consume ") if (topicCountMap == null) throw new RuntimeException("topicCountMap is null") // 1. 初始化 topicCount val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap) // 2. 获取 每个topic 和 threadId 集合的映射关系 val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic // make a list of (queue,stream) pairs, one pair for each threadId // 3. 获得每个 threadId 对应 (queue, stream) 的映射列表 val queuesAndStreams = topicThreadIds.values.map(threadIdSet => threadIdSet.map(_ => { val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) val stream = new KafkaStream[K,V]( queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId) (queue, stream) }) ).flatten.toList // 4. 获取 groupId 在 zookeeper 中的path val dirs = new ZKGroupDirs(config.groupId) // 5. 注册 consumer 到 groupId(在zk中) registerConsumerInZK(dirs, consumerIdString, topicCount) // 6. 从新初始化 consumer reinitializeConsumer(topicCount, queuesAndStreams) // 7. 返回流 loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]] }
在 kafka.consumer.ZookeeperConsumerConnector#consume方法中,有以下操做:
1 // 获得每个 threadId 对应 (queue, stream) 的映射列表 2 val queuesAndStreams = topicThreadIds.values.map(threadIdSet => 3 threadIdSet.map(_ => { 4 val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) 5 val stream = new KafkaStream[K,V]( 6 queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId) 7 (queue, stream) 8 }) 9 ).flatten.toList 10 // 获取 groupId 在 zookeeper 中的path 11 val dirs = new ZKGroupDirs(config.groupId) 12 // 注册 consumer 到 groupId(在zk中) 13 registerConsumerInZK(dirs, consumerIdString, topicCount) 14 // 从新初始化 consumer 15 reinitializeConsumer(topicCount, queuesAndStreams)
在上面的代码中,能够看到初始化的queue(LinkedBlockingQueue实例)除了被传入stream(KafkaStream)的构造函数被迭代器从中取数据,还和 stream 重组成Tuple2[LinkedBlockingQueue[FetchedDataChunk]的list,以后被传入reinitializeConsumer 方法中。
kafka.consumer.ZookeeperConsumerConnector#reinitializeConsume 其源码以下:
1 private def reinitializeConsumer[K,V]( 2 topicCount: TopicCount, 3 queuesAndStreams: List[(LinkedBlockingQueue[FetchedDataChunk],KafkaStream[K,V])]) { 4 // 1. 获取 该groupid 在 zk 中的路径 5 val dirs = new ZKGroupDirs(config.groupId) 6 7 // listener to consumer and partition changes 8 // 2. 初始化loadBalancerListener,这个负载均衡listener 会时刻监控 consumer 和 partition 的变化 9 if (loadBalancerListener == null) { 10 val topicStreamsMap = new mutable.HashMap[String,List[KafkaStream[K,V]]] 11 loadBalancerListener = new ZKRebalancerListener( 12 config.groupId, consumerIdString, topicStreamsMap.asInstanceOf[scala.collection.mutable.Map[String, List[KafkaStream[_,_]]]]) 13 } 14 15 // create listener for session expired event if not exist yet 16 // 3. 监控 session 过时的listner, 有新session注册初始化,会通知 loadBalancer 17 if (sessionExpirationListener == null) 18 sessionExpirationListener = new ZKSessionExpireListener( 19 dirs, consumerIdString, topicCount, loadBalancerListener) 20 21 // create listener for topic partition change event if not exist yet 22 // 4. 初始化ZKTopicPartitionChangeListener实例,当topic partition 变化时,这个listener会通知 loadBalancer 23 if (topicPartitionChangeListener == null) 24 topicPartitionChangeListener = new ZKTopicPartitionChangeListener(loadBalancerListener) 25 // 5. 将queuesAndStreams 的值通过一系列转换,并添加到loadBalancerListener.kafkaMessageAndMetadataStreams 中 26 val topicStreamsMap = loadBalancerListener.kafkaMessageAndMetadataStreams 27 28 // map of {topic -> Set(thread-1, thread-2, ...)} 29 val consumerThreadIdsPerTopic: Map[String, Set[ConsumerThreadId]] = 30 topicCount.getConsumerThreadIdsPerTopic 31 32 val allQueuesAndStreams = topicCount match { 33 case wildTopicCount: WildcardTopicCount => // 这里是WildcardTopicCount,走这个分支 34 /* 35 * Wild-card consumption streams share the same queues, so we need to 36 * duplicate the list for the subsequent zip operation. 37 */ 38 (1 to consumerThreadIdsPerTopic.keySet.size).flatMap(_ => queuesAndStreams).toList 39 case statTopicCount: StaticTopicCount => 40 queuesAndStreams 41 } 42 43 val topicThreadIds = consumerThreadIdsPerTopic.map { 44 case(topic, threadIds) => 45 threadIds.map((topic, _)) 46 }.flatten 47 48 require(topicThreadIds.size == allQueuesAndStreams.size, 49 "Mismatch between thread ID count (%d) and queue count (%d)" 50 .format(topicThreadIds.size, allQueuesAndStreams.size)) 51 val threadQueueStreamPairs = topicThreadIds.zip(allQueuesAndStreams) 52 53 threadQueueStreamPairs.foreach(e => { 54 val topicThreadId = e._1 55 val q = e._2._1 56 topicThreadIdAndQueues.put(topicThreadId, q) 57 debug("Adding topicThreadId %s and queue %s to topicThreadIdAndQueues data structure".format(topicThreadId, q.toString)) 58 newGauge( 59 "FetchQueueSize", 60 new Gauge[Int] { 61 def value = q.size 62 }, 63 Map("clientId" -> config.clientId, 64 "topic" -> topicThreadId._1, 65 "threadId" -> topicThreadId._2.threadId.toString) 66 ) 67 }) 68 69 val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1) 70 groupedByTopic.foreach(e => { 71 val topic = e._1 72 val streams = e._2.map(_._2._2).toList 73 topicStreamsMap += (topic -> streams) 74 debug("adding topic %s and %d streams to map.".format(topic, streams.size)) 75 }) 76 77 // listener to consumer and partition changes 78 // 6. 使用 zkClient 注册sessionExpirationListener 实例 79 zkClient.subscribeStateChanges(sessionExpirationListener) 80 // 7. 使用 zkClient 注册loadBalancerListener 实例 81 zkClient.subscribeChildChanges(dirs.consumerRegistryDir, loadBalancerListener) 82 // 遍历每个topic,使用zkClient 注册topicPartitionChangeListener 实例 83 topicStreamsMap.foreach { topicAndStreams => 84 // register on broker partition path changes 85 val topicPath = BrokerTopicsPath + "/" + topicAndStreams._1 86 zkClient.subscribeDataChanges(topicPath, topicPartitionChangeListener) 87 } 88 89 // explicitly trigger load balancing for this consumer 90 // 8. 使用 loadBalancerListener 同步作负载均衡 91 loadBalancerListener.syncedRebalance() 92 }
重点看 第 8 步,使用 loadBalancerListener 同步作负载均衡。
kafka.consumer.ZookeeperConsumerConnector.ZKRebalancerListener#syncedRebalance 源码以下:
1 def syncedRebalance() { 2 rebalanceLock synchronized { 3 rebalanceTimer.time { 4 if(isShuttingDown.get()) { // 若是ZookeeperConsumerConnector 5 已经shutdown了,直接返回 6 return 7 } else { 8 for (i <- 0 until config.rebalanceMaxRetries) { // 默认是 4 次 9 info("begin rebalancing consumer " + consumerIdString + " try #" + i) 10 var done = false 11 var cluster: Cluster = null 12 try { 13 // 1. 根据zkClient 实例 获取并建立Cluster 对象,这个 cluster 实例包含了一个 Broker(broker的id,broker在zk中的路径) 列表 14 cluster = getCluster(zkClient) 15 // 2. 在cluster中作 rebalance操做 16 done = rebalance(cluster) 17 } catch { 18 case e: Throwable => 19 /** occasionally, we may hit a ZK exception because the ZK state is changing while we are iterating. 20 * For example, a ZK node can disappear between the time we get all children and the time we try to get 21 * the value of a child. Just let this go since another rebalance will be triggered. 22 **/ 23 info("exception during rebalance ", e) 24 } 25 info("end rebalancing consumer " + consumerIdString + " try #" + i) 26 if (done) { 27 return 28 } else { 29 /* Here the cache is at a risk of being stale. To take future rebalancing decisions correctly, we should 30 * clear the cache */ 31 info("Rebalancing attempt failed. Clearing the cache before the next rebalancing operation is triggered") 32 } 33 // stop all fetchers and clear all the queues to avoid data duplication 34 closeFetchersForQueues(cluster, kafkaMessageAndMetadataStreams, topicThreadIdAndQueues.map(q => q._2)) 35 Thread.sleep(config.rebalanceBackoffMs) 36 } 37 } 38 } 39 } 40 41 throw new ConsumerRebalanceFailedException(consumerIdString + " can't rebalance after " + config.rebalanceMaxRetries +" retries") 42 }
重点看 第2 步,在 cluster 中作 rebalance 操做,kafka.consumer.ZookeeperConsumerConnector.ZKRebalancerListener#rebalance 源码以下:
1 private def rebalance(cluster: Cluster): Boolean = { 2 // 1. 获取 group和 threadId 的Map 映射关系 3 val myTopicThreadIdsMap = TopicCount.constructTopicCount( 4 group, consumerIdString, zkClient, config.excludeInternalTopics).getConsumerThreadIdsPerTopic 5 // 2. 获取kafka cluster 中全部可用的node 6 val brokers = getAllBrokersInCluster(zkClient) 7 if (brokers.size == 0) { // 若是可用节点为空,设置listener订阅,返回 true 8 // This can happen in a rare case when there are no brokers available in the cluster when the consumer is started. 9 // We log an warning and register for child changes on brokers/id so that rebalance can be triggered when the brokers 10 // are up. 11 warn("no brokers found when trying to rebalance.") 12 zkClient.subscribeChildChanges(ZkUtils.BrokerIdsPath, loadBalancerListener) 13 true 14 } 15 else { 16 /** 17 * fetchers must be stopped to avoid data duplication, since if the current 18 * rebalancing attempt fails, the partitions that are released could be owned by another consumer. 19 * But if we don't stop the fetchers first, this consumer would continue returning data for released 20 * partitions in parallel. So, not stopping the fetchers leads to duplicate data. 21 */ 22 // 3. 作rebalance 以前的准备工做 23 // 3.1. 关闭现有 fetcher 链接 24 closeFetchers(cluster, kafkaMessageAndMetadataStreams, myTopicThreadIdsMap) 25 // 3.2 释放 partition 的全部权(主要是删除zk下的owner 节点的数据以及解除内存中的topic和 fetcher的关联关系) 26 releasePartitionOwnership(topicRegistry) 27 // 3.3. 从新给partition分配 fetcher 28 val assignmentContext = new AssignmentContext(group, consumerIdString, config.excludeInternalTopics, zkClient) 29 val partitionOwnershipDecision = partitionAssignor.assign(assignmentContext) 30 val currentTopicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]( 31 valueFactory = Some((topic: String) => new Pool[Int, PartitionTopicInfo])) 32 33 // fetch current offsets for all topic-partitions 34 // 3.4 获取当前fetcher对应的 partitions 的 offsets,这里的offset是指 consumer 下一个要消费的offset 35 val topicPartitions = partitionOwnershipDecision.keySet.toSeq 36 37 val offsetFetchResponseOpt = fetchOffsets(topicPartitions) 38 39 if (isShuttingDown.get || !offsetFetchResponseOpt.isDefined) 40 false 41 else { 42 // 3.5 更新 partition 和 fetcher 的对应关系 43 val offsetFetchResponse = offsetFetchResponseOpt.get 44 topicPartitions.foreach(topicAndPartition => { 45 val (topic, partition) = topicAndPartition.asTuple 46 // requestInfo是OffsetFetchResponse实例中的成员变量,它是一个Map[TopicAndPartition, OffsetMetadataAndError]实例 47 val offset = offsetFetchResponse.requestInfo(topicAndPartition).offset 48 val threadId = partitionOwnershipDecision(topicAndPartition) 49 addPartitionTopicInfo(currentTopicRegistry, partition, topic, offset, threadId) 50 }) 51 52 /** 53 * move the partition ownership here, since that can be used to indicate a truly successful rebalancing attempt 54 * A rebalancing attempt is completed successfully only after the fetchers have been started correctly 55 */ 56 if(reflectPartitionOwnershipDecision(partitionOwnershipDecision)) { 57 allTopicsOwnedPartitionsCount = partitionOwnershipDecision.size 58 59 partitionOwnershipDecision.view.groupBy { case(topicPartition, consumerThreadId) => topicPartition.topic } 60 .foreach { case (topic, partitionThreadPairs) => 61 newGauge("OwnedPartitionsCount", 62 new Gauge[Int] { 63 def value() = partitionThreadPairs.size 64 }, 65 ownedPartitionsCountMetricTags(topic)) 66 } 67 // 3.6 将已经新的 topic registry 覆盖旧的 68 topicRegistry = currentTopicRegistry 69 // 4. 更新 fetcher 70 updateFetcher(cluster) 71 true 72 } else { 73 false 74 } 75 } 76 } 77 }
其中addPartitionTopicInfo 源码以下:
1 private def addPartitionTopicInfo(currentTopicRegistry: Pool[String, Pool[Int, PartitionTopicInfo]], 2 partition: Int, topic: String, 3 offset: Long, consumerThreadId: ConsumerThreadId) { 4 //若是map没有对应的 key,会使用valueFactory初始化键值对,并返回 对应的 value 5 val partTopicInfoMap = currentTopicRegistry.getAndMaybePut(topic) 6 7 val queue = topicThreadIdAndQueues.get((topic, consumerThreadId)) 8 val consumedOffset = new AtomicLong(offset) 9 val fetchedOffset = new AtomicLong(offset) 10 val partTopicInfo = new PartitionTopicInfo(topic, 11 partition, 12 queue, 13 consumedOffset, 14 fetchedOffset, 15 new AtomicInteger(config.fetchMessageMaxBytes), 16 config.clientId) 17 // 1. 将其注册到新的 Topic注册中心中,即注册 partition 和 fetcher 的关系 18 partTopicInfoMap.put(partition, partTopicInfo) 19 debug(partTopicInfo + " selected new offset " + offset) 20 // 2. 更新consumer 的 已经消费的offset信息 21 checkpointedZkOffsets.put(TopicAndPartition(topic, partition), offset) 22 } 23 }
第4步, 更新 fetcher 源码以下:
1 private def updateFetcher(cluster: Cluster) { 2 // update partitions for fetcher 3 var allPartitionInfos : List[PartitionTopicInfo] = Nil 4 for (partitionInfos <- topicRegistry.values) 5 for (partition <- partitionInfos.values) 6 allPartitionInfos ::= partition 7 info("Consumer " + consumerIdString + " selected partitions : " + 8 allPartitionInfos.sortWith((s,t) => s.partitionId < t.partitionId).map(_.toString).mkString(",")) 9 10 fetcher match { 11 case Some(f) => 12 f.startConnections(allPartitionInfos, cluster) 13 case None => 14 } 15 }
其中,f.startConnections方法真正执行 更新操做。此时引入一个新的类。即 fetcher 类,kafka.consumer.ConsumerFetcherManager。
kafka.consumer.ConsumerFetcherManager#startConnections 的源码以下:
1 def startConnections(topicInfos: Iterable[PartitionTopicInfo], cluster: Cluster) { 2 // LeaderFinderThread 在 topic 的leader node可用时,将 fetcher 添加到 leader 节点上 3 leaderFinderThread = new LeaderFinderThread(consumerIdString + "-leader-finder-thread") 4 leaderFinderThread.start() 5 6 inLock(lock) { 7 // 更新ConsumerFetcherManager 成员变量 8 partitionMap = topicInfos.map(tpi => (TopicAndPartition(tpi.topic, tpi.partitionId), tpi)).toMap 9 this.cluster = cluster 10 noLeaderPartitionSet ++= topicInfos.map(tpi => TopicAndPartition(tpi.topic, tpi.partitionId)) 11 cond.signalAll() 12 } 13 }
ConsumerFetcherManager 有一个LeaderFinderThread 实例,该类的父类kafka.utils.ShutdownableThread ,run 方法以下:
1 override def run(): Unit = { 2 info("Starting ") 3 try{ 4 while(isRunning.get()){ 5 doWork() 6 } 7 } catch{ 8 case e: Throwable => 9 if(isRunning.get()) 10 error("Error due to ", e) 11 } 12 shutdownLatch.countDown() 13 info("Stopped ") 14 }
doWork其实就是一个抽象方法,其子类LeaderFinderThread的实现以下:
1 // thread responsible for adding the fetcher to the right broker when leader is available 2 override def doWork() { 3 // 1. 获取 partition 和leader node的映射关系 4 val leaderForPartitionsMap = new HashMap[TopicAndPartition, Broker] 5 lock.lock() 6 try { 7 while (noLeaderPartitionSet.isEmpty) { // 这个字段在startConnections 已更新新值 8 trace("No partition for leader election.") 9 cond.await() 10 } 11 12 trace("Partitions without leader %s".format(noLeaderPartitionSet)) 13 val brokers = getAllBrokersInCluster(zkClient) // 获取全部可用broker 节点 14 // 获取kafka.api.TopicMetadata 序列,kafka.api.TopicMetadata 保存了 topic 和 partitionId,isr,leader,replicas 的信息 15 val topicsMetadata = ClientUtils.fetchTopicMetadata(noLeaderPartitionSet.map(m => m.topic).toSet, 16 brokers, 17 config.clientId, 18 config.socketTimeoutMs, 19 correlationId.getAndIncrement).topicsMetadata 20 if(logger.isDebugEnabled) topicsMetadata.foreach(topicMetadata => debug(topicMetadata.toString())) 21 // 2. 根据获取到的 partition 和 leader node 的关系更新noLeaderPartitionSet 和leaderForPartitionsMap 两个map集合,其中noLeaderPartitionSet 包含的是没有肯定leader 的 partition 集合,leaderForPartitionsMap 是 已经肯定了 leader 的 partition 集合。 22 topicsMetadata.foreach { tmd => 23 val topic = tmd.topic 24 tmd.partitionsMetadata.foreach { pmd => 25 val topicAndPartition = TopicAndPartition(topic, pmd.partitionId) 26 if(pmd.leader.isDefined && noLeaderPartitionSet.contains(topicAndPartition)) { 27 val leaderBroker = pmd.leader.get 28 leaderForPartitionsMap.put(topicAndPartition, leaderBroker) 29 noLeaderPartitionSet -= topicAndPartition 30 } 31 } 32 } 33 } catch { 34 case t: Throwable => { 35 if (!isRunning.get()) 36 throw t /* If this thread is stopped, propagate this exception to kill the thread. */ 37 else 38 warn("Failed to find leader for %s".format(noLeaderPartitionSet), t) 39 } 40 } finally { 41 lock.unlock() 42 } 43 44 try { 45 // 3. 具体为 partition 分配 fetcher 46 addFetcherForPartitions(leaderForPartitionsMap.map{ 47 case (topicAndPartition, broker) => 48 topicAndPartition -> BrokerAndInitialOffset(broker, partitionMap(topicAndPartition).getFetchOffset())} 49 ) 50 } catch { 51 case t: Throwable => { 52 if (!isRunning.get()) 53 throw t /* If this thread is stopped, propagate this exception to kill the thread. */ 54 else { 55 warn("Failed to add leader for partitions %s; will retry".format(leaderForPartitionsMap.keySet.mkString(",")), t) 56 lock.lock() 57 noLeaderPartitionSet ++= leaderForPartitionsMap.keySet 58 lock.unlock() 59 } 60 } 61 } 62 // 4. 关闭空闲fetcher线程 63 shutdownIdleFetcherThreads() 64 Thread.sleep(config.refreshLeaderBackoffMs) 65 }
重点看第3 步,具体为 partition 分配 fetcher,addFetcherForPartitions 源码以下:
1 def addFetcherForPartitions(partitionAndOffsets: Map[TopicAndPartition, BrokerAndInitialOffset]) { 2 mapLock synchronized { 3 // 获取 fetcher 和 partition的映射关系 4 val partitionsPerFetcher = partitionAndOffsets.groupBy{ case(topicAndPartition, brokerAndInitialOffset) => 5 BrokerAndFetcherId(brokerAndInitialOffset.broker, getFetcherId(topicAndPartition.topic, topicAndPartition.partition))} 6 for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) { 7 8 var fetcherThread: AbstractFetcherThread = null 9 fetcherThreadMap.get(brokerAndFetcherId) match { 10 case Some(f) => fetcherThread = f 11 case None => 12 // 根据brokerAndFetcherId 去初始化Fetcher并启动 fetcher 13 fetcherThread = createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker) 14 fetcherThreadMap.put(brokerAndFetcherId, fetcherThread) 15 fetcherThread.start 16 } 17 18 fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map { case (topicAndPartition, brokerAndInitOffset) => 19 topicAndPartition -> brokerAndInitOffset.initOffset 20 }) 21 } 22 } 23 24 info("Added fetcher for partitions %s".format(partitionAndOffsets.map{ case (topicAndPartition, brokerAndInitialOffset) => 25 "[" + topicAndPartition + ", initOffset " + brokerAndInitialOffset.initOffset + " to broker " + brokerAndInitialOffset.broker + "] "})) 26 }
kafka.consumer.ConsumerFetcherManager#createFetcherThread的源码以下:
1 override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = { 2 new ConsumerFetcherThread( 3 "ConsumerFetcherThread-%s-%d-%d".format(consumerIdString, fetcherId, sourceBroker.id), 4 config, sourceBroker, partitionMap, this) 5 }
先来看ConsumerFetcherThread的构造方法声明:
1 class ConsumerFetcherThread(name: String, 2 val config: ConsumerConfig, 3 sourceBroker: Broker, 4 partitionMap: Map[TopicAndPartition, PartitionTopicInfo], 5 val consumerFetcherManager: ConsumerFetcherManager) 6 extends AbstractFetcherThread(name = name, 7 clientId = config.clientId, 8 sourceBroker = sourceBroker, 9 socketTimeout = config.socketTimeoutMs, 10 socketBufferSize = config.socketReceiveBufferBytes, 11 fetchSize = config.fetchMessageMaxBytes, 12 fetcherBrokerId = Request.OrdinaryConsumerId, 13 maxWait = config.fetchWaitMaxMs, 14 minBytes = config.fetchMinBytes, 15 isInterruptible = true)
注意,partitionMap 中的value 是PartitionTopicInfo ,这个对象中封装了存放fetch结果值的BlockingQueue[FetchedDataChunk] 实例。
再来看 run 方法,其使用的是 kafka.utils.ShutdownableThread#run 方法,上面咱们已经看过了,主要看该子类是如何从新 doWork方法的:
1 override def doWork() { 2 inLock(partitionMapLock) { // 加锁,执行,释放锁 3 if (partitionMap.isEmpty) // 若是没有须要执行的 fetch 操做,等待200ms后返回 4 partitionMapCond.await(200L, TimeUnit.MILLISECONDS) 5 partitionMap.foreach { // 将全部的 fetch 的信息添加到fetchRequestBuilder中 6 case((topicAndPartition, offset)) => 7 fetchRequestBuilder.addFetch(topicAndPartition.topic, topicAndPartition.partition, 8 offset, fetchSize) 9 } 10 } 11 // 构建批抓取的fetchRequest对象 12 val fetchRequest = fetchRequestBuilder.build() 13 // 处理 FetchRequest 14 if (!fetchRequest.requestInfo.isEmpty) 15 processFetchRequest(fetchRequest) 16 }
其中 kafka.server.AbstractFetcherThread#processFetchRequest 源码以下:
1 private def processFetchRequest(fetchRequest: FetchRequest) { 2 val partitionsWithError = new mutable.HashSet[TopicAndPartition] 3 var response: FetchResponse = null 4 try { 5 trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest)) 6 // 发送请求,并获取返回值。 7 // simpleConsumer 就是SimpleConsumer 实例,已做说明,再也不赘述。 8 response = simpleConsumer.fetch(fetchRequest) 9 } catch { 10 case t: Throwable => 11 if (isRunning.get) { 12 warn("Error in fetch %s. Possible cause: %s".format(fetchRequest, t.toString)) 13 partitionMapLock synchronized { 14 partitionsWithError ++= partitionMap.keys 15 } 16 } 17 } 18 fetcherStats.requestRate.mark() 19 20 if (response != null) { 21 // process fetched data 22 inLock(partitionMapLock) { // 获取锁,执行处理response 操做,释放锁 23 response.data.foreach { 24 case(topicAndPartition, partitionData) => 25 val (topic, partitionId) = topicAndPartition.asTuple 26 val currentOffset = partitionMap.get(topicAndPartition) 27 // we append to the log if the current offset is defined and it is the same as the offset requested during fetch 28 if (currentOffset.isDefined && fetchRequest.requestInfo(topicAndPartition).offset == currentOffset.get) { 29 partitionData.error match { // 根据返回码来肯定具体执行哪部分处理逻辑 30 case ErrorMapping.NoError => // 成功返回,没有错误 31 try { 32 val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet] 33 val validBytes = messages.validBytes 34 val newOffset = messages.shallowIterator.toSeq.lastOption match { 35 case Some(m: MessageAndOffset) => m.nextOffset 36 case None => currentOffset.get 37 } 38 partitionMap.put(topicAndPartition, newOffset) 39 fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset 40 fetcherStats.byteRate.mark(validBytes) 41 // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread 42 processPartitionData(topicAndPartition, currentOffset.get, partitionData) 43 } catch { 44 case ime: InvalidMessageException => // 消息获取不完整 45 // we log the error and continue. This ensures two things 46 // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag 47 // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and 48 // should get fixed in the subsequent fetches 49 logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentOffset.get + " error " + ime.getMessage) 50 case e: Throwable => 51 throw new KafkaException("error processing data for partition [%s,%d] offset %d" 52 .format(topic, partitionId, currentOffset.get), e) 53 } 54 case ErrorMapping.OffsetOutOfRangeCode => // offset out of range error 55 try { 56 val newOffset = handleOffsetOutOfRange(topicAndPartition) 57 partitionMap.put(topicAndPartition, newOffset) 58 error("Current offset %d for partition [%s,%d] out of range; reset offset to %d" 59 .format(currentOffset.get, topic, partitionId, newOffset)) 60 } catch { 61 case e: Throwable => 62 error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e) 63 partitionsWithError += topicAndPartition 64 } 65 case _ => 66 if (isRunning.get) { 67 error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id, 68 ErrorMapping.exceptionFor(partitionData.error).getClass)) 69 partitionsWithError += topicAndPartition 70 } 71 } 72 } 73 } 74 } 75 } 76 77 if(partitionsWithError.size > 0) { 78 debug("handling partitions with error for %s".format(partitionsWithError)) 79 handlePartitionsWithErrors(partitionsWithError) 80 } 81 }
其中processPartitionData 源码以下,它负责处理具体的返回消息:
1 // process fetched data 2 def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: FetchResponsePartitionData) { 3 // partitionMap 是一个成员变量,在构造函数中做为入参 4 val pti = partitionMap(topicAndPartition) 5 if (pti.getFetchOffset != fetchOffset) 6 throw new RuntimeException("Offset doesn't match for partition [%s,%d] pti offset: %d fetch offset: %d" 7 .format(topicAndPartition.topic, topicAndPartition.partition, pti.getFetchOffset, fetchOffset)) 8 // 数据入队 9 pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet]) 10 }
能够看到,终于在这里,把从leader中fetch的消息放入了BlockingQueue[FetchedDataChunk] 缓冲堵塞队列中。
KafkaStream 是依赖于 LinkedBlockingQueue 的同理 KafkaStream 也会返回一个迭代器 kafka.consumer.ConsumerIterator,用于迭代访问 KafkaStream 中的数据。
kafka.consumer.ConsumerIterator 的主要源码以下:
1 // 判断是否有下一个元素 2 def hasNext(): Boolean = { 3 if(state == FAILED) 4 throw new IllegalStateException("Iterator is in failed state") 5 state match { 6 case DONE => false 7 case READY => true 8 case _ => maybeComputeNext() 9 } 10 } 11 // 获取下一个元素,父类实现 12 def next(): T = { 13 if(!hasNext()) 14 throw new NoSuchElementException() 15 state = NOT_READY 16 if(nextItem == null) 17 throw new IllegalStateException("Expected item but none found.") 18 nextItem 19 } 20 // 获取下一个元素,使用子类ConsumerIterator实现 21 override def next(): MessageAndMetadata[K, V] = { 22 val item = super.next() // 调用父类实现 23 if(consumedOffset < 0) 24 throw new KafkaException("Offset returned by the message set is invalid %d".format(consumedOffset)) 25 currentTopicInfo.resetConsumeOffset(consumedOffset) 26 val topic = currentTopicInfo.topic 27 trace("Setting %s consumed offset to %d".format(topic, consumedOffset)) 28 consumerTopicStats.getConsumerTopicStats(topic).messageRate.mark() 29 consumerTopicStats.getConsumerAllTopicStats().messageRate.mark() 30 item 31 } 32 // 或许有,尝试计算一下下一个 33 def maybeComputeNext(): Boolean = { 34 state = FAILED 35 nextItem = makeNext() 36 if(state == DONE) { 37 false 38 } else { 39 state = READY 40 true 41 } 42 } 43 // 建立下一个元素,这个在子类ConsumerIterator中有实现 44 protected def makeNext(): MessageAndMetadata[K, V] = { 45 // 首先channel 是 LinkedBlockingQueue实例, 是 KafkaStream 中的 queue 成员变量,queue 成员变量 46 var currentDataChunk: FetchedDataChunk = null 47 // if we don't have an iterator, get one 48 var localCurrent = current.get() 49 // 若是没有迭代器或者是没有下一个元素了,须要从channel中取一个 50 if(localCurrent == null || !localCurrent.hasNext) { 51 // 删除并返回队列的头节点。 52 if (consumerTimeoutMs < 0) 53 currentDataChunk = channel.take // 阻塞方法,一直等待,直到有可用元素 54 else { 55 currentDataChunk = channel.poll(consumerTimeoutMs, TimeUnit.MILLISECONDS) // 阻塞方法,等待指定时间,超时也会返回 56 if (currentDataChunk == null) { // 若是没有数据,重置状态为NOT_READY 57 // reset state to make the iterator re-iterable 58 resetState() 59 throw new ConsumerTimeoutException 60 } 61 } 62 // 关闭命令 63 if(currentDataChunk eq ZookeeperConsumerConnector.shutdownCommand) { 64 debug("Received the shutdown command") 65 return allDone // 该函数将状态设为DONE, 返回null 66 } else { 67 currentTopicInfo = currentDataChunk.topicInfo 68 val cdcFetchOffset = currentDataChunk.fetchOffset 69 val ctiConsumeOffset = currentTopicInfo.getConsumeOffset 70 if (ctiConsumeOffset < cdcFetchOffset) { 71 error("consumed offset: %d doesn't match fetch offset: %d for %s;\n Consumer may lose data" 72 .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) 73 currentTopicInfo.resetConsumeOffset(cdcFetchOffset) 74 } 75 localCurrent = currentDataChunk.messages.iterator 76 77 current.set(localCurrent) 78 } 79 // if we just updated the current chunk and it is empty that means the fetch size is too small! 80 if(currentDataChunk.messages.validBytes == 0) 81 throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " + 82 "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow." 83 .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset)) 84 } 85 var item = localCurrent.next() 86 // reject the messages that have already been consumed 87 while (item.offset < currentTopicInfo.getConsumeOffset && localCurrent.hasNext) { 88 item = localCurrent.next() 89 } 90 consumedOffset = item.nextOffset 91 92 item.message.ensureValid() // validate checksum of message to ensure it is valid 93 // 返回处理封装好的 kafka 数据 94 new MessageAndMetadata(currentTopicInfo.topic, currentTopicInfo.partitionId, item.message, item.offset, keyDecoder, valueDecoder) 95 }
咱们再来看,org.apache.spark.streaming.kafka.ReliableKafkaReceiver#onStart 的第10 步相应的代码:
1 // 10. 将待处理的MessageHandler 放入 线程池中,等待执行 2 topicMessageStreams.values.foreach { streams => 3 streams.foreach { stream => 4 messageHandlerThreadPool.submit(new MessageHandler(stream)) 5 } 6 }
其中 MessageHandler 是一个 Runnable 对象,其 run 方法以下:
1 override def run(): Unit = { 2 while (!isStopped) { 3 try { 4 // 1. 获取ConsumerIterator 迭代器对象 5 val streamIterator = stream.iterator() 6 // 2. 遍历迭代器中获取每一条数据,而且保存message和相应的 metadata 信息 7 while (streamIterator.hasNext) { 8 storeMessageAndMetadata(streamIterator.next) 9 } 10 } catch { 11 case e: Exception => 12 reportError("Error handling message", e) 13 } 14 } 15 }
其中第二步中关键方法,org.apache.spark.streaming.kafka.ReliableKafkaReceiver#storeMessageAndMetadata 方法以下:
1 /** Store a Kafka message and the associated metadata as a tuple. */ 2 private def storeMessageAndMetadata( 3 msgAndMetadata: MessageAndMetadata[K, V]): Unit = { 4 val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition) 5 val data = (msgAndMetadata.key, msgAndMetadata.message) 6 val metadata = (topicAndPartition, msgAndMetadata.offset) 7 // 添加数据到 block 8 blockGenerator.addDataWithCallback(data, metadata) 9 }
addDataWithCallback 源码以下:
1 /** 2 * Push a single data item into the buffer. After buffering the data, the 3 * `BlockGeneratorListener.onAddData` callback will be called. 4 */ 5 def addDataWithCallback(data: Any, metadata: Any): Unit = { 6 if (state == Active) { 7 waitToPush() 8 synchronized { 9 if (state == Active) { 10 // 1. 将数据放入 buffer 中,以便处理线程从中获取数据 11 currentBuffer += data 12 // 2. 在启动 receiver线程中,能够知道listener 是指GeneratedBlockHandler 实例 13 listener.onAddData(data, metadata) 14 } else { 15 throw new SparkException( 16 "Cannot add data as BlockGenerator has not been started or has been stopped") 17 } 18 } 19 } else { 20 throw new SparkException( 21 "Cannot add data as BlockGenerator has not been started or has been stopped") 22 } 23 }
第二步比较简单,先看一下第二步:
org.apache.spark.streaming.kafka.ReliableKafkaReceiver.GeneratedBlockHandler#onAddData的源码以下:
1 def onAddData(data: Any, metadata: Any): Unit = { 2 // Update the offset of the data that was added to the generator 3 if (metadata != null) { 4 val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)] 5 updateOffset(topicAndPartition, offset) 6 } 7 } 8 // 这里的 updateOffset 调用的是//org.apache.spark.streaming.kafka.ReliableKafkaReceiver#updateOffset,源码以下: 9 /** Update stored offset */ 10 private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = { 11 topicPartitionOffsetMap.put(topicAndPartition, offset) 12 }
第一步的原理以下:
在 BlockGenerator中有一个定时器,定时(200ms)去执行检查currentBuffer是否为empty任务, 若不为空,则执行以下操做并把它放入等待生成block 的队列中,有两外一个线程来时刻监听这个队列,有数据,则执行pushBlock 操做。
第一个定时器线程以下:
1 private val blockIntervalTimer = 2 new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator") 3 4 // 其中,updateCurrentBuffer 方法以下 5 /** Change the buffer to which single records are added to. */ 6 private def updateCurrentBuffer(time: Long): Unit = { 7 try { 8 var newBlock: Block = null 9 synchronized { 10 if (currentBuffer.nonEmpty) { 11 val newBlockBuffer = currentBuffer 12 currentBuffer = new ArrayBuffer[Any] 13 val blockId = StreamBlockId(receiverId, time - blockIntervalMs) 14 listener.onGenerateBlock(blockId) 15 newBlock = new Block(blockId, newBlockBuffer) 16 } 17 } 18 19 if (newBlock != null) { 20 blocksForPushing.put(newBlock) // put is blocking when queue is full 21 } 22 } catch { 23 case ie: InterruptedException => 24 logInfo("Block updating timer thread was interrupted") 25 case e: Exception => 26 reportError("Error in block updating thread", e) 27 } 28 } 29 30 // listener.onGenerateBlock(blockId) 代码以下: 31 def onGenerateBlock(blockId: StreamBlockId): Unit = { 32 // Remember the offsets of topics/partitions when a block has been generated 33 rememberBlockOffsets(blockId) 34 } 35 // rememberBlockOffsets 代码以下: 36 /** 37 * Remember the current offsets for each topic and partition. This is called when a block is 38 * generated. 39 */ 40 private def rememberBlockOffsets(blockId: StreamBlockId): Unit = { 41 // Get a snapshot of current offset map and store with related block id. 42 val offsetSnapshot = topicPartitionOffsetMap.toMap 43 blockOffsetMap.put(blockId, offsetSnapshot) 44 topicPartitionOffsetMap.clear() 45 } 46 // 能够看出,主要是清除 topic-partition-> offset 映射关系 47 // 创建 block 和topic-partition-> offset的映射关系
其中,blocksForPushing是一个有界阻塞队列,另一个线程会一直轮询它。
1 private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize) 2 private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } 3 4 /** Keep pushing blocks to the BlockManager. */ 5 // 这个方法主要的做用就是一直不停地轮询blocksForPushing队列,并处理相应的push block 事件。 6 private def keepPushingBlocks() { 7 logInfo("Started block pushing thread") 8 9 def areBlocksBeingGenerated: Boolean = synchronized { 10 state != StoppedGeneratingBlocks 11 } 12 13 try { 14 // While blocks are being generated, keep polling for to-be-pushed blocks and push them. 15 while (areBlocksBeingGenerated) { // 线程没有被中止,则一直循环 16 // 超时poll操做获取并删除头节点,超过期间(10ms)则返回 17 Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match { 18 case Some(block) => pushBlock(block) // 若是有数据则进行处理。 19 case None => 20 } 21 } 22 23 // At this point, state is StoppedGeneratingBlock. So drain the queue of to-be-pushed blocks. 24 logInfo("Pushing out the last " + blocksForPushing.size() + " blocks") 25 while (!blocksForPushing.isEmpty) { // 若是队列中还有数据,继续进行处理 26 val block = blocksForPushing.take() // 这是一个堵塞方法,不过如今会立刻返回,由于队列里面有数据。 27 logDebug(s"Pushing block $block") 28 pushBlock(block) // 处理数据 29 logInfo("Blocks left to push " + blocksForPushing.size()) 30 } 31 logInfo("Stopped block pushing thread") 32 } catch { 33 case ie: InterruptedException => 34 logInfo("Block pushing thread was interrupted") 35 case e: Exception => 36 reportError("Error in block pushing thread", e) 37 } 38 }
其中的pushBlock源码以下:
1 private def pushBlock(block: Block) { 2 listener.onPushBlock(block.id, block.buffer) 3 logInfo("Pushed block " + block.id) 4 }
其调用的listener(org.apache.spark.streaming.kafka.ReliableKafkaReceiver.GeneratedBlockHandler)的 onPushBlock 源码以下:
1 def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { 2 // Store block and commit the blocks offset 3 storeBlockAndCommitOffset(blockId, arrayBuffer) 4 }
其中,storeBlockAndCommitOffset具体代码以下:
1 /** 2 * Store the ready-to-be-stored block and commit the related offsets to zookeeper. This method 3 * will try a fixed number of times to push the block. If the push fails, the receiver is stopped. 4 */ 5 private def storeBlockAndCommitOffset( 6 blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = { 7 var count = 0 8 var pushed = false 9 var exception: Exception = null 10 while (!pushed && count <= 3) { // 整个过程,总共容许3 次重试 11 try { 12 store(arrayBuffer.asInstanceOf[mutable.ArrayBuffer[(K, V)]]) 13 pushed = true 14 } catch { 15 case ex: Exception => 16 count += 1 17 exception = ex 18 } 19 } 20 if (pushed) { // 已经push block 21 // 更新 offset 22 Option(blockOffsetMap.get(blockId)).foreach(commitOffset) 23 // 若是已经push 到 BlockManager 中,则不会再保留 block和topic-partition-> offset的映射关系 24 blockOffsetMap.remove(blockId) 25 } else { 26 stop("Error while storing block into Spark", exception) 27 } 28 } 29 // 其中,commitOffset源码以下: 30 /** 31 * Commit the offset of Kafka's topic/partition, the commit mechanism follow Kafka 0.8.x's 32 * metadata schema in Zookeeper. 33 */ 34 private def commitOffset(offsetMap: Map[TopicAndPartition, Long]): Unit = { 35 if (zkClient == null) { 36 val thrown = new IllegalStateException("Zookeeper client is unexpectedly null") 37 stop("Zookeeper client is not initialized before commit offsets to ZK", thrown) 38 return 39 } 40 41 for ((topicAndPart, offset) <- offsetMap) { 42 try { 43 // 获取在 zk 中 comsumer 的partition的目录 44 val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPart.topic) 45 val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPart.partition}" 46 // 更新 consumer 的已消费topic-partition 的offset 操做 47 ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString) 48 } catch { 49 case e: Exception => 50 logWarning(s"Exception during commit offset $offset for topic" + 51 s"${topicAndPart.topic}, partition ${topicAndPart.partition}", e) 52 } 53 54 logInfo(s"Committed offset $offset for topic ${topicAndPart.topic}, " + 55 s"partition ${topicAndPart.partition}") 56 } 57 }
关键方法store 以下:
1 /** Store an ArrayBuffer of received data as a data block into Spark's memory. */ 2 def store(dataBuffer: ArrayBuffer[T]) { 3 supervisor.pushArrayBuffer(dataBuffer, None, None) 4 }
其调用了supervisor(org.apache.spark.streaming.receiver.ReceiverSupervisorImpl实例)的pushArrayBuffer方法,内部操做以下:
1 /** Store an ArrayBuffer of received data as a data block into Spark's memory. */ 2 def pushArrayBuffer( 3 arrayBuffer: ArrayBuffer[_], 4 metadataOption: Option[Any], 5 blockIdOption: Option[StreamBlockId] 6 ) { 7 pushAndReportBlock(ArrayBufferBlock(arrayBuffer), metadataOption, blockIdOption) 8 }
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#pushAndReportBlock 源码以下:
1 /** Store block and report it to driver */ 2 def pushAndReportBlock( 3 receivedBlock: ReceivedBlock, 4 metadataOption: Option[Any], 5 blockIdOption: Option[StreamBlockId] 6 ) { 7 // 1.准备blockId,time等信息 8 val blockId = blockIdOption.getOrElse(nextBlockId) 9 val time = System.currentTimeMillis 10 // 2. 执行存储 block 操做 11 val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock) 12 logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms") 13 // 3. 获取保存的message 的记录数 14 val numRecords = blockStoreResult.numRecords 15 // 4. 通知trackerEndpoint已经添加block,执行更新driver 的WAL操做 16 val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult) 17 trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo)) 18 logDebug(s"Reported block $blockId") 19 }
其中,receivedBlockHandler 的赋值语句以下:
1 private val receivedBlockHandler: ReceivedBlockHandler = { 2 if (WriteAheadLogUtils.enableReceiverLog(env.conf)) { 3 if (checkpointDirOption.isEmpty) { 4 throw new SparkException( 5 "Cannot enable receiver write-ahead log without checkpoint directory set. " + 6 "Please use streamingContext.checkpoint() to set the checkpoint directory. " + 7 "See documentation for more details.") 8 } 9 // enable WAL而且checkpoint dir 不为空,即,在这里,返回WriteAheadLogBasedBlockHandler 对象,这个对象持有了 blockmanager,streamid,storagelevel,conf,checkpointdir 等信息 10 new WriteAheadLogBasedBlockHandler(env.blockManager, receiver.streamId, 11 receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get) 12 } else { 13 new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel) 14 } 15 }
ReceivedBlockHandler 的 storeBlock方法源码以下:
1 /** 2 * This implementation stores the block into the block manager as well as a write ahead log. 3 * It does this in parallel, using Scala Futures, and returns only after the block has 4 * been stored in both places. 5 */ 6 // 并行地将block 存入 blockmanager 和 write ahead log,使用scala 的Future 机制实现的,当两个都写完毕以后,返回。 7 def storeBlock(blockId: StreamBlockId, block: ReceivedBlock): ReceivedBlockStoreResult = { 8 9 var numRecords = None: Option[Long] 10 // Serialize the block so that it can be inserted into both 11 // 1. 将ReceivedBlock序列化(未使用压缩机制)成字节数组 12 val serializedBlock = block match { // serializedBlock 就是序列化后的结果 13 case ArrayBufferBlock(arrayBuffer) => // go this branch 14 numRecords = Some(arrayBuffer.size.toLong) 15 blockManager.dataSerialize(blockId, arrayBuffer.iterator) 16 case IteratorBlock(iterator) => 17 val countIterator = new CountingIterator(iterator) 18 val serializedBlock = blockManager.dataSerialize(blockId, countIterator) 19 numRecords = countIterator.count 20 serializedBlock 21 case ByteBufferBlock(byteBuffer) => 22 byteBuffer 23 case _ => 24 throw new Exception(s"Could not push $blockId to block manager, unexpected block type") 25 } 26 27 // 2. Store the block in block manager 28 val storeInBlockManagerFuture = Future { 29 val putResult = 30 blockManager.putBytes(blockId, serializedBlock, effectiveStorageLevel, tellMaster = true) 31 if (!putResult.map { _._1 }.contains(blockId)) { 32 throw new SparkException( 33 s"Could not store $blockId to block manager with storage level $storageLevel") 34 } 35 } 36 37 // 3. Store the block in write ahead log 38 val storeInWriteAheadLogFuture = Future { 39 writeAheadLog.write(serializedBlock, clock.getTimeMillis()) 40 } 41 42 // 4. Combine the futures, wait for both to complete, and return the write ahead log record handle 43 val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2) 44 // 等待future任务结果返回。默认时间是 30s, 使用spark.streaming.receiver.blockStoreTimeout 参数来变动默认值 45 val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout) 46 // 返回cache以后的block 相关信息 47 WriteAheadLogBasedStoreResult(blockId, numRecords, walRecordHandle) 48 }
注意WriteAheadLogBasedStoreResult 这个 WriteAheadLogBasedStoreResult 实例,后面 RDD 在处理的时候会使用到。
org.apache.spark.streaming.receiver.ReceiverSupervisorImpl#pushAndReportBlock 通知driver addBlock 的源码以下:
1 // 4. 通知trackerEndpoint已经添加block,执行更新driver 的WAL操做 2 val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult) 3 trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo)) 4 logDebug(s"Reported block $blockId")
跳过中间的RPC操做,直接到 driver 端org.apache.spark.streaming.scheduler.ReceiverTracker.ReceiverTrackerEndpoint#receiveAndReply 中:
1 case AddBlock(receivedBlockInfo) => 2 if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) { 3 walBatchingThreadPool.execute(new Runnable { 4 override def run(): Unit = Utils.tryLogNonFatalError { 5 if (active) { 6 context.reply(addBlock(receivedBlockInfo)) 7 } else { 8 throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.") 9 } 10 } 11 }) 12 } else { 13 context.reply(addBlock(receivedBlockInfo)) 14 }
其中 addBlock方法源码以下:
1 /** Add new blocks for the given stream */ 2 private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 3 receivedBlockTracker.addBlock(receivedBlockInfo) 4 }
其中,org.apache.spark.streaming.scheduler.ReceivedBlockTracker#addBlock 源码以下:
1 /** Add received block. This event will get written to the write ahead log (if enabled). */ 2 def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = { 3 try { 4 val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo)) 5 if (writeResult) { 6 synchronized { 7 getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo 8 } 9 logDebug(s"Stream ${receivedBlockInfo.streamId} received " + 10 s"block ${receivedBlockInfo.blockStoreResult.blockId}") 11 } else { 12 logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " + 13 s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.") 14 } 15 writeResult 16 } catch { 17 case NonFatal(e) => 18 logError(s"Error adding block $receivedBlockInfo", e) 19 false 20 } 21 } 22 /** Write an update to the tracker to the write ahead log */ 23 private def writeToLog(record: ReceivedBlockTrackerLogEvent): Boolean = { 24 if (isWriteAheadLogEnabled) { 25 logTrace(s"Writing record: $record") 26 try { 27 writeAheadLogOption.get.write(ByteBuffer.wrap(Utils.serialize(record)), 28 clock.getTimeMillis()) 29 true 30 } catch { 31 case NonFatal(e) => 32 logWarning(s"Exception thrown while writing record: $record to the WriteAheadLog.", e) 33 false 34 } 35 } else { 36 true 37 } 38 } 39 /** Get the queue of received blocks belonging to a particular stream */ 40 private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = { 41 streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue) 42 }
上述代码,主要是将BlockAdditionEvent写WAL和更新队列(其实就是mutable.HashMap[Int, ReceivedBlockQueue]),这个队列中存放的是streamId ->UnallocatedBlock 的映射关系
createStream 源码以下:
1 /** 2 * Create an input stream that pulls messages from Kafka Brokers. 3 * @param ssc StreamingContext object 4 * @param kafkaParams Map of kafka configuration parameters, 5 * see http://kafka.apache.org/08/configuration.html 6 * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed 7 * in its own thread. 8 * @param storageLevel Storage level to use for storing the received objects 9 * @tparam K type of Kafka message key 10 * @tparam V type of Kafka message value 11 * @tparam U type of Kafka message key decoder 12 * @tparam T type of Kafka message value decoder 13 * @return DStream of (Kafka message key, Kafka message value) 14 */ 15 def createStream[K: ClassTag, V: ClassTag, U <: Decoder[_]: ClassTag, T <: Decoder[_]: ClassTag]( 16 ssc: StreamingContext, 17 kafkaParams: Map[String, String], 18 topics: Map[String, Int], 19 storageLevel: StorageLevel 20 ): ReceiverInputDStream[(K, V)] = { 21 // 能够经过设置spark.streaming.receiver.writeAheadLog.enable参数为 true来开启WAL 22 val walEnabled = WriteAheadLogUtils.enableReceiverLog(ssc.conf) 23 new KafkaInputDStream[K, V, U, T](ssc, kafkaParams, topics, walEnabled, storageLevel) 24 }
建立的是KafkaInputDStream对象:
1 /** 2 * Input stream that pulls messages from a Kafka Broker. 3 * 4 * @param kafkaParams Map of kafka configuration parameters. 5 * See: http://kafka.apache.org/configuration.html 6 * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed 7 * in its own thread. 8 * @param storageLevel RDD storage level. 9 */ 10 private[streaming] 11 class KafkaInputDStream[ 12 K: ClassTag, 13 V: ClassTag, 14 U <: Decoder[_]: ClassTag, 15 T <: Decoder[_]: ClassTag]( 16 ssc_ : StreamingContext, 17 kafkaParams: Map[String, String], 18 topics: Map[String, Int], 19 useReliableReceiver: Boolean, 20 storageLevel: StorageLevel 21 ) extends ReceiverInputDStream[(K, V)](ssc_) with Logging { 22 23 def getReceiver(): Receiver[(K, V)] = { 24 if (!useReliableReceiver) { // 未启用 WAL,会使用 KafkaReceiver 对象 25 new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) 26 } else { // 若是启用了WAL, 使用ReliableKafkaReceiver 27 new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) 28 } 29 } 30 }
org.apache.spark.streaming.kafka.KafkaInputDStream 继承父类的 compute方法:
1 /** 2 * Generates RDDs with blocks received by the receiver of this stream. */ 3 override def compute(validTime: Time): Option[RDD[T]] = { 4 val blockRDD = { 5 6 if (validTime < graph.startTime) { 7 // If this is called for any time before the start time of the context, 8 // then this returns an empty RDD. This may happen when recovering from a 9 // driver failure without any write ahead log to recover pre-failure data. 10 new BlockRDD[T](ssc.sc, Array.empty) 11 } else { 12 // Otherwise, ask the tracker for all the blocks that have been allocated to this stream 13 // for this batch 14 val receiverTracker = ssc.scheduler.receiverTracker 15 val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty) 16 17 // Register the input blocks information into InputInfoTracker 18 val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum) 19 ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) 20 21 // Create the BlockRDD 22 createBlockRDD(validTime, blockInfos) 23 } 24 } 25 Some(blockRDD) 26 }
getBlocksOfBatch 以下:
1 /** Get the blocks for the given batch and all input streams. */ 2 def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = { 3 receivedBlockTracker.getBlocksOfBatch(batchTime) 4 } 5 调用: 6 /** Get the blocks allocated to the given batch. */ 7 def getBlocksOfBatch(batchTime: Time): Map[Int, Seq[ReceivedBlockInfo]] = synchronized { 8 timeToAllocatedBlocks.get(batchTime).map { _.streamIdToAllocatedBlocks }.getOrElse(Map.empty) 9 }
在 org.apache.spark.streaming.scheduler.JobGenerator 中声明了一个定时器:
1 // timer 会按照批次间隔 生成 GenerateJobs 任务,并放入eventLoop 堵塞队列中 2 private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds, 3 longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
EventLoop 实例化代码以下:
1 eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") { 2 override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event) 3 4 override protected def onError(e: Throwable): Unit = { 5 jobScheduler.reportError("Error in job generator", e) 6 } 7 } 8 eventLoop.start()
EventLoop里定义了一个LinkedBlockingDeque双端堵塞队列和一个执行daemon线程,daemon线程会不停从 双端堵塞队列中堵塞式取数据,一旦取到数据,会调 onReceive 方法,即 processEvent 方法:
1 /** Processes all events */ 2 private def processEvent(event: JobGeneratorEvent) { 3 logDebug("Got event " + event) 4 event match { 5 case GenerateJobs(time) => generateJobs(time) 6 case ClearMetadata(time) => clearMetadata(time) 7 case DoCheckpoint(time, clearCheckpointDataLater) => 8 doCheckpoint(time, clearCheckpointDataLater) 9 case ClearCheckpointData(time) => clearCheckpointData(time) 10 } 11 }
因为是GenerateJobs 事件, 会继续调用generateJobs 方法:
1 /** Generate jobs and perform checkpoint for the given `time`. */ 2 private def generateJobs(time: Time) { 3 // Set the SparkEnv in this thread, so that job generation code can access the environment 4 // Example: BlockRDDs are created in this thread, and it needs to access BlockManager 5 // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed. 6 SparkEnv.set(ssc.env) 7 Try { 8 // 1. 将 WAL block 信息 分配给batch(这些数据块信息是worker 节点cache 到WAL 以后发送给driver 端的) 9 jobScheduler.receiverTracker.allocateBlocksToBatch(time) 10 // 2. 使用分配的block数据块来生成任务 11 graph.generateJobs(time) // generate jobs using allocated block 12 } match { 13 case Success(jobs) => 14 val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) 15 jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) 16 case Failure(e) => 17 jobScheduler.reportError("Error generating jobs for time " + time, e) 18 } 19 // 发布DoCheckpoint 事件,保存checkpoint操做,主要是将新的checkpoint 数据写入到 hdfs, 删除旧的 checkpoint 数据 20 eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false)) 21 }
第一步中调用的
org.apache.spark.streaming.scheduler.ReceiverTracker#allocateBlocksToBatch方法以下:
1 /** Allocate all unallocated blocks to the given batch. */ 2 def allocateBlocksToBatch(batchTime: Time): Unit = { 3 if (receiverInputStreams.nonEmpty) { 4 receivedBlockTracker.allocateBlocksToBatch(batchTime) 5 } 6 }
其中,org.apache.spark.streaming.scheduler.ReceivedBlockTracker#allocateBlocksToBatch 方法以下:
1 def allocateBlocksToBatch(batchTime: Time): Unit = synchronized { 2 if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) { 3 // 遍历输入流,根据流的 streamId 获取未被分配的block队列,并返回[streamId, seq[receivedBlockInfo]],由此可知,到此为止,数据其实已经从receiver中读出来了。 4 // 获取 streamid和 WAL的blocks 的映射关系 5 val streamIdToBlocks = streamIds.map { streamId => 6 (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true)) 7 }.toMap 8 val allocatedBlocks = AllocatedBlocks(streamIdToBlocks) 9 if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) { 10 timeToAllocatedBlocks.put(batchTime, allocatedBlocks) 11 lastAllocatedBatchTime = batchTime 12 } else { 13 logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery") 14 } 15 } else { 16 // This situation occurs when: 17 // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent, 18 // possibly processed batch job or half-processed batch job need to be processed again, 19 // so the batchTime will be equal to lastAllocatedBatchTime. 20 // 2. Slow checkpointing makes recovered batch time older than WAL recovered 21 // lastAllocatedBatchTime. 22 // This situation will only occurs in recovery time. 23 logInfo(s"Possibly processed batch $batchTime need to be processed again in WAL recovery") 24 } 25 }
其中,getReceivedBlockQueue的源码以下:
1 /** Get the queue of received blocks belonging to a particular stream */ 2 private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = { 3 streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue) 4 }
能够看到,worker node 发送过来的block 数据被取出来了。
org.apache.spark.streaming.dstream.ReceiverInputDStream#createBlockRDD 源码以下:
1 private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = { 2 3 if (blockInfos.nonEmpty) { 4 val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray 5 // 全部的block已经有了WriteAheadLogRecordHandle, 建立一个WALBackedBlockRDD便可, 不然建立BlockRDD。 6 // 其中,WriteAheadLogRecordHandle 是一个跟WAL 相关联的EntryInfo,实现类FileBasedWriteAheadLogSegment就包含了WAL segment 的path, offset 以及 length 信息。RDD 在真正须要数据时,根据这些handle信息从 WAL 中读取数据。 7 // Are WAL record handles present with all the blocks 8 val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty } 9 10 if (areWALRecordHandlesPresent) { 11 // If all the blocks have WAL record handle, then create a WALBackedBlockRDD 12 val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray 13 val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray 14 new WriteAheadLogBackedBlockRDD[T]( 15 ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid) 16 } else { 17 // Else, create a BlockRDD. However, if there are some blocks with WAL info but not 18 // others then that is unexpected and log a warning accordingly. 19 if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) { 20 if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) { 21 logError("Some blocks do not have Write Ahead Log information; " + 22 "this is unexpected and data may not be recoverable after driver failures") 23 } else { 24 logWarning("Some blocks have Write Ahead Log information; this is unexpected") 25 } 26 } 27 val validBlockIds = blockIds.filter { id => 28 ssc.sparkContext.env.blockManager.master.contains(id) 29 } 30 if (validBlockIds.size != blockIds.size) { 31 logWarning("Some blocks could not be recovered as they were not found in memory. " + 32 "To prevent such data loss, enabled Write Ahead Log (see programming guide " + 33 "for more details.") 34 } 35 new BlockRDD[T](ssc.sc, validBlockIds) 36 } 37 } else { 38 // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD 39 // according to the configuration 40 if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) { 41 new WriteAheadLogBackedBlockRDD[T]( 42 ssc.sparkContext, Array.empty, Array.empty, Array.empty) 43 } else { 44 new BlockRDD[T](ssc.sc, Array.empty) 45 } 46 } 47 }
org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD#compute 的源码以下:
1 /** 2 * Gets the partition data by getting the corresponding block from the block manager. 3 * If the block does not exist, then the data is read from the corresponding record 4 * in write ahead log files. 5 */ 6 override def compute(split: Partition, context: TaskContext): Iterator[T] = { 7 assertValid() 8 val hadoopConf = broadcastedHadoopConf.value 9 val blockManager = SparkEnv.get.blockManager 10 val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition] 11 val blockId = partition.blockId 12 13 def getBlockFromBlockManager(): Option[Iterator[T]] = { 14 blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]]) 15 } 16 17 def getBlockFromWriteAheadLog(): Iterator[T] = { 18 var dataRead: ByteBuffer = null 19 var writeAheadLog: WriteAheadLog = null 20 try { 21 // The WriteAheadLogUtils.createLog*** method needs a directory to create a 22 // WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for 23 // writing log data. However, the directory is not needed if data needs to be read, hence 24 // a dummy path is provided to satisfy the method parameter requirements. 25 // FileBasedWriteAheadLog will not create any file or directory at that path. 26 // FileBasedWriteAheadLog will not create any file or directory at that path. Also, 27 // this dummy directory should not already exist otherwise the WAL will try to recover 28 // past events from the directory and throw errors. 29 val nonExistentDirectory = new File( 30 System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath 31 writeAheadLog = WriteAheadLogUtils.createLogForReceiver( 32 SparkEnv.get.conf, nonExistentDirectory, hadoopConf) 33 dataRead = writeAheadLog.read(partition.walRecordHandle) 34 } catch { 35 case NonFatal(e) => 36 throw new SparkException( 37 s"Could not read data from write ahead log record ${partition.walRecordHandle}", e) 38 } finally { 39 if (writeAheadLog != null) { 40 writeAheadLog.close() 41 writeAheadLog = null 42 } 43 } 44 if (dataRead == null) { 45 throw new SparkException( 46 s"Could not read data from write ahead log record ${partition.walRecordHandle}, " + 47 s"read returned null") 48 } 49 logInfo(s"Read partition data of $this from write ahead log, record handle " + 50 partition.walRecordHandle) 51 if (storeInBlockManager) { 52 blockManager.putBytes(blockId, dataRead, storageLevel) 53 logDebug(s"Stored partition data of $this into block manager with level $storageLevel") 54 dataRead.rewind() 55 } 56 blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]] 57 } 58 // 若是partition.isBlockIdValid 为true,则说明该 block 数据存在executors 中 59 if (partition.isBlockIdValid) { 60 // 先根据 BlockManager从 executor中读取数据, 若是没有,再从WAL 中读取数据 61 // BlockManager 从内存仍是从磁盘上获取的数据 ? 62 blockManager 从 local 或 remote 获取 block,其中 local既能够从 memory 中获取也能够从 磁盘中读取, 其中remote获取数据是同步的,即在fetch block 过程当中会一直blocking。 63 getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() } 64 } else { 65 getBlockFromWriteAheadLog() 66 } 67 }
至此,从启动 receiver,到receiver 接收数据并保存到WAL block,driver 接收WAL 的block 信息,直到spark streaming 经过WAL RDD 来获取数据等等都一一作了说明。
原文出处:https://www.cnblogs.com/johnny666888/p/11100334.html