整合Kafka到Spark Streaming——代码示例和挑战

做者Michael G. Noll是瑞士的一位工程师和研究员,效力于Verisign,是Verisign实验室的大规模数据分析基础设施(基础Hadoop)的技术主管。本文,Michael详细的演示了如何将Kafka整合到Spark Streaming中。 期间, Michael还提到了将Kafka整合到 Spark Streaming中的一些现状,很是值得阅读,虽然有一些信息在Spark 1.2版本中已发生了一些变化,好比HA策略: 经过Spark Contributor、Spark布道者陈超咱们了解到 ,在Spark 1.2版本中,Spark Streaming开始支持fully HA模式(选择使用),经过添加一层WAL(Write Ahead Log),每次收到数据后都会存在HDFS上,从而避免了之前版本中的数据丢失状况,可是不可避免的形成了必定的开销,须要开发者自行衡量。php

如下为译文html

做为一个实时大数据处理工具, Spark Sreaming 近日一直被普遍关注,与 Apache Storm 的对比也常常出现。可是依我说,缺乏与Kafka整合,任何实时大数据处理工具都是不完整的,所以我将一个示例Spark Streaming应用程序添加到 kafka-storm-starter ,而且示范如何从Kafka读取,以及如何写入到Kafka。在这个过程当中,我还使用Avro做为数据格式,以及Twitter Bijection进行数据序列化。java

在本篇文章,我将详细地讲解这个Spark Streaming示例;同时,我还会穿插当下Spark Streaming与Kafka整合的一些焦点话题。免责声明:这是我首次试验Spark Streaming,仅做为参考。node

当下,这个Spark Streaming示例被上传到GitHub,下载访问: kafka-storm-starter。项目的名称或许会让你产生某些误解,不过,不要在乎这些细节:)git

什么是Spark Streaming

Spark Streaming 是Apache Spark的一个子项目。Spark是个相似于Apache Hadoop的开源批处理平台,而Spark Streaming则是个实时处理工具,运行在Spark引擎之上。github

Spark Streaming vs. Apache Storm

Spark Streaming与Apache Storm有一些类似之处,后者是当下最流行的大数据处理平台。前不久,雅虎的Bobby Evans 和Tom Graves曾发表过一个“ Spark and Storm at Yahoo! ”的演讲,在这个演讲中,他们对比了两个大平台,并提供了一些选择参考。相似的,Hortonworks的P. Taylor Goetz也分享过名为 Apache Storm and Spark Streaming Compared 的讲义。web

这里,我也提供了一个很是简短的对比:对比Spark Streaming,Storm的产业采用更高,生产环境应用也更稳定。可是从另外一方面来讲,对比Storm,Spark拥有更清晰、等级更高的API,所以Spark使用起来也更加愉快,最起码是在使用Scala编写Spark应用程序的状况(毫无疑问,我更喜欢Spark中的API)。可是,请别这么直接的相信个人话,多看看上面的演讲和讲义。算法

无论是Spark仍是Storm,它们都是Apache的顶级项目,当下许多大数据平台提供商也已经开始整合这两个框架(或者其中一个)到其商业产品中,好比Hortonworks就同时整合了Spark和Storm,而Cloudera也整合了Spark。shell

附录:Spark中的Machines、cores、executors、tasks和receivers 

本文的后续部分将讲述许多Spark和Kafka中的parallelism问题,所以,你须要掌握一些Spark中的术语以弄懂这些环节。数据库

  • 一个Spark集群必然包含了1个以上的工者做节点,又称为从主机(为了简化架构,这里咱们先抛弃开集群管理者不谈)。

  • 一个工做者节点能够运行一个以上的executor

  • Executor是一个用于应用程序或者工做者节点的进程,它们负责处理tasks,并将数据保存到内存或者磁盘中。每一个应用程序都有属于本身的executors,一个executor则包含了必定数量的cores(也被称为slots)来运行分配给它的任务。

  • Task是一个工做单元,它将被传送给executor。也就是说,task将是你应用程序的计算内容(或者是一部分)。SparkContext将把这些tasks发送到executors进行执行。每一个task都会占用父executor中的一个core(slot)。

  • Receiver( API  文档 )将做为一个长期运行的task跑在一个executor上。每一个receiver都会负责一个所谓的input DStream(好比从Kafka中读取的一个输入流),同时每一个receiver( input DStream)占用一个core/slot。

  • input DStream:input DStream是DStream的一个类型,它负责将Spark Streaming链接到外部的数据源,用于读取数据。对于每一个外部数据源(好比Kafka)你都须要配置一个input DStream。一个Spark Streaming会经过一个input DStream与一个外部数据源进行链接,任何后续的DStream都会创建标准的DStreams。

