Spark踩坑记——Spark Streaming+Kafka

前言

在WeTest舆情项目中,须要对天天千万级的游戏评论信息进行词频统计,在生产者一端,咱们将数据按照天天的拉取时间存入了Kafka当中,而在消费者一端,咱们利用了spark streaming从kafka中不断拉取数据进行词频统计。本文首先对spark streaming嵌入kafka的方式进行概括总结,以后简单阐述Spark streaming+kafka在舆情项目中的应用,最后将本身在Spark Streaming+kafka的实际优化中的一些经验进行概括总结。(若有任何纰漏欢迎补充来踩,我会第一时间改正^v^)java

Spark streaming接收Kafka数据

用spark streaming流式处理kafka中的数据,第一步固然是先把数据接收过来,转换为spark streaming中的数据结构Dstream。接收数据的方式有两种:1.利用Receiver接收数据,2.直接从kafka读取数据。git

基于Receiver的方式

这种方式利用接收器(Receiver)来接收kafka中的数据,其最基本是使用Kafka高阶用户API接口。对于全部的接收器,从kafka接收来的数据会存储在spark的executor中,以后spark streaming提交的job会处理这些数据。以下图:
Receiver图形解释
在使用时,咱们须要添加相应的依赖包:github

<dependency><!-- Spark Streaming Kafka -->
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.3</version>
</dependency>

而对于Scala的基本使用方式以下:sql

import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext, 
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

还有几个须要注意的点:数据库

  • 在Receiver的方式中,Spark中的partition和kafka中的partition并非相关的,因此若是咱们加大每一个topic的partition数量,仅仅是增长线程来处理由单一Receiver消费的主题。可是这并无增长Spark在处理数据上的并行度。
  • 对于不一样的Group和topic咱们可使用多个Receiver建立不一样的Dstream来并行接收数据,以后能够利用union来统一成一个Dstream。
  • 若是咱们启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level须要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)

直接读取方式

在spark1.3以后,引入了Direct方式。不一样于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每一个topic的每一个partition中的最新offsets,以后根据设定的maxRatePerPartition来处理每一个batch。其形式以下图:

这种方法相较于Receiver方式的优点在于:apache

  • 简化的并行:在Receiver的方式中咱们提到建立多个Receiver以后利用union来合并成一个Dstream的方式提升数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
  • 高效:在Receiver的方式中,为了达到0数据丢失须要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要咱们Kafka的数据保留时间足够长,咱们都可以从Kafka进行数据恢复。
  • 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但因为Spark Streaming消费的数据和Zookeeper中记录的offset不一样步,这种方式偶尔会形成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。

以上主要是对官方文档[1]的一个简单翻译,详细内容你们能够直接看下官方文档这里再也不赘述。bootstrap

不一样于Receiver的方式,是从Zookeeper中读取offset值,那么天然zookeeper就保存了当前消费的offset值,那么若是从新启动开始消费就会接着上一次offset值继续消费。而在Direct的方式中,咱们是直接从kafka来读数据,那么offset须要本身记录,能够利用checkpoint、数据库或文件记录或者回写到zookeeper中进行记录。这里咱们给出利用Kafka底层API接口,将offset及时同步到zookeeper中的通用类,我将其放在了github上:
Spark streaming+Kafka demo
示例中KafkaManager是一个通用类,而KafkaCluster是kafka源码中的一个类,因为包名权限的缘由我把它单独提出来,ComsumerMain简单展现了通用类的使用方法,在每次建立KafkaStream时,都会先从zooker中查看上次的消费记录offsets,而每一个batch处理完成后,会同步offsets到zookeeper中。数组

Spark向kafka中写入数据

上文阐述了Spark如何从Kafka中流式的读取数据,下面我整理向Kafka中写数据。与读数据不一样,Spark并无提供统一的接口用于写入Kafka,因此咱们须要使用底层Kafka接口进行包装。
最直接的作法咱们能够想到以下这种方式:缓存

input.foreachRDD(rdd =>
  // 不能在这里建立KafkaProducer
  rdd.foreachPartition(partition =>
    partition.foreach{
      case x:String=>{
        val props = new HashMap[String, Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
          "org.apache.kafka.common.serialization.StringSerializer")
        println(x)
        val producer = new KafkaProducer[String,String](props)
        val message=new ProducerRecord[String, String]("output",null,x)
        producer.send(message)
      }
    }
  )
)

可是这种方式缺点很明显,对于每一个partition的每条记录,咱们都须要建立KafkaProducer,而后利用producer进行输出操做,注意这里咱们并不能将KafkaProducer的新建任务放在foreachPartition外边,由于KafkaProducer是不可序列化的(not serializable)。显然这种作法是不灵活且低效的,由于每条记录都须要创建一次链接。如何解决呢?

  1. 首先,咱们须要将KafkaProducer利用lazy val的方式进行包装以下:
