Two different ways of integrating Kafka 0.8 with Spark Streaming

I recently looked into how integrating Kafka with Spark Streaming differs across Kafka versions; my notes are organized below.

1- Kafka 0.8.2 and above, below Kafka 0.10

http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html#approach-2-direct-approach-no-receivers

One approach is receiver-based and the other uses no receiver; the two approaches differ in programming model, performance characteristics, and semantic guarantees.

1-1 The receiver-based approach

This approach uses Kafka's high-level API. Messages received from Kafka are stored in Spark executors, and a Spark Streaming job is then launched to process the data. The source-code analysis follows.

1-1-1 Overriding Receiver's onStart method

Initialize the block generator for storing Kafka messages.

1-1-1-1 Constructing the BlockGenerator creates a timer:

private val blockIntervalTimer = new RecurringTimer(clock, blockIntervalMs, 
updateCurrentBuffer, "BlockGenerator")

This timer thread periodically packs the received messages into a block. The rough flow is:
1. Construct the thread

private val thread = new Thread("RecurringTimer - " + name)

2. Call loop to start the polling loop

try {
      while (!stopped) {
        triggerActionForNextInterval()
      }

3. Call triggerActionForNextInterval, which invokes a higher-order callback function: updateCurrentBuffer

4. The callback converts the current buffer into a block and installs a new, empty buffer to receive the next batch of data

try {
      var newBlock: Block = null
      synchronized {
        if (currentBuffer.nonEmpty) { // even if the buffer is not full, once the timer fires a new buffer
                                      // is allocated for the next batch of data, and the old buffer becomes a block
          val newBlockBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
          listener.onGenerateBlock(blockId)
          newBlock = new Block(blockId, newBlockBuffer)
        }
      }

      if (newBlock != null) {
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
      }

The BlockGenerator itself is created in the receiver's onStart method via the supervisor:

    blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)

1-1-1-2 The other thread in the BlockGenerator

private val blockPushingThread = new Thread() {
  override def run() { keepPushingBlocks() }
}

Its job is to call the keepPushingBlocks method, which pushes blocks to the BlockManager.

1-1-1-3 The thread pool

The thread pool size is the sum of the thread counts of all topics:

messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
   topics.values.sum, "KafkaMessageHandler")

The value decoder (and similarly the key decoder) is constructed via reflection:

val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
  .newInstance(consumerConfig.props)
  .asInstanceOf[Decoder[V]]

1-1-1-4 Build the Kafka message streams, returning Map[String, List[KafkaStream[K, V]]]

val topicMessageStreams = consumerConnector.createMessageStreams(topics, keyDecoder, valueDecoder)

The method signature is:

def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
 : Map[String,List[KafkaStream[K,V]]]

A detailed analysis of this method is given in the separate note on createMessageStreams.

1-1-1-5 Submit to the thread pool and start consuming messages

Each List[KafkaStream] represents the multiple Kafka message streams of one topic; it is a list because the topic is consumed by multiple threads.

topicMessageStreams.values.foreach { streams =>                 // iterate over the per-topic lists
  streams.foreach { stream =>                                   // iterate over the streams of one topic
    messageHandlerThreadPool.submit(new MessageHandler(stream)) // one thread per Kafka stream to receive, i.e. consume, messages
  }
}

Let's see what this MessageHandler thread does. For every message it receives, it calls the storeMessageAndMetadata method once:

private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
  override def run(): Unit = {
    while (!isStopped) {
      try {
        val streamIterator = stream.iterator()
        while (streamIterator.hasNext) {
          storeMessageAndMetadata(streamIterator.next)
        }
      } catch {
        case e: Exception =>
          reportError("Error handling message", e)
      }
    }
  }
}

The storeMessageAndMetadata method:

private def storeMessageAndMetadata(msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
  val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
  val data = (msgAndMetadata.key, msgAndMetadata.message)
  val metadata = (topicAndPartition, msgAndMetadata.offset)
  blockGenerator.addDataWithCallback(data, metadata)
}

The addDataWithCallback method:

def addDataWithCallback(data: Any, metadata: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        currentBuffer += data
        listener.onAddData(data, metadata)
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}

This appends data, i.e. one message, to the ArrayBuffer currentBuffer. The listener.onAddData call here dispatches to the onAddData method of GeneratedBlockHandler in ReliableKafkaReceiver:

def onAddData(data: Any, metadata: Any): Unit = {
  // Update the offset of the data that was added to the generator
  if (metadata != null) {
    val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
    updateOffset(topicAndPartition, offset)
  }
}

The data and metadata arguments above are, respectively:

val data = (msgAndMetadata.key, msgAndMetadata.message)
val metadata = (topicAndPartition, msgAndMetadata.offset)

The updateOffset method stores the partition and offset into the topicPartitionOffsetMap, keeping track of the latest offset of each partition; a minimal sketch follows.
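
A minimal sketch of that bookkeeping inside the receiver class (the field and method names follow ReliableKafkaReceiver, but treat the exact shape as an approximation):

import scala.collection.mutable
import kafka.common.TopicAndPartition

// latest offset seen for each topic-partition in the block currently being filled
private val topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]()

private def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
  // simply overwrite with the newest offset; the whole map is committed when the block is pushed
  topicPartitionOffsetMap.put(topicAndPartition, offset)
}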

The overall call chain is:

keepPushingBlocks
  -> pushBlock
    -> GeneratedBlockHandler.onPushBlock
      -> storeBlockAndCommitOffset
        -> store
          -> ReceiverSupervisorImpl.pushArrayBuffer
            -> pushAndReportBlock
              -> WriteAheadLogBasedBlockHandler.storeBlock (only counts as a success if the block is stored in both the BlockManager and the WAL)
              -> RpcEndpointRef.askWithRetry
        -> commitOffset (updates the offset of each partition of each topic)
           // Question: why does the offset need to be committed here? Isn't a manual commit only needed once at the end of each batch?

The ZooKeeper paths used when committing offsets are:

    def consumerOffsetDir = consumerGroupDir + "/offsets/" + topic
    def consumerOwnerDir = consumerGroupDir + "/owners/" + topic

Under the default configuration this approach risks losing data, so the WAL must be enabled; a copy of the data is then also written to the write-ahead log on HDFS. The code looks like this:

import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

The hardest part to understand is the "per-topic number of Kafka partitions to consume" argument; look at the KafkaWordCount example in the Spark project:

val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")

// Note that several topics can be passed in, e.g. "topic1,topic2" with numThreads = 16: messages of two topics are consumed, with 16 receiving threads per topic
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
// topicMap here is: ("topic1" -> 16, "topic2" -> 16)
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

Additional notes:
- Kafka topic partitions do not correspond to RDD partitions, so increasing the number of topic partitions does not increase the data-processing parallelism.
- Multiple Kafka input DStreams can be created with different consumer groups and topics, so that data is received in parallel through multiple receivers.
- If the WAL is enabled and stored on HDFS, the received data is already replicated once in the log, so the storage level of such an input stream should be StorageLevel.MEMORY_AND_DISK_SER (a sketch follows this list).
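
A minimal sketch of wiring this up, with placeholder ZooKeeper addresses, group, and topic names: the WAL is switched on through spark.streaming.receiver.writeAheadLog.enable, a checkpoint directory on HDFS is set, and the stream is created with the non-replicated StorageLevel.MEMORY_AND_DISK_SER.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("ReceiverWithWAL")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true") // enable the write-ahead log
val ssc = new StreamingContext(conf, Seconds(2))
ssc.checkpoint("hdfs:///checkpoint/receiver-wal") // the WAL is kept under the checkpoint directory

// the data already lives in the WAL, so in-memory replication is not needed
val lines = KafkaUtils.createStream(
  ssc, "zk1:2181,zk2:2181", "my-group", Map("topic1" -> 16),
  StorageLevel.MEMORY_AND_DISK_SER).map(_._2)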

1-2 The direct approach (no receiver)

This approach was introduced in Spark 1.3 to guarantee end-to-end consistency. It periodically queries Kafka for the latest offset of each topic+partition, which in turn defines the offset range each batch will process. When the job that processes the data starts, Spark Streaming uses Kafka's simple consumer API to read the messages in the defined offset ranges directly, much like reading data from a file system (seek to offset and read). Kafka's simple API has three main characteristics:
- Read a message multiple times (re-reading)
- Consume only a subset of the partitions in a topic in a process (selective reading)
- Manage transactions to make sure a message is processed once and only once (an exactly-once primitive)

However, this API is also quite complex to use:
- You must keep track of the offsets in your application to know where you left off consuming (offsets are managed by you).
- You must figure out which Broker is the lead Broker for a topic and partition.
- You must handle Broker leader changes.

The direct stream approach has the following advantages:
- Simplified parallelism. There is no need to create multiple Kafka streams and union them. With the direct stream, Spark Streaming creates as many RDD partitions as there are Kafka partitions and reads from Kafka in parallel; the one-to-one mapping between RDD partitions and Kafka partitions is easy to understand and to tune.
- Efficiency. The first approach requires the data to be written to a WAL, which copies the data a second time: once from Kafka and once into the WAL. The direct approach does not have this problem.
- Exactly-once semantics. The first approach uses Kafka's high-level API to store offsets in ZooKeeper, which is the conventional way to consume Kafka data. However it can only achieve zero data loss, i.e. at-least-once semantics: some records may be consumed twice. This happens because the offsets tracked by Spark Streaming can diverge from the offsets stored in ZooKeeper. With the direct approach, Spark Streaming uses the simple Kafka API (no ZooKeeper dependency) and tracks offsets in its checkpoint, which removes the inconsistency. To get exactly-once semantics end to end, the output operation that stores results to external storage must either be idempotent or save results and offsets atomically (see "Semantics of output operations" in the programming guide for more).
- Drawback. The only drawback is that offsets are no longer updated in ZooKeeper, so ZooKeeper-based Kafka monitoring tools cannot show the consumption progress. You can, however, update the offsets in ZooKeeper yourself after each batch, as shown later in this section.

Programming with the direct approach:

import org.apache.spark.streaming.kafka._

 val directKafkaStream = KafkaUtils.createDirectStream[
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])
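
For example, a concrete instantiation might look like the following sketch (ssc is the StreamingContext; broker addresses and topic names are placeholders). Setting auto.offset.reset to smallest makes the stream start from the earliest available offsets, a point discussed just below.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "broker1:9092,broker2:9092", // or bootstrap.servers
  "auto.offset.reset"    -> "smallest")                  // default is to start from the latest offsets

val directKafkaStream = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("topic1", "topic2"))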

You can also pass a messageHandler to createDirectStream to gain access to the MessageAndMetadata object, which carries the metadata of the current message, and then transform each message into any type you want. See the API docs and the example.

In the Kafka parameters you must specify either metadata.broker.list or bootstrap.servers. By default, consumption starts from the latest offset of each Kafka partition. If auto.offset.reset is set to smallest in the configuration, consumption starts from the smallest offset instead, as the code below shows.

From the method's documentation:

* Points to note:
   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
   *    You can access the offsets used in each batch from the generated RDDs (see
   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
   *    in the `StreamingContext`. The information on consumed offset can be
   *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
   *  - End-to-end semantics: This stream ensures that every records is effectively received and
   *    transformed exactly once, but gives no guarantees on whether the transformed data are
   *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
   *    that the output operation is idempotent, or use transactions to output records atomically.
   *    See the programming guide for more details.

createDirectStream has many overloads; the method below is the final entry point.

def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  ): InputDStream[(K, V)] = {
    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    val kc = new KafkaCluster(kafkaParams)
    val fromOffsets = getFromOffsets(kc, kafkaParams, topics) // if fromOffsets was not specified at the call site, getFromOffsets computes the starting offsets
    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }

private[kafka] def getFromOffsets(
    kc: KafkaCluster,
    kafkaParams: Map[String, String],
    topics: Set[String]): Map[TopicAndPartition, Long] = {
    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
    val result = for {
      topicPartitions <- kc.getPartitions(topics).right
      leaderOffsets <- (if (reset == Some("smallest")) { // if auto.offset.reset is smallest, start from the earliest offset of each topic-partition
        kc.getEarliestLeaderOffsets(topicPartitions)
      } else { // otherwise start from the latest offsets
        kc.getLatestLeaderOffsets(topicPartitions)
      }).right
    } yield {
      leaderOffsets.map { case (tp, lo) =>
          (tp, lo.offset)
      }
    }
    KafkaCluster.checkErrors(result)
  }

You can also start consuming from arbitrary offsets, which requires one of the variants of KafkaUtils.createDirectStream. If you want to access the offsets processed in each batch, you can do the following:

// Hold a reference to the current offset ranges, so it can be used downstream
 var offsetRanges = Array.empty[OffsetRange]

 directKafkaStream.transform { rdd =>
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }.map {
           ...
 }.foreachRDD { rdd =>
   for (o <- offsetRanges) {
     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
   }
   ...
 }
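
For the other variant mentioned above, starting from explicitly chosen offsets, a minimal sketch (topic, partition, and offset values are placeholders; ssc and kafkaParams are as before):

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// start topic1, partition 0 at offset 1000 and partition 1 at offset 2000
val fromOffsets = Map(
  TopicAndPartition("topic1", 0) -> 1000L,
  TopicAndPartition("topic1", 1) -> 2000L)

// when explicit offsets are given, a messageHandler must also be supplied; this one keeps (key, value)
val messageHandler = (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message)

val streamFromOffsets = KafkaUtils.createDirectStream[
  String, String, StringDecoder, StringDecoder, (String, String)](
  ssc, kafkaParams, fromOffsets, messageHandler)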

Now look at the DirectKafkaInputDStream implementation. The key method is compute, because the RDD is constructed directly from the data read from Kafka.

override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries)) // throttle according to the configured maximum rate; in effect this computes the ending offsets
    val rdd = KafkaRDD[K, V, U, T, R](
      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val offsetRanges = currentOffsets.map { case (tp, fo) =>
      val uo = untilOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }
    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "offsets" -> offsetRanges.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
  }

Likewise, the offsets in ZooKeeper can also be updated manually.
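
A minimal sketch of that manual update, assuming a hypothetical writeOffsetToZk helper (not part of Spark) that persists one offset under the consumer group's /consumers/<group>/offsets/<topic>/<partition> node:

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// hypothetical helper: write one offset to ZooKeeper, e.g. through a ZkClient
def writeOffsetToZk(group: String, o: OffsetRange): Unit = ???

directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // process the batch first, then record how far this batch has read
  offsetRanges.foreach(o => writeOffsetToZk("my-group", o)) // untilOffset is the first offset not yet read
}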

Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the directKafkaStream, not later down a chain of methods. You can use transform() instead of foreachRDD() as your first method call in order to access offsets, then call further Spark methods. However, be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().

Another thing to note is that since this approach does not use Receivers, the standard receiver-related configurations (that is, configurations of the form spark.streaming.receiver.*) will not apply to the input DStreams created by this approach (they will apply to other input DStreams though). Instead, use the configurations spark.streaming.kafka.*. An important one is spark.streaming.kafka.maxRatePerPartition, which is the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API.
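
A minimal sketch of setting that limit (the numbers are placeholders); spark.streaming.backpressure.enabled additionally lets the rate estimator lower the effective rate below this ceiling:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("DirectKafkaWithRateLimit")
  // read at most 1000 messages per second from each Kafka partition
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
  // optional: let the backpressure estimator adjust the rate below that ceiling
  .set("spark.streaming.backpressure.enabled", "true")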

Now let's focus on the getLeaderOffsets method mentioned above. Whether consumption starts from the latest or the earliest offsets, this method is called; only the argument differs:

def getLatestLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition]
    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime) // the latest point in time

  def getEarliestLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition]
    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime) // the earliest point in time

  def getLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition],
      before: Long
    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
    getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
      r.map { kv =>
        // mapValues isn't serializable, see SI-7005
        kv._1 -> kv._2.head
      }
    }
  }

As you can see, getLeaderOffsets delegates to another getLeaderOffsets overload:

def getLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition],
      before: Long,
      maxNumOffsets: Int
    ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
    findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>//Find an active Broker and find out which Broker is the leader for your topic and partition
      val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
      val leaders = leaderToTp.keys
      var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
      val errs = new Err
      withBrokers(leaders, errs) { consumer =>
        val partitionsToGetOffsets: Seq[TopicAndPartition] =
          leaderToTp((consumer.host, consumer.port))
        val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
          tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
        }.toMap
        val req = OffsetRequest(reqMap)
        val resp = consumer.getOffsetsBefore(req)
        val respMap = resp.partitionErrorAndOffsets
        partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
          respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
            if (por.error == ErrorMapping.NoError) {
              if (por.offsets.nonEmpty) {
                result += tp -> por.offsets.map { off =>
                  LeaderOffset(consumer.host, consumer.port, off)
                }
              } else {
                errs += new SparkException(
                  s"Empty offsets for ${tp}, is ${before} before log beginning?")
              }
            } else {
              errs += ErrorMapping.exceptionFor(por.error)
            }
          }
        }
        if (result.keys.size == topicAndPartitions.size) {
          return Right(result)
        }
      }
      val missing = topicAndPartitions.diff(result.keySet)
      errs += new SparkException(s"Couldn't find leader offsets for ${missing}")
      Left(errs)
    }
  }

Now look at the clamp method called from compute; to clamp means to constrain or limit:

// limits the maximum number of messages per partition
  protected def clamp(
    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
    val offsets = leaderOffsets.mapValues(lo => lo.offset)

    maxMessagesPerPartition(offsets).map { mmp => // call maxMessagesPerPartition to compute the maximum number of messages each partition may consume
      mmp.map { case (tp, messages) =>
        val lo = leaderOffsets(tp)
        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset))
      }
    }.getOrElse(leaderOffsets)
  }

The maxMessagesPerPartition method:

protected[streaming] def maxMessagesPerPartition(
      offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
    val estimatedRateLimit = rateController.map(_.getLatestRate())

    // calculate a per-partition rate limit based on current lag
    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
      case Some(rate) =>
        val lagPerPartition = offsets.map { case (tp, offset) =>
          tp -> Math.max(offset - currentOffsets(tp), 0)
        }
        val totalLag = lagPerPartition.values.sum

        lagPerPartition.map { case (tp, lag) =>
          val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
          tp -> (if (maxRateLimitPerPartition > 0) {
            Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
        }
      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
    }

    if (effectiveRateLimitPerPartition.values.sum > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 // the batch duration times the per-partition limit is the maximum number of messages that partition may consume; adding it to the starting offset gives the ending offset
      Some(effectiveRateLimitPerPartition.map {
        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
      })
    } else {
      None
    }
  }
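
A small worked example of that arithmetic, using assumed numbers: a rate estimate of 1000 messages/second, two partitions with lags of 300 and 700, a 2-second batch, and no maxRatePerPartition cap.

val rate = 1000.0                                   // estimatedRateLimit from the rate controller
val lagPerPartition = Map("t-0" -> 300L, "t-1" -> 700L)
val totalLag = lagPerPartition.values.sum           // 1000

// lag-proportional share of the rate: t-0 -> 300 msg/s, t-1 -> 700 msg/s
val ratePerPartition = lagPerPartition.map { case (tp, lag) =>
  tp -> Math.round(lag / totalLag.toFloat * rate)
}

val secsPerBatch = 2.0
// maximum messages per partition in this batch: t-0 -> 600, t-1 -> 1400
val maxMessages = ratePerPartition.map { case (tp, r) => tp -> (secsPerBatch * r).toLong }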

Once the starting and ending offsets that each partition can consume have been computed, the KafkaRDD can be constructed. It consists of a companion object and a companion class; first the apply method of the companion object:

def apply[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[_]: ClassTag,
    T <: Decoder[_]: ClassTag,
    R: ClassTag](
      sc: SparkContext,
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long],
      untilOffsets: Map[TopicAndPartition, LeaderOffset],
      messageHandler: MessageAndMetadata[K, V] => R
    ): KafkaRDD[K, V, U, T, R] = {
    val leaders = untilOffsets.map { case (tp, lo) =>
      tp -> (lo.host, lo.port)
    }.toMap

    // build the OffsetRanges from fromOffsets (the starting offsets) and untilOffsets (the ending offsets)
    val offsetRanges = fromOffsets.map { case (tp, fo) =>
      val uo = untilOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }.toArray

    // construct the KafkaRDD from the offset ranges
    // note: distinguish RDD.compute from DStream.compute; KafkaRDD's compute method is analyzed below
    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler)
  }

A typical RDD subclass needs to override the following methods:

  • getPartitions
  • getPreferredLocations
  • compute

KafkaRDD's compute method mainly returns a KafkaRDDIterator; each call to the iterator's getNext method returns one message read from the Kafka cluster. A minimal skeleton of such an RDD subclass is sketched below.
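
The following sketch is in the spirit of KafkaRDD but with the fetch logic omitted; the partition type and all names are illustrative, not Spark's own.

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// one entry per Kafka partition, carrying the leader host and the offset range to read
class SketchPartition(val index: Int, val host: String,
                      val fromOffset: Long, val untilOffset: Long) extends Partition

class SketchRDD(sc: SparkContext, ranges: Array[(String, Long, Long)])
  extends RDD[String](sc, Nil) {

  // one RDD partition per offset range, i.e. per Kafka partition
  override def getPartitions: Array[Partition] =
    ranges.indices.map { i =>
      val (host, from, until) = ranges(i)
      new SketchPartition(i, host, from, until): Partition
    }.toArray

  // prefer scheduling the task on the leader broker's host
  override def getPreferredLocations(split: Partition): Seq[String] =
    Seq(split.asInstanceOf[SketchPartition].host)

  // return an iterator over the messages in [fromOffset, untilOffset)
  override def compute(split: Partition, context: TaskContext): Iterator[String] = {
    val p = split.asInstanceOf[SketchPartition]
    // a real implementation would connect to p.host, seek to p.fromOffset and read until p.untilOffset
    Iterator.empty
  }
}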