在Spark的执行模型,每一个应用程序都会得到本身的executors,它们会支撑应用程序的整个流程,并以多线程的方式运行1个以上的tasks,这种隔离途径很是相似Storm的执行模型。一旦引入相似YARN或者Mesos这样的集群管理器,整个架构将会变得异常复杂,所以这里将不会引入。你能够经过Spark文档中的 Cluster Overview 了解更多细节。

整合Kafka到Spark Streaming

概述

简而言之,Spark是支持Kafka的,可是这里存在许多不完善的地方。

Spark代码库中的 KafkaWordCount 对于咱们来讲是个很是好的起点,可是这里仍然存在一些开放式问题。

特别是我想了解如何去作:

  • 从kafaka中并行读入。在Kafka,一个话题(topic)能够有N个分区。理想的状况下,咱们但愿在多个分区上并行读取。这也是 Kafka spout in Storm 的工做。

  • 从一个Spark Streaming应用程序向Kafka写入,一样,咱们须要并行执行。

在完成这些操做时,我一样碰到了Spark Streaming和/或Kafka中一些已知的问题,这些问题大部分都已经在Spark mailing list中列出。在下面,我将详细总结Kafka集成到Spark的现状以及一些常见问题。

Kafka中的话题、分区(partitions)和parallelism

详情能够查看我以前的博文: Apache Kafka 0.8 Training Deck and Tutorial Running a Multi-Broker Apache Kafka 0.8 Cluster on a Single Node 

Kafka将数据存储在话题中,每一个话题都包含了一些可配置数量的分区。话题的分区数量对于性能来讲很是重要,而这个值通常是消费者parallelism的最大数量:若是一个话题拥有N个分区,那么你的应用程序最大程度上只能进行N个线程的并行,最起码在使用Kafka内置Scala/Java消费者API时是这样的。

与其说应用程序,不如说Kafka术语中的消费者群(consumer group)。消费者群,经过你选择的字符串识别,它是逻辑消费者应用程序集群范围的识别符。同一个消费者群中的全部消费者将分担从一个指定Kafka话题中的读取任务,同时,同一个消费组中全部消费者从话题中读取的线程数最大值便是N(等同于分区的数量),多余的线程将会闲置。

多个不一样的Kafka消费者群能够并行的运行:毫无疑问,对同一个Kafka话题,你能够运行多个独立的逻辑消费者应用程序。这里,每一个逻辑应用程序都会运行本身的消费者线程,使用一个惟一的消费者群id。而每一个应用程序一般可使用不一样的read parallelisms(见下文)。当在下文我描述不一样的方式配置read parallelisms时,我指的是如何完成这些逻辑消费者应用程序中的一个设置。

这里有一些简单的例子

  • 你的应用程序使用“terran”消费者群id对一个名为“zerg.hydra”的kafka话题进行读取,这个话题拥有10个分区。若是你的消费者应用程序只配置一个线程对这个话题进行读取,那么这个线程将从10个分区中进行读取。

  • 同上,可是此次你会配置5个线程,那么每一个线程都会从2个分区中进行读取。

  • 同上,此次你会配置10个线程,那么每一个线程都会负责1个分区的读取。

  • 同上,可是此次你会配置多达14个线程。那么这14个线程中的10个将平分10个分区的读取工做,剩下的4个将会被闲置。

这里咱们不妨看一下现实应用中的复杂性——Kafka中的再平衡事件。在Kafka中,再平衡是个生命周期事件(lifecycle event),在消费者加入或者离开消费者群时都会触发再平衡事件。这里咱们不会进行详述,更多再平衡详情可参见个人 Kafka training deck 一文。

你的应用程序使用消费者群id“terran”,而且从1个线程开始,这个线程将从10个分区中进行读取。在运行时,你逐渐将线程从1个提高到14个。也就是说,在同一个消费者群中,parallelism忽然发生了变化。毫无疑问,这将形成Kafka中的再平衡。一旦在平衡结束,你的14个线程中将有10个线程平分10个分区的读取工做,剩余的4个将会被闲置。所以如你想象的同样,初始线程之后只会读取一个分区中的内容,将不会再读取其余分区中的数据。

如今,咱们终于对话题、分区有了必定的理解,而分区的数量将做为从Kafka读取时parallelism的上限。可是对于一个应用程序来讲,这种机制会产生一个什么样的影响,好比一个Spark Streaming job或者 Storm topology从Kafka中读取数据做为输入。