import java.util.concurrent.Future
import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerRecord, RecordMetadata }
class KafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) extends Serializable {
  /* This is the key idea that allows us to work around running into
     NotSerializableExceptions. */
  lazy val producer = createProducer()
  def send(topic: String, key: K, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, key, value))
  def send(topic: String, value: V): Future[RecordMetadata] =
    producer.send(new ProducerRecord[K, V](topic, value))
}

object KafkaSink {
  import scala.collection.JavaConversions._
  def apply[K, V](config: Map[String, Object]): KafkaSink[K, V] = {
    val createProducerFunc = () => {
      val producer = new KafkaProducer[K, V](config)
      sys.addShutdownHook {
        // Ensure that, on executor JVM shutdown, the Kafka producer sends
        // any buffered messages to Kafka before shutting down.
        producer.close()
      }
      producer
    }
    new KafkaSink(createProducerFunc)
  }
  def apply[K, V](config: java.util.Properties): KafkaSink[K, V] = apply(config.toMap)
}
  1. 以后咱们利用广播变量的形式,将KafkaProducer广播到每个executor,以下:
// 广播KafkaSink
val kafkaProducer: Broadcast[KafkaSink[String, String]] = {
  val kafkaProducerConfig = {
    val p = new Properties()
    p.setProperty("bootstrap.servers", Conf.brokers)
    p.setProperty("key.serializer", classOf[StringSerializer].getName)
    p.setProperty("value.serializer", classOf[StringSerializer].getName)
    p
  }
  log.warn("kafka producer init done!")
  ssc.sparkContext.broadcast(KafkaSink[String, String](kafkaProducerConfig))
}

这样咱们就能在每一个executor中愉快的将数据输入到kafka当中:

//输出到kafka
segmentedStream.foreachRDD(rdd => {
  if (!rdd.isEmpty) {
    rdd.foreach(record => {
      kafkaProducer.value.send(Conf.outTopics, record._1.toString, record._2)
      // do something else
    })
  }
})

Spark streaming+Kafka应用

WeTest舆情监控对于天天爬取的千万级游戏玩家评论信息都要实时的进行词频统计,对于爬取到的游戏玩家评论数据,咱们会生产到Kafka中,而另外一端的消费者咱们采用了Spark Streaming来进行流式处理,首先利用上文咱们阐述的Direct方式从Kafka拉取batch,以后通过分词、统计等相关处理,回写到DB上(至于Spark中DB的回写方式可参考我以前总结的博文:Spark踩坑记——数据库(Hbase+Mysql)),由此高效实时的完成天天大量数据的词频统计任务。

Spark streaming+Kafka调优

Spark streaming+Kafka的使用中,当数据量较小,不少时候默认配置和使用便可以知足状况,可是当数据量大的时候,就须要进行必定的调整和优化,而这种调整和优化自己也是不一样的场景须要不一样的配置。

合理的批处理时间(batchDuration)

几乎全部的Spark Streaming调优文档都会说起批处理时间的调整,在StreamingContext初始化的时候,有一个参数即是批处理时间的设定。若是这个值设置的太短,即个batchDuration所产生的Job并不能在这期间完成处理,那么就会形成数据不断堆积,最终致使Spark Streaming发生阻塞。并且,通常对于batchDuration的设置不会小于500ms,由于太小会致使SparkStreaming频繁的提交做业,对整个streaming形成额外的负担。在平时的应用中,根据不一样的应用场景和硬件配置,我设在1~10s之间,咱们能够根据SparkStreaming的可视化监控界面,观察Total Delay来进行batchDuration的调整,以下图:

合理的Kafka拉取量(maxRatePerPartition重要)

对于Spark Streaming消费kafka中数据的应用场景,这个配置是很是关键的,配置参数为:spark.streaming.kafka.maxRatePerPartition。这个参数默认是没有上线的,即kafka当中有多少数据它就会直接所有拉出。而根据生产者写入Kafka的速率以及消费者自己处理数据的速度,同时这个参数须要结合上面的batchDuration,使得每一个partition拉取在每一个batchDuration期间拉取的数据可以顺利的处理完毕,作到尽量高的吞吐量,而这个参数的调整能够参考可视化监控界面中的Input Rate和Processing Time,以下图:

缓存反复使用的Dstream(RDD)

Spark中的RDD和SparkStreaming中的Dstream,若是被反复的使用,最好利用cache(),将该数据流缓存起来,防止过分的调度资源形成的网络开销。能够参考观察Scheduling Delay参数,以下图:

