设置并行度参数spark.streaming.concurrentJobs >1 时候,使用spark streaming消费kafkagit
There may be two or more tasks in one executor will use the same kafka consumer at the same time, then it will throw an exception: "KafkaConsumer is not safe for multi-threaded access"
https://issues.apache.org/jira/browse/SPARK-22606?jql=text ~ "spark.streaming.concurrentJobs"github
PR地址:https://github.com/apache/spark/pull/19819web
spark streaming消费kafka时候,默认开启了对kafkaconsumer进行缓存,经过存放到HashMap中实现,所以就须要有相应的key,才能找到具体到kafkaconsumer。apache
//原生的代码中是没有threadId变量的,经过加入线程id ,使得不一样的线程不能同时使用同一个kafkaconsumer private case class CacheKey(groupId: String, topic: String, partition: Int, threadId: Long) private var cache: ju.LinkedHashMap[CacheKey, CachedKafkaConsumer[_, _]] = null CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, threadId, kafkaParams)
这个办法其实就是为缓存在map中的CachedKafkaConsumer对应的key增长了一个参数是线程id,使得不让多个线程使用同一个consumer,可是这种状况每个task都须要去建立一个consumer,是消耗资源的。
PR中这样一句评论:
It will create a new consumer for each thread. This could be quite resource consuming when several topics shared with thread pools.
缓存
对spark-streaming-kafka中的CacheKafkaConsumer进行了重构,首先介绍几个类ide
//接口 KafkaDataConsumer //KafkaDataConsumer的实现类 private case class CachedKafkaDataConsumer[K, V](internalConsumer: InternalKafkaConsumer[K, V]) extends KafkaDataConsumer[K, V] { assert(internalConsumer.inUse) override def release(): Unit = KafkaDataConsumer.release(internalConsumer) } private case class NonCachedKafkaDataConsumer[K, V](internalConsumer: InternalKafkaConsumer[K, V]) extends KafkaDataConsumer[K, V] { override def release(): Unit = internalConsumer.close() } //那么InternalKafkaConsumer是什么?其实对KafkaConsumer进行了封装而已,持有KafkaConsumer对象 private[kafka010] class InternalKafkaConsumer[K, V]( val topicPartition: TopicPartition, val kafkaParams: ju.Map[String, Object]) private[kafka010] case class CacheKey(groupId: String, topicPartition: TopicPartition) private[kafka010] var cache: ju.Map[CacheKey, InternalKafkaConsumer[_, _]] = null
那么为了防止一个executor中多个task同时使用同一个KafkaConsumer,如何解决呢?经过看如何获取的consumer便可看到解决方案!svg
def acquire[K, V]( topicPartition: TopicPartition, kafkaParams: ju.Map[String, Object], context: TaskContext, useCache: Boolean): KafkaDataConsumer[K, V] = synchronized { val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] //根据groupId以及topicPartition建立相应的key val key = new CacheKey(groupId, topicPartition) //根据key得到缓存的InternalKafkaConsumer对象,其实能够理解为KafkaConsumer对象,就是多了一层封装 val existingInternalConsumer = cache.get(key) lazy val newInternalConsumer = new InternalKafkaConsumer[K, V](topicPartition, kafkaParams) //若是TaskContext不为null,同时task尝试次数大于等于1 if (context != null && context.attemptNumber >= 1) { logDebug(s"Reattempt detected, invalidating cached consumer $existingInternalConsumer") //若是缓存中存在该key的InternalKafkaConsumer对象 if (existingInternalConsumer != null) { // 若是缓存中存在而且是使用状态,设置markedForClose=true,意思是说下一次release时候会将其关闭 //若是缓存了而且非使用状态,那么直接关闭,并从缓存移除 if (existingInternalConsumer.inUse) { existingInternalConsumer.markedForClose = true } else { existingInternalConsumer.close() cache.remove(key) } } logDebug("Reattempt detected, new non-cached consumer will be allocated " + s"$newInternalConsumer") //这个最外层if分支建立新的consumer , 最后返回 NonCachedKafkaDataConsumer NonCachedKafkaDataConsumer(newInternalConsumer) } else if (!useCache) { //若是task重试次数小于1 或者 taskcontext不存在,而且没有使用缓存,直接建立NonCachedKafkaDataConsumer对象 logDebug("Cache usage turned off, new non-cached consumer will be allocated " + s"$newInternalConsumer") NonCachedKafkaDataConsumer(newInternalConsumer) } else if (existingInternalConsumer == null) { //使用缓存了,可是缓存中不存在,直接建立CachedKafkaDataConsumer logDebug("No cached consumer, new cached consumer will be allocated " + s"$newInternalConsumer") cache.put(key, newInternalConsumer) CachedKafkaDataConsumer(newInternalConsumer) } else if (existingInternalConsumer.inUse) { // 缓存中存在而且当前是在使用,那么建立一个新的InternalConsmer而后封装到NonCachedKafkaDataConsumer中返回 logDebug("Used cached consumer found, new non-cached consumer will be allocated " + s"$newInternalConsumer") NonCachedKafkaDataConsumer(newInternalConsumer) } else { //缓存中存在而且没有被使用,直接设置为使用状态,而后封装到CachedKafkaDataConsumer中返回 logDebug(s"Not used cached consumer found, re-using it $existingInternalConsumer") existingInternalConsumer.inUse = true // Any given TopicPartition should have a consistent key and value type CachedKafkaDataConsumer(existingInternalConsumer.asInstanceOf[InternalKafkaConsumer[K, V]]) } }
将InternalConsumer的markedForClose字段设置为true,意味着这个对象的kafkaconsumer对象要关闭ui
//KafkaRDD中增长了一个task完成监听器,若是任务完成调用closeIfNeeded方法 context.addTaskCompletionListener[Unit](_ => closeIfNeeded()) def closeIfNeeded(): Unit = { if (consumer != null) { consumer.release() } } //上面的consumer是KafkaDataConsumer的子类的对象,其两个子类以下: // 1: internalConsumer会缓存 private case class CachedKafkaDataConsumer[K, V](internalConsumer: InternalKafkaConsumer[K, V]) extends KafkaDataConsumer[K, V] { assert(internalConsumer.inUse) //直接调用父类的release方法 override def release(): Unit = KafkaDataConsumer.release(internalConsumer) } //看其父类的release方法 private def release(internalConsumer: InternalKafkaConsumer[_, _]): Unit = synchronized { //获取internalConsumer的groupid topicpartition而后组成key,根据key去缓存查找相应的internalConsumer对象 val key = new CacheKey(internalConsumer.groupId, internalConsumer.topicPartition) val cachedInternalConsumer = cache.get(key) //若是要释放的internalConsumer是缓存中存放的 if (internalConsumer.eq(cachedInternalConsumer)) { // 标记为ture那么调用其close方法,而后从缓存移除 if (internalConsumer.markedForClose) { internalConsumer.close() cache.remove(key) } else { //若是没有标记为true,意味着继续在缓存,不会移除,只是将其使用状态改成false internalConsumer.inUse = false } } else { // 这个对象没有被缓存过,或者 不等于缓存中的,直接关闭 internalConsumer.close() logInfo(s"Released a supposedly cached consumer that was not found in the cache " + s"$internalConsumer") } } } // 2 : internalConsumer不会缓存 private case class NonCachedKafkaDataConsumer[K, V](internalConsumer: InternalKafkaConsumer[K, V]) extends KafkaDataConsumer[K, V] { //直接调用其持有对象internalConsumer的close方法 override def release(): Unit = internalConsumer.close() } //internalConsumer的close方法其实就是调用KafkaConsumer的close方法 def close(): Unit = consumer.close() //此处consumer是什么? private val consumer = createConsumer private def createConsumer: KafkaConsumer[K, V] = { val updatedKafkaParams = KafkaConfigUpdater("executor", kafkaParams.asScala.toMap) .setAuthenticationConfigIfNeeded() .build() val c = new KafkaConsumer[K, V](updatedKafkaParams) val topics = ju.Arrays.asList(topicPartition) c.assign(topics) c }
经过这个方案,当咱们没有使用缓存时候直接建立NonCachedKafkaDataConsumer对象,NonCachedKafkaDataConsumer对象封装了InternalConsumer, InternalConsumer对象中持有KafkaConsumer对象,InternalConsumer不会被缓存放到Map中。spa
当使用缓存时候,首先从根据groupid topicpartiiton组成的key,获得缓存的InternalConsumer对象,不存在就是null。线程
若是缓存不存在那么直接建立CachedKafkaDataConsumer对象,而后将这个对象引用的InternalConsumer对象缓存到Map中;
若是缓存已经存在了而且InternalConsumer当前是使用状态,那么直接建立NonCachedKafkaDataConsumer对象,这个对象持有的InternalConsumer对象是新建的,并非缓存中的,虽然参数(topicpartition对象和kafkaParams)与缓存中的InternalConsumer是同样的;
若是缓存存在InternalConsumer而且不是使用状态,直接把缓存中的InternalConsumer设置为使用状态,而后封装到CachedKafkaDataConsumer中。
若是任务有重试,以前缓存的InternalConsumer若是是非使用状态,直接关闭而且缓存中移除;若是缓存的InternalConsumer是使用状态,将其标记为下一次release时候移除的状态,最后任务重试也须要相应的consumer,所以会返回一个NonCachedKafkaDataConsumer对象,而且里面的InternalConsumer对象是新建的,并无使用缓存中的