1. Read parallelism: 一般状况下,你指望使用N个线程并行读取Kafka话题中的N个分区。同时,鉴于数据的体积,你指望这些线程跨不一样的NIC,也就是跨不一样的主机。在Storm中,这能够经过TopologyBuilder#setSpout()设置Kafka spout的parallelism为N来实现。在Spark中,你则须要作更多的事情,在下文我将详述如何实现这一点。

2. Downstream processing parallelism: 一旦使用Kafka,你但愿对数据进行并行处理。鉴于你的用例,这种等级的parallelism必然与read parallelism有所区别。若是你的用例是计算密集型的,举个例子,对比读取线程,你指望拥有更多的处理线程;这能够经过从多个读取线程shuffling或者网路“fanning out”数据处处理线程实现。所以,你经过增加网络通讯、序列化开销等将访问交付给更多的cores。在Storm中,你经过shuffle grouping 将Kafka spout shuffling到下游的bolt中。在Spark中,你须要经过DStreams上的 repartition 转换来实现。

一般状况下,你们都渴望去耦从Kafka的parallelisms读取,并当即处理读取来的数据。在下一节,我将详述使用 Spark Streaming从Kafka中的读取和写入。

从Kafka中读取

Spark Streaming中的Read parallelism

相似Kafka,Read parallelism中也有分区的概念。了解Kafka的per-topic话题与RDDs in Spark 中的分区没有关联很是重要。

Spark Streaming中的 KafkaInputDStream (又称为Kafka链接器)使用了Kafka的高等级消费者API ,这意味着在Spark中为Kafka设置 read parallelism将拥有两个控制按钮。

1. Input DStreams的数量。 由于Spark在每一个Input DStreams都会运行一个receiver(=task),这就意味着使用多个input DStreams将跨多个节点并行进行读取操做,所以,这里寄但愿于多主机和NICs。

2. Input DStreams上的消费者线程数量。 这里,相同的receiver(=task)将运行多个读取线程。这也就是说,读取操做在每一个core/machine/NIC上将并行的进行。

在实际状况中,第一个选择显然更是你们指望的。

为何会这样?首先以及最重要的,从Kafka中读取一般状况下会受到网络/NIC限制,也就是说,在同一个主机上你运行多个线程不会增长读的吞吐量。另外一方面来说,虽然不常常,可是有时候从Kafka中读取也会遭遇CPU瓶颈。其次,若是你选择第二个选项,多个读取线程在将数据推送到blocks时会出现锁竞争(在block生产者实例上,BlockGenerator的“+=”方法真正使用的是“synchronized”方式)。

input DStreams创建的RDDs分区数量:KafkaInputDStream将储存从Kafka中读取的每一个信息到Blocks。从个人理解上,一个新的Block由 spark.streaming.blockInterval在毫秒级别创建,而每一个block都会转换成RDD的一个分区,最终由DStream创建。若是个人这种假设成立,那么由KafkaInputDStream创建的RDDs分区数量由batchInterval / spark.streaming.blockInterval决定,而batchInterval则是数据流拆分红batches的时间间隔,它能够经过StreamingContext的一个构造函数参数设置。举个例子,若是你的批时间价格是2秒(默认状况下),而block的时间间隔是200毫秒(默认状况),那么你的RDD将包含10个分区。若是有错误的话,能够提醒我。

选项1:控制input DStreams的数量

下面这个例子能够从 Spark Streaming Programming Guide 中得到:

val ssc: StreamingContext = ??? // ignore for nowval kafkaParams: Map[String, String] = Map("group.id" -> "terran", /* ignore rest */)val numInputDStreams = 5val kafkaDStreams = (1 to numInputDStreams).map { _ => KafkaUtils.createStream(...) }

在这个例子中,咱们创建了5个input DStreams,所以从Kafka中读取的工做将分担到5个核心上,寄但愿于5个主机/NICs(之因此说是寄但愿于,由于我也不肯定Spark Streaming task布局策略是否会将receivers投放到多个主机上)。全部Input Streams都是“terran”消费者群的一部分,而Kafka将保证topic的全部数据能够同时对这5个input DSreams可用。换句话说,这种“collaborating”input DStreams设置能够工做是基于消费者群的行为是由Kafka API提供,经过KafkaInputDStream完成。

在这个例子中,我没有提到每一个input DSream会创建多少个线程。在这里,线程的数量能够经过KafkaUtils.createStream方法的参数设置(同时,input topic的数量也能够经过这个方法的参数指定)。在下一节中,咱们将经过实际操做展现。