设置合理的GC

长期使用Java的小伙伴都知道,JVM中的垃圾回收机制,可让咱们不过多的关注与内存的分配回收,更加专一于业务逻辑,JVM都会为咱们搞定。对JVM有些了解的小伙伴应该知道,在Java虚拟机中,将内存分为了初生代(eden generation)、年轻代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是须要耗费必定时间的,尤为是老年代的GC回收,须要对内存碎片进行整理,一般采用标记-清楚的作法。一样的在Spark程序中,JVM GC的频率和时间也是影响整个Spark效率的关键因素。在一般的使用中建议:

--conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

设置合理的CPU资源数

CPU的core数量,每一个executor能够占用一个或多个core,能够经过观察CPU的使用率变化来了解计算资源的使用状况,例如,很常见的一种浪费是一个executor占用了多个core,可是总的CPU使用率却不高(由于一个executor并不总能充分利用多核的能力),这个时候能够考虑让么个executor占用更少的core,同时worker下面增长更多的executor,或者一台host上面增长更多的worker来增长并行执行的executor的数量,从而增长CPU利用率。可是增长executor的时候须要考虑好内存消耗,由于一台机器的内存分配给越多的executor,每一个executor的内存就越小,以至出现过多的数据spill over甚至out of memory的状况。

设置合理的parallelism

partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值过小了会致使每片数据量太大,致使内存压力,或者诸多executor的计算能力没法利用充分;可是若是太大了则会致使分片太多,执行效率下降。在执行action类型操做的时候(好比各类reduce操做),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操做的时候,默认返回数据的paritition数量(而在进行map类操做的时候,partition数量一般取自parent RDD中较大的一个,并且也不会涉及shuffle,所以这个parallelism的参数没有影响)。因此说,这两个概念密切相关,都是涉及到数据分片的,做用方式实际上是统一的。经过spark.default.parallelism能够设置默认的分片数量,而不少RDD的操做均可以指定一个partition参数来显式控制具体的分片数量。
在SparkStreaming+kafka的使用中,咱们采用了Direct链接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,咱们通常默认设置为Kafka中Partition的数量。

使用高性能的算子

这里参考了美团技术团队的博文,并无作过具体的性能测试,其建议以下:

  • 使用reduceByKey/aggregateByKey替代groupByKey
  • 使用mapPartitions替代普通map
  • 使用foreachPartitions替代foreach
  • 使用filter以后进行coalesce操做
  • 使用repartitionAndSortWithinPartitions替代repartition与sort类操做

使用Kryo优化序列化性能

这个优化原则我自己也没有通过测试,可是好多优化文档有提到,这里也记录下来。
在Spark中,主要有三个地方涉及到了序列化:

  • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输(见“原则七:广播大变量”中的讲解)。
  • 将自定义的类型做为RDD的泛型类型时(好比JavaRDD,Student是自定义类型),全部自定义类型对象,都会进行序列化。所以这种状况下,也要求自定义的类必须实现Serializable接口。
  • 使用可序列化的持久化策略时(好比MEMORY_ONLY_SER),Spark会将RDD中的每一个partition都序列化成一个大的字节数组。

对于这三种出现序列化的地方,咱们均可以经过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。可是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高不少。官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之因此默认没有使用Kryo做为序列化类库,是由于Kryo要求最好要注册全部须要进行序列化的自定义类型,所以对于开发者来讲,这种方式比较麻烦。

如下是使用Kryo的代码示例,咱们只要设置序列化类,再注册要序列化的自定义类型便可(好比算子函数中使用到的外部变量类型、做为RDD泛型类型的自定义类型等):

// 建立SparkConf对象。
val conf = new SparkConf().setMaster(...).setAppName(...)
// 设置序列化器为KryoSerializer。
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
// 注册要序列化的自定义类型。
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

结果

通过种种调试优化,咱们最终要达到的目的是,Spark Streaming可以实时的拉取Kafka当中的数据,而且可以保持稳定,以下图所示:

固然不一样的应用场景会有不一样的图形,这是本文词频统计优化稳定后的监控图,咱们能够看到Processing Time这一柱形图中有一Stable的虚线,而大多数Batch都可以在这一虚线下处理完毕,说明总体Spark Streaming是运行稳定的。

参考文献

  1. Spark Streaming + Kafka Integration Guide
  2. DirectStream、Stream的区别-SparkStreaming源码分析02
  3. Spark Streaming性能调优详解
  4. Spark性能优化指南——基础篇
  5. Spark的性能调优
  6. How to write to Kafka from Spark Streaming
相关文章
相关标签/搜索