可是在开始以前,在这个步骤我先解释几个Spark Streaming中常见的几个问题,其中有些由于当下Spark中存在的一些限制引发,另外一方面则是因为当下Kafka input DSreams的一些设置形成:

当你使用我上文介绍的多输入流途径,而这些消费者都是属于同一个消费者群,它们会给消费者指定负责的分区。这样一来则可能致使syncpartitionrebalance的失败,系统中真正工做的消费者可能只会有几个。为了解决这个问题,你能够把再均衡尝试设置的很是高,从而得到它的帮助。而后,你将会碰到另外一个坑——若是你的receiver宕机(OOM,亦或是硬件故障),你将中止从Kafka接收消息。

Spark用户讨论 markmail.org/message/…

这里,咱们须要对“中止从Kafka中接收”问题 作一些解释 。当下,当你经过ssc.start()开启你的streams应用程序后,处理会开始并一直进行,即便是输入数据源(好比Kafka)变得不可用。也就是说,流不能检测出是否与上游数据源失去连接,所以也不会对丢失作出任何反应,举个例子来讲也就是重连或者结束执行。相似的,若是你丢失这个数据源的一个receiver,那么 你的流应用程序可能就会生成一些空的RDDs 

这是一个很是糟糕的状况。最简单也是最粗糙的方法就是,在与上游数据源断开链接或者一个receiver失败时,重启你的流应用程序。可是,这种解决方案可能并不会产生实际效果,即便你的应用程序须要将Kafka配置选项auto.offset.reset设置到最小——由于Spark Streaming中一些已知的bug,可能致使你的流应用程序发生一些你意想不到的问题,在下文Spark Streaming中常见问题一节咱们将详细的进行介绍。

选择2:控制每一个input DStream上小发着线程的数量

在这个例子中,咱们将创建一个单一的input DStream,它将运行3个消费者线程——在同一个receiver/task,所以是在同一个core/machine/NIC上对Kafka topic “zerg.hydra”进行读取。

val ssc: StreamingContext = ??? // ignore for nowval kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)val consumerThreadsPerInputDstream = 3val topics = Map("zerg.hydra" -> consumerThreadsPerInputDstream)val stream = KafkaUtils.createStream(ssc, kafkaParams, topics, ...)

KafkaUtils.createStream方法被重载,所以这里有一些不一样方法的特征。在这里,咱们会选择Scala派生以得到最佳的控制。

结合选项1和选项2

下面是一个更完整的示例,结合了上述两种技术:

val ssc: StreamingContext = ???val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)val numDStreams = 5val topics = Map("zerg.hydra" -> 1)val kafkaDStreams = (1 to numDStreams).map { _ =>
    KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
  }

咱们创建了5个input DStreams,它们每一个都会运行一个消费者线程。若是“zerg.hydra”topic拥有5个分区(或者更少),那么这将是进行并行读取的最佳途径,若是你在乎系统最大吞吐量的话。

Spark Streaming中的并行Downstream处理

在以前的章节中,咱们覆盖了从Kafka的并行化读取,那么咱们就能够在Spark中进行并行化处理。那么这里,你必须弄清楚Spark自己是如何进行并行化处理的。相似Kafka,Spark将parallelism设置的与(RDD)分区数量有关, 经过在每一个RDD分区上运行task进行 。在有些文档中,分区仍然被称为“slices”。

在任何Spark应用程序中,一旦某个Spark Streaming应用程序接收到输入数据,其余处理都与非streaming应用程序相同。也就是说,与普通的Spark数据流应用程序同样,在Spark Streaming应用程序中,你将使用相同的工具和模式。更多详情可见Level of Parallelism in Data Processing 文档。

所以,咱们一样将得到两个控制手段:

1. input DStreams的数量 ,也就是说,咱们在以前章节中read parallelism的数量做为结果。这是咱们的立足点,这样一来,咱们在下一个步骤中既能够保持原样,也能够进行修改。

2. DStream转化的重分配 。这里将得到一个全新的DStream,其parallelism等级可能增长、减小,或者保持原样。在DStream中每一个返回的RDD都有指定的N个分区。DStream由一系列的RDD组成,DStream.repartition则是经过RDD.repartition实现。接下来将对RDD中的全部数据作随机的reshuffles,而后创建或多或少的分区,并进行平衡。同时,数据会在全部网络中进行shuffles。换句话说,DStream.repartition很是相似Storm中的shuffle grouping。

所以,repartition是从processing parallelism解耦read parallelism的主要途径。在这里,咱们能够设置processing tasks的数量,也就是说设置处理过程当中全部core的数量。间接上,咱们一样设置了投入machines/NICs的数量。

一个DStream转换相关是 union 。这个方法一样在StreamingContext中,它将从多个DStream中返回一个统一的DStream,它将拥有相同的类型和滑动时间。一般状况下,你更愿意用StreamingContext的派生。一个union将返回一个由Union RDD支撑的UnionDStream。Union RDD由RDDs统一后的全部分区组成,也就是说,若是10个分区都联合了3个RDDs,那么你的联合RDD实例将包含30个分区。换句话说,union会将多个 DStreams压缩到一个 DStreams或者RDD中,可是须要注意的是,这里的parallelism并不会发生改变。你是否使用union依赖于你的用例是否须要从全部Kafka分区进行“in one place”信息获取决定,所以这里大部分都是基于语义需求决定。举个例子,当你须要执行一个不用元素上的(全局)计数。

注意: RDDs是无序的。所以,当你union RDDs时,那么结果RDD一样不会拥有一个很好的序列。若是你须要在RDD中进行sort。

你的用例将决定须要使用的方法,以及你须要使用哪一个。若是你的用例是CPU密集型的,你但愿对zerg.hydra topic进行5 read parallelism读取。也就是说,每一个消费者进程使用5个receiver,可是却能够将processing parallelism提高到20。

val ssc: StreamingContext = ???val kafkaParams: Map[String, String] = Map("group.id" -> "terran", ...)val readParallelism = 5val topics = Map("zerg.hydra" -> 1)val kafkaDStreams = (1 to readParallelism).map { _ =>
  KafkaUtils.createStream(ssc, kafkaParams, topics, ...)
  }//> collection of five *input* DStreams = handled by five receivers/tasksval unionDStream = ssc.union(kafkaDStreams) // often unnecessary, just showcasing how to do it//> single DStreamval processingParallelism = 20val processingDStream = unionDStream(processingParallelism)//> single DStream but now with 20 partitions

在下一节中,我将把全部部分结合到一块儿,而且联合实际数据处理进行讲解。

写入到Kafka

写入到Kafka须要从foreachRDD输出操做进行:

通用的输出操做者都包含了一个功能(函数),让每一个RDD都由Stream生成。这个函数须要将每一个RDD中的数据推送到一个外部系统,好比将RDD保存到文件,或者经过网络将它写入到一个数据库。须要注意的是,这里的功能函数将在驱动中执行,同时其中一般会伴随RDD行为,它将会促使流RDDs的计算。

注意: 重提“功能函数是在驱动中执行”,也就是Kafka生产者将从驱动中进行,也就是说“功能函数是在驱动中进行评估”。当你使用foreachRDD从驱动中读取Design Patterns时,实际过程将变得更加清晰。

在这里,建议你们去阅读Spark文档中的 Design Patterns for using foreachRDD一节,它将详细讲解使用foreachRDD读外部系统中的一些经常使用推荐模式,以及常常出现的一些陷阱。

在咱们这个例子里,咱们将按照推荐来重用Kafka生产者实例,经过生产者池跨多个RDDs/batches。 我经过 Apache Commons Pool 实现了这样一个工具,已经上传到GitHub 。这个生产者池自己经过 broadcast variable 提供给tasks。

最终结果看起来以下:

val producerPool = {
  // See the full code on GitHub for details on how the pool is created
  val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
  ssc.sparkContext.broadcast(pool)}stream.map { ... }.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    // Get a producer from the shared pool
    val p = producerPool.value.borrowObject()
    partitionOfRecords.foreach { case tweet: Tweet =>
      // Convert pojo back into Avro binary format
      val bytes = converter.value.apply(tweet)
      // Send the bytes to Kafka
      p.send(bytes)
    }
    // Returning the producer to the pool also shuts it down
    producerPool.value.returnObject(p)
  })})

须要注意的是, Spark Streaming每分钟都会创建多个RDDs,每一个都会包含多个分区,所以你无需为Kafka生产者实例创建新的Kafka生产者,更不用说每一个Kafka消息。上面的步骤将最小化Kafka生产者实例的创建数量,同时也会最小化TCP链接的数量(一般由Kafka集群肯定)。你可使用这个池设置来精确地控制对流应用程序可用的Kafka生产者实例数量。若是存在疑惑,尽可能用更少的。

完整示例

下面的代码是示例Spark Streaming应用程序的要旨(全部代码参见 这里 )。这里,我作一些解释:

  • 并行地从Kafka topic中读取Avro-encoded数据。咱们使用了一个最佳的read parallelism,每一个Kafka分区都配置了一个单线程 input DStream。

  • 并行化Avro-encoded数据到pojos中,而后将他们并行写到binary,序列化能够经过Twitter Bijection 执行。

  • 经过Kafka生产者池将结果写回一个不一样的Kafka topic。

// Set up the input DStream to read from Kafka (in parallel)val kafkaStream = {
  val sparkStreamingConsumerGroup = "spark-streaming-consumer-group"
  val kafkaParams = Map(
    "zookeeper.connect" -> "zookeeper1:2181",
    "group.id" -> "spark-streaming-test",
    "zookeeper.connection.timeout.ms" -> "1000")
  val inputTopic = "input-topic"
  val numPartitionsOfInputTopic = 5
  val streams = (1 to numPartitionsOfInputTopic) map { _ =>
    KafkaUtils.createStream(ssc, kafkaParams, Map(inputTopic -> 1), StorageLevel.MEMORY_ONLY_SER).map(_._2)
  }
  val unifiedStream = ssc.union(streams)
  val sparkProcessingParallelism = 1 // You'd probably pick a higher value than 1 in production.
  unifiedStream.repartition(sparkProcessingParallelism)}// We use accumulators to track global "counters" across the tasks of our streaming appval numInputMessages = ssc.sparkContext.accumulator(0L, "Kafka messages consumed")val numOutputMessages = ssc.sparkContext.accumulator(0L, "Kafka messages produced")// We use a broadcast variable to share a pool of Kafka producers, which we use to write data from Spark to Kafka.val producerPool = {
  val pool = createKafkaProducerPool(kafkaZkCluster.kafka.brokerList, outputTopic.name)
  ssc.sparkContext.broadcast(pool)}// We also use a broadcast variable for our Avro Injection (Twitter Bijection)val converter = ssc.sparkContext.broadcast(SpecificAvroCodecs.toBinary[Tweet])// Define the actual data flow of the streaming jobkafkaStream.map { case bytes =>
  numInputMessages += 1
  // Convert Avro binary data to pojo
  converter.value.invert(bytes) match {
    case Success(tweet) => tweet    case Failure(e) => // ignore if the conversion failed
  }}.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    val p = producerPool.value.borrowObject()
    partitionOfRecords.foreach { case tweet: Tweet =>
      // Convert pojo back into Avro binary format
      val bytes = converter.value.apply(tweet)
      // Send the bytes to Kafka
      p.send(bytes)
      numOutputMessages += 1
    }
    producerPool.value.returnObject(p)
  })})// Run the streaming jobssc.start()ssc.awaitTermination()

更多的细节和解释能够在这里看全部源代码。

就我本身而言,我很是喜欢 Spark Streaming代码的简洁和表述。在Bobby Evans和 Tom Graves讲话中没有提到的是,Storm中这个功能的等价代码是很是繁琐和低等级的: kafka-storm-starter 中的 KafkaStormSpec 会运行一个Stormtopology来执行相同的计算。同时,规范文件自己只有很是少的代码,固然是除下说明语言,它们能更好的帮助理解;同时,须要注意的是,在Storm的Java API中,你不能使用上文Spark Streaming 示例中所使用的匿名函数,好比map和foreach步骤。取而代之的是,你必须编写完整的类来得到相同的功能,你能够查看 AvroDecoderBolt 。这感受是将Spark的API转换到Java,在这里使用匿名函数是很是痛苦的。

最后,我一样也很是喜欢 Spark的说明文档 ,它很是适合初学者查看,甚至还包含了一些 进阶使用 。关于Kafka整合到Spark,上文已经基本介绍完成,可是咱们仍然须要浏览mailing list和深挖源代码。这里,我不得不说,维护帮助文档的同窗作的实在是太棒了。

知晓Spark Streaming中的一些已知问题

你可能已经发如今Spark中仍然有一些还没有解决的问题,下面我描述一些个人发现:

一方面,在对Kafka进行读写上仍然存在一些含糊不清的问题,你能够在相似Multiple Kafka Receivers and Union  How to scale more consumer to Kafka stream mailing list的讨论中发现。

另外一方面,Spark Streaming中一些问题是由于Spark自己的固有问题致使,特别是故障发生时的数据丢失问题。换句话说,这些问题让你不想在生产环境中使用Spark。

  • 在Spark 1.1版本的驱动中,Spark并不会考虑那些已经接收却由于种种缘由没有进行处理的元数据( 点击这里查看更多细节 )。所以,在某些状况下,你的Spark可能会丢失数据。Tathagata Das指出驱动恢复问题会在Spark的1.2版本中解决,当下已经释放。

  • 1.1版本中的Kafka链接器是基于Kafka的高等级消费者API。这样就会形成一个问题,Spark Streaming不能够依赖其自身的KafkaInputDStream将数据从Kafka中从新发送,从而没法解决下游数据丢失问题(好比Spark服务器发生故障)。

  • 有些人甚至认为这个版本中的Kafka链接器不该该投入生产环境使用,由于它是基于Kafka的高等级消费者API。取而代之,Spark应该使用简单的消费者API(就像Storm中的Kafka spout),它将容许你控制便宜和分区分配肯定性。

  • 可是当下Spark社区已经在致力这些方面的改善,好比Dibyendu Bhattacharya的Kafka链接器。后者是Apache Storm Kafka spout的一个端口,它基于Kafka所谓的简单消费者API,它包含了故障发生情景下一个更好的重放机制。

  • 即便拥有如此多志愿者的努力,Spark团队更愿意非特殊状况下的Kafka故障恢复策略,他们的目标是“在全部转换中提供强保证,通用的策略”,这一点很是难以理解。从另外一个角度来讲,这是浪费Kafka自己的故障恢复策略。这里确实难以抉择。

  • 这种状况一样也出如今写入状况中,极可能会形成数据丢失。

  • Spark的Kafka消费者参数auto.offset.reset的使用一样与Kafka的策略不一样。在Kafka中,将auto.offset.reset设置为最小是消费者将自动的将offset设置为最小offset,这一般会发生在两个状况:第一,在ZooKeeper中不存在已有offsets;第二,已存在offset,可是不在范围内。而在Spark中,它会始终删除全部的offsets,并从头开始。这样就表明着,当你使用auto.offset.reset = “smallest”重启你的应用程序时,你的应用程序将彻底从新处理你的Kafka应用程序。更多详情能够在下面的两个讨论中发现: 1  2 

  • Spark-1341:用于控制Spark Streaming中的数据传输速度。这个能力能够用于不少状况,当你已经受Kafka引发问题所烦恼时(好比auto.offset.reset所形成的),而后可能让你的应用程序从新处理一些旧数据。可是鉴于这里并无内置的传输速率控制,这个功能可能会致使你的应用程序过载或者内存不足。

在这些故障处理策略和Kafka聚焦的问题以外以外,扩展性和稳定性上的关注一样不可忽视。再一次,仔细观看 Bobby和Tom的视频 以得到更多细节。在Spark使用经验上,他们都永远比我更丰富。

固然,我也有个人 评论 ,在 G1 garbage(在Java 1.7.0u4+中) 上可能也会存在问题。可是,我历来都没碰到这个问题。

Spark使用技巧和敲门

在我实现这个示例的代码时,我作了一些重要的笔记。虽然这不是一个全面的指南,可是在你开始Kafka整合时可能发挥必定的做用。它包含了 Spark Streaming programming guide 中的一些信息,也有一些是来自Spark用户的mailing list。

通用

  • 当你创建你的Spark环境时,对Spark使用的cores数量配置须要特别投入精力。你必须为Spark配置receiver足够使用的cores(见下文),固然实际数据处理所须要的cores的数量也要进行配置。在Spark中,每一个receiver都负责一个input DStream。同时,每一个receiver(以及每一个input DStream) occies一个core,这样作是服务于每一个文件流中的读取(详见文档)。举个例子,你的做业须要从两个input streams中读取数据,可是只访问两个cores,这样一来,全部数据都只会被读取而不会被处理。

  • 注意,在一个流应用程序中,你能够创建多个input DStreams来并行接收多个数据流。在上文从Kafka并行读取一节中,我曾演示过这个示例做业。

  • 你可使用 broadcast variables在不一样主机上共享标准、只读参数,相关细节见下文的优化指导。在示例做业中,我使用了broadcast variables共享了两个参数:第一,Kafka生产者池(做业经过它将输出写入Kafka);第二,encoding/decoding Avro数据的注入(从Twitter Bijection中)。 Passing functions to Spark 

  • 你可使用累加器参数来跟踪流做业上的全部全局“计数器”,这里能够对照Hadoop做业计数器。在示例做业中,我使用累加器分别计数全部消费的Kafka消息,以及全部对Kafka的写入。若是你对累加器进行命名,它们一样能够在Spark UI上展现。

  • 不要忘记import Spark和Spark Streaming环境:

// Required to gain access to RDD transformations via implicits.import org.apache.spark.SparkContext._// Required when working on `PairDStreams` to gain access to e.g. `DStream.reduceByKey`// (versus `DStream.transform(rddBatch => rddBatch.reduceByKey()`) via implicits.//// See also http://spark.apache.org/docs/1.1.0/programming-guide.html#working-with-key-value-pairsimport org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions

若是你是 Twitter Algebird的爱好者,你将会喜欢使用Count-Min Sketch和Spark中的一些特性,表明性的,你会使用reduce或者reduceByWindow这样的操做(好比,DStreams上的转换 )。Spark项目包含了 Count-Min Sketch  HyperLogLog 的示例介绍。

若是你须要肯定Algebird数据结构的内存介绍,好比Count-Min Sketch、HyperLogLog或者Bloom Filters,你可使用SparkContext日志进行查看,更多细节参见 Determining Memory Consumption 

Kafka整合

我前文所述的一些增补:

  • 你可能须要修改Spark Streaming中的一些Kafka消费者配置。举个例子,若是你须要从Kafka中读取大型消息,你必须添加fetch.message.max.bytes消费设置。你可使用KafkaUtils.createStream(…)将这样定制的Kafka参数给Spark Streaming传送。

测试

  • 首先,肯定 已经 在一个finally bloc或者测试框架的teardown method中使用stop()关闭了StreamingContext 和/或 SparkContext,由于在同一个程序(或者JVM?)中Spark不支持并行运行两种环境。

  • 根据个人经验,在使用sbt时,你但愿在测试中将你的创建配置到分支JVM中。最起码在kafka-storm-starter中,测试必须并行运行多个线程,好比ZooKeeper、Kafka和Spark的内存实例。开始时,你能够参考 build.sbt 

  • 一样,若是你使用的是Mac OS X,你可能指望关闭JVM上的IPv6用以阻止DNS相关超时。这个问题与Spark无关,你能够查看 .sbtopts 来得到关闭IPv6的方法。

性能调优

  • 肯定你理解做业中的运行时影响,若是你须要与外部系统通讯,好比Kafka。在使用foreachRDD时,你应该阅读中 Spark Streaming programming guide 中的Design Patterns一节。举个例子,个人用例中使用Kafka产生者池来优化 Spark Streaming到Kafka的写入。在这里,优化意味着在多个task中共享同一个生产者,这个操做能够显著地减小由Kafka集群创建的新TCP链接数。

  • 使用Kryo作序列化,取代默认的Java serialization,详情能够访问 Tuning Spark 。个人例子就使用了Kryo和注册器,举个例子,使用Kryo生成的Avro-generated Java类(见 KafkaSparkStreamingRegistrator )。除此以外,在Storm中相似的问题也可使用Kryo来解决。

  • 经过将spark.streaming.unpersist设置为true将Spark Streaming 做业设置到明确持续的RDDs。这能够显著地减小Spark在RDD上的内存使用,同时也能够改善GC行为。(点击访问 来源 

  • 经过MEMORY_ONLY_SER开始你的储存级别P&S测试(在这里,RDD被存储到序列化对象,每一个分区一个字节)。取代反序列化,这样作更有空间效率,特别是使用Kryo这样的高速序列化工具时,可是会增长读取上的CPU密集操做。这个优化对 Spark Streaming做业也很是有效。对于本地测试来讲,你可能并不想使用*_2派生(2=复制因子)。

总结

完整的Spark Streaming示例代码能够在 kafka-storm-starter 查看。这个应用包含了Kafka、Zookeeper、Spark,以及上文我讲述的示例。

整体来讲,我对个人初次Spark Streaming体验很是满意。固然,在Spark/Spark Streaming也存在一些须要特别指明的问题,可是我确定Spark社区终将解决这些问题。在这个过程当中,获得了Spark社区积极和热情的帮助,同时我也很是期待Spark 1.2版本的新特性。

在大型生产环境中,基于Spark还须要一些TLC才能达到Storm能力,这种状况我可能将它投入生产环境中么?大部分状况下应该不会,更准确的说应该是如今不会。那么在当下,我又会使用Spark Streaming作什么样的处理?这里有两个想法,我认为确定存在更多:

  • 它能够很是快的原型数据流。若是你由于数据流太大而遭遇扩展性问题,你能够运行 Spark Streaming,在一些样本数据或者一部分数据中。

  • 搭配使用Storm和Spark Streaming。举个例子,你可使用Storm将原始、大规模输入数据处理到易管理等级,而后使用Spark Streaming来作下一步的分析,由于后者能够开箱即用大量有趣的算法、计算指令和用例。

感谢Spark社区对大数据领域所做出的贡献!

 

翻译/童阳

文章出处:推酷-CSDN

相关文章
相关标签/搜索