Spark Streaming编程指南

  本文基于Spark Streaming Programming Guide原文翻译, 加上一些本身的理解和小实验的结果。
  php

1、概述

  Spark Streaming是基于Core Spark API的可扩展,高吞吐量,并具备容错能力的用于处理实时数据流的一个组件。Spark Streaming能够接收各类数据源传递来的数据,好比Kafka, Flume, Kinesis或者TCP等,对接收到的数据还可使用一些用高阶函数(好比map, reduce, joinwindow)进行封装的复杂算法作进一步的处理。最后,处理好的数据能够写入到文件系统,数据库,或者直接用于实时展现。除此以外,还能够在数据流上应用一些机器学习或者图计算等算法。
  这里写图片描述html

  上图展现了Spark Streaming的总体数据流转状况。在Spark Streaming中的处理过程能够参考下图,Spark Streaming接收实时数据,而后把这些数据分割成一个个batch,而后经过Spark Engine分别处理每个batch并输出。
  这里写图片描述java

  Spark Streaming中一个最重要的概念是DStream,即离散化数据流(discretized stream),DStream由一系列连续的数据集组成。DStream的建立有两种办法,一种是从数据源接收数据生成初始DStream,另外一种是由DStream A经过转换生成DStream B。一个DStream实质上是由一系列的RDDs组成。
  本文介绍了如何基于DStream写出Spark Streaming程序。Spark Streaming提供了Scala, Java以及Python接口,在官方文档中对这三种语言都有示例程序的实现,在这里只分析Scala写的程序。python

2、示例程序

  在深刻分析Spark Streaming的特性和原理以前,以写一个简单的Spark Streaming程序并运行起来为入口先了解一些相关的基础知识。这个示例程序从TCP socket中接收数据,进行Word Count操做。git

一、Streaming程序编写

  首先须要导入Spark Streaming相关的类,其中StreamingContext是全部Streaming程序的主要入口。接下来的代码中建立一个local StreamingContext,batch时间为1秒,execution线程数为2。github

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// 建立一个local StreamingContext batch时间为1秒,execution线程数为2
// master的线程数数最少为2,后面会详细解释

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, econds(1))

使用上面这个ssc对象,就能够建立一个lines变量用来表示从TCP接收的数据流了,指定机器名为localhost端口号为9999web

// 建立一个链接到hostname:port的DStream, 下面代码中使用的是localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

lines中的每一条记录都是TCP中的一行文本信息。接下来,使用空格将每一行语句进行分割。算法

// 将每一行分割成单词
val words = lines.flatMap(_.split(" "))

上面使用的flatMap操做是一个一对多的DStream操做,在这里表示的是每输入一行记录,会根据空格生成多个单词,这些单词造成一个新的DStream words。接下来统计单词个数。sql

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// 统计每一个batch中的不一样单词个数
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// 打印出其中前10个单词出现的次数
wordCounts.print()

  上面代码中,将每个单词使用map方法映射成(word, 1)的形式,即paris变量。而后调用reduceByKey方法,将相同单词出现的次数进行叠加,最终打印出统计的结果。数据库

  写完上面的代码,Spark Streaming程序尚未运行起来,须要写入如下两行代码使Spark Streaming程序可以真正的开始执行。

ssc.start()            // 开始计算
ssc.awaitTermination()  // 等待计算结束

二、TCP发送数据并运行Spark Streaming程序

(1)运行Netcat
  使用如下命令启动一个Netcat

nc -lk 9999

  接下来就能够在命令行中输入任意语句了。

(2)运行Spark Streaming程序

./bin/run-example streaming.NetworkWordCount localhost 9999

  程序运行起来后Netcat中输入的任何语句,都会被统计每一个单词出现的次数,例如
  这里写图片描述

3、基本概念

  这一部分详细介绍Spark Streaming中的基本概念。

一、依赖配置

  Spark Streaming相关jar包的依赖也可使用Maven来管理,写一个Spark Streaming程序的时候,须要将下面的内容写入到Maven项目中

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.0</version>
</dependency>

  对于从Kafka,Flume,Kinesis这些数据源接收数据的状况,Spark Streaming core API中不提供这些类和接口,须要添加下面这些依赖。
  

Source Artifact
Kafka spark-streaming-kafka-0-8_2.11
Flume spark-streaming-flume_2.11
Kinesis spark-streaming-kinesis-asl_2.11 [Amazon Software License]

二、初始化StreamingContext

  Spark Streaming程序的主要入口是一个StreamingContext对象,在程序的开始,须要初始化该对象,代码以下

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

  其中的参数appName是当前应用的名称,能够在Cluster UI上进行显示。master是Spark的运行模式,能够参考 Spark, Mesos or YARN cluster URL,或者设置成local[*]的形式在本地模式下运行。在生产环境中运行Streaming应用时,通常不会将master参数写死在代码中,而是在使用spark-submit命令提交时动态传入--master参数,具体能够参考 launch the application with spark-submit
  至于batch时间间隔的设置,须要综合考虑程序的性能要求以及集群可提供的资源状况。

  也能够基于SparkContext对象,生成一个StreamingContext对象,使用以下代码

import org.apache.spark.streaming._

val sc = ...                // 已有的SparkContext对象
val ssc = new StreamingContext(sc, Seconds(1))

  当context初始化后,还须要作的工做有:

  1. 根据数据源类型生成输入DStreams
  2. 经过调用transformation以及输出操做处理输入的DStreams
  3. 使用代码streamingContext.start()启动程序,开始接收并处理数据
  4. 使用代码streamingContext.awaitTermination()等待程序运行终止(包括手动中止,或者遇到Error后退出应用)
  5. 可使用streamingContext.stop()手动中止应用

须要注意的点:

  • 当一个context开始运行后,不能再往其中添加新的计算逻辑
  • 当一个context被中止后,不能restart
  • 在一个JVM中只能同时有一个StreamingContext对象处于运行状态
  • StreamingContext中的stop()方法一样会终止SparkContext。若是只须要中止StreamingContext,将stop()方法的可选参数设置成false,避免SparkContext被终止
  • 一个SparkContext对象,能够用于构造多个StreamingContext对象,只要在新的StreamingContext对象被建立前,旧的StreamingContext对象被中止便可。

三、离散化数据流(Discretized Streams, DStreams)

  DStream是Spark Streaming中最基本最重要的一个抽象概念。DStream由一系列的数据组成,这些数据既能够是从数据源接收到的数据,也能够是从数据源接收到的数据通过transform操做转换后的数据。从本质上来讲一个DStream是由一系列连续的RDDs组成,DStream中的每个RDD包含了一个batch的数据。
  这里写图片描述

  DStream上的每个操做,最终都反应到了底层的RDDs上。好比,在前面那个Word Count代码中将lines转化成words的逻辑,lines上的flatMap操做就如下图中所示的形式,做用到了每个底层的RDD上。
  这里写图片描述

  这些底层RDDs上的转换操做会有Spark Engine进行计算。对于开发者来讲,DStream提供了一个更方便使用的高阶API,从而开发者无需过多的关注每个转换操做的细节。
  DStream上能够执行的操做后续文章中会有进一步的介绍。

四、输入和接收DStream

  
(1)基本数据源
  在前面Word Count的示例程序中,已经使用到了ssc.socketTextStream(...),这个会根据TCP socket中接收到的数据建立一个DStream。除了sockets以外,StreamingContext API还支持以文件为数据源生成DStream

  • 文件数据源:若是须要从文件系统,好比HDFS,S3,NFS等中接收数据,可使用如下代码
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming程序会监控用户输入的dataDirectory路径,接收并处理该路径中的全部文件,不过不支持子文件夹中的文件。
须要注意的地方有:
a、全部的文件数据格式必须相同
b、该路径下的文件应该是原子性的移动到该路径,或者重命名到该路径
c、文件进入该路径后不可再发生变化,因此这种数据源不支持数据连续写入的形式
  对于简单的text文件,有一个简单的StreamingContext.textFileStream(dataDirectory)方法来进行处理。而且文件数据源的形式不须要运行一个receiver进程,因此对Execution的核数没有要求。

  • 基于自定义Receiver的数据源:DStream也支持从用户自定义Receivers中读取数据。
  • RDDs序列数据源:使用streamingContext.queueStream(queueOfRDDs),能够将一系列的RDDs转化成一个DStream。该queue中的每个RDD会被当作DStream中的一个batcn,而后以Streaming的形式处理这些数据。

(2)高阶数据源
  
(3)自定义数据源
  除了上面两类数据源以外,也能够支持自定义数据源。自定义数据源时,须要实现一个能够从自定义数据源接收数据并发送到Spark中的用户自定义receiver。具体能够参考 Custom Receiver Guide

(4)数据接收的可靠性

五、DStreams上的Transformations

  相似于RDDs,transformations可使输入DStream中的数据内容根据特定逻辑发生转换。DStreams上支持不少RDDs上相同的一些transformations
  具体含义和使用方法可参考另外一篇博客:Spark Streaming中的操做函数分析

  在上面这些transformations中,有一些须要进行进一步的分析
(1)UpdateStateByKey操做

(2)Transform操做
  transform操做及其相似的一些transformwith操做,可使DStream中的元素可以调用任意的RDD-to-RDD的操做。可使DStream调用一些只有RDD才有而DStream API没有提供的算子。例如,DStream API就不支持一个data DStream中的每个batch数据能够直接和另外的一个数据集作join操做,可是使用transform就能够实现这一功能。这个操做能够说进一步丰富了DStream的操做功能。
  再列举一个这个操做的使用场景,将某处计算到的重复信息与实时数据流中的记录进行join,而后进行filter操做,能够当作一种数据清理的方法。

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // 一个包含重复信息的RDD

val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // 将重复信息与实时数据作join,而后根据指定规则filter,用于数据清洗
  ...})

  这里须要注意的是,transform传入的方法是被每个batch调用的。这样能够支持在RDD上作一些时变的操做,即RDD,分区数以及广播变量能够在不一样的batch之间发生变化。

(3)Window操做
  Spark Streaming提供一类基于窗口的操做,这类操做能够在一个滑动窗口中的数据集上进行一些transformations操做。下图展现了窗口操做的示例
  这里写图片描述

  上图中,窗口在一个DStream源上滑动,DStream源中暴露在该窗口中的RDDs可让这个窗口进行相关的一些操做。在上图中能够看到,该窗口中任一时刻都只能看到3个RDD,而且这个窗口每2秒中往前滑动一次。这里提到的两个参数,正好是任意一个窗口操做都必须指定的。

  • 窗口长度:例如上图中,窗口长度为3
  • 滑动间隔:指窗口多长时间往前滑动一次,上图中为2。

      须要注意的一点是,上面这两个参数,必须是batch时间的整数倍,上图中的batch时间为1。

      接下来展现一个简单的窗口操做示例。好比说,在前面那个word count示例程序的基础上,我但愿每隔10秒钟就统计一下当前30秒时间内的每一个单词出现的次数。这一功能的简单描述是,在paris DStream的当前30秒的数据集上,调用reduceByKey操做进行统计。为了实现这一功能,可使用窗口操做reduceByKeyAndWindow

val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

  更多的窗口操做能够参考:Spark Streaming中的操做函数分析
  

六、DStreams上的输出操做

  DStream上的输出操做,可使DStream中的数据发送到外部系统,好比数据库或文件系统中。DStream只有通过输出操做,其中的数据才能被外部系统使用。而且下面这些输出操做才真正的触发DStream对象上调用的transformations操做。这一点相似于RDDs上的Actions算子。
  输出操做的使用和功能请参考:Spark Streaming中的操做函数分析

  下面主要进一步分析foreachRDD操做往外部数据库写入数据的一些注意事项。
  
  dstream.foreachRDD是DStream输出操做中最经常使用也最重要的一个操做。关于这个操做如何正确高效的使用,下面会列举出一些使用方法和案例,能够帮助读者在使用过程当中避免踩到一些坑。
  一般状况下,若是想把数据写入到某个外部系统中时,须要为之建立一个链接对象(好比提供一个TCP链接工具用于链接远程服务器),使用这个链接工具才能将数据发送到远程系统。在Spark Streaming中,开发者极可能会在Driver端建立这个对象,而后又去Worker端使用这个对象处理记录。好比下面这个例子

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // 在driver端执行
  rdd.foreach { record =>
    connection.send(record) // 在wroker端执行
  }}

  上面这个使用方法实际上是错误的,当在driver端建立这个链接对象后,须要将这个链接对象序列化并发送到wroker端。一般状况下,链接对象都是不可传输的,即wroker端没法获取该链接对象,固然也就没法将记录经过这个链接对象发送出去了。这种状况下,应用系统的报错提示多是序列化错误(链接对象没法序列化),或者初始化错误(链接对象须要在wroker端完成初始化),等等。
  正确的作法是在worker端建立这个链接对象。
  可是,即便是在worker建立这个对象,又可能会犯如下错误。

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }}

  上面代码会为每一条记录建立一个链接对象,致使链接对象太多。 链接对象的建立个数会受到时间和系统资源状况的限制,所以为每一条记录都建立一个链接对象会致使系统出现没必要要的高负载,进一步致使系统吞吐量下降。
  一个好的办法是使用rdd.foreachPartition操做,而后为RDD的每个partition,使一个partition中的记录使用同一个链接对象。以下面代码所示

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }}

  
  最后,能够经过使用链接对象池进一步对上面的代码进行优化。使用链接对象池能够进一步提升链接对象的使用效率,使得多个RDDs/batches之间能够重复使用链接对象。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // 链接对象池是静态的,而且创建对象只有在真正使用时才被建立
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // 使用完以后,将链接对象归还到池中以便下一次使用
  }}

  须要注意的是,链接对象池中的对象最好设置成懒生成模式,即在真正使用时才去建立链接对象,而且给链接对象设置一个生命周期,必定时间不使用则注销该链接对象。

总结一下关键点:

  • DStreamstransformations操做是由输出操做触发的,相似于RDDs中的actions操做。上面列举出某些DStream的输出操做中能够将其中的元素转化成RDD,进而能够调用RDD提供的一些API操做,这时若是对RDD调用actions操做会当即强制对接收到的数据进行处理。所以,若是用户应用程序中DStream不须要任何的输出操做,或者仅仅对DStream使用一些相似于dstream.foreachRDD操做可是在这个操做中不调用任何的RDD action操做时,程序是不会进行任何实际运算的。系统只会简单的接收数据,任何丢弃数据。
  • 默认状况下,输出操做是顺序执行的。

七、累加器和广播变量

  Spark Streaming的累加器和广播变量没法从checkpoint恢复。若是在应用中既使用到checkpoint又使用了累加器和广播变量的话,最好对累加器和广播变量作懒实例化操做,这样才可使累加器和广播变量在driver失败重启时可以从新实例化。参考下面这段代码

object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }}

object DroppedWordsCounter {

  @volatile private var instance: Accumulator[Long] = null

  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
        }
      }
    }
    instance
  }}

wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter += count
      false
    } else {
      true
    }
  }.collect()
  val output = "Counts at time " + time + " " + counts})

  查看完整代码请移步 source code

八、DataFrame和SQL操做

  在streaming数据上也能够很方便的使用到DataFrames和SQL操做。为了支持这种操做,须要用StreamingContext对象使用的SparkContext对象初始化一个SQLContext对象出来,SQLContext对象设置成一个懒初始化的单例对象。下面代码对前面的Word Count进行一些修改,经过使用DataFramesSQL来实现Word Count的功能。每个RDD都被转化成一个DataFrame对象,而后注册成一个临时表,最后就能够在这个临时表上进行SQL查询了。

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // 获取单例SQLContext对象
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // 将RDD[String]转化成DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // 注册表
  wordsDataFrame.registerTempTable("words")

  // 在该临时表上执行sql语句操做
  val wordCountsDataFrame =
    sqlContext.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()}

  查看完整代码请移步 source code.
  也能够在另外一线程获取到的Streaming数据上进行SQL操做(这里涉及到异步运行StreamingContext)。StreamingContext对象没法感知到异步SQL查询的存在,所以有StreamingContext对象有可能在SQL查询完成以前把历史数据删除掉。为了保证StreamingContext不删除须要用到的历史数据,须要告诉StreamingContext保留必定量的历史数据。例如,若是你想在某一个batch的数据上执行SQL查询操做,可是你这个SQL须要执行5分钟的时间,那么,须要执行streamingContext.remember(Minutes(5))语句告诉StreamingContext将历史数据保留5分钟。
  有关DataFrames的更多介绍,参考另外一篇博客:Spark-SQL之DataFrame操做大全

九、MLlib操做

十、缓存和持久化

  相似于RDDsDStreams也容许开发者将stream中的数据持久化到内存中。在DStream对象上使用persist()方法会将DStream对象中的每个RDD自动持久化到内存中。这个功能在某个DStream的数据须要进行屡次计算时特别有用。对于窗口操做好比reduceByWindow,以及涉及到状态的操做好比updateStateByKey,默认会对DStream对象执行持久化。所以,程序在运行时会自动将窗口操做和涉及到状态的这些操做生成的DStream对象持久化到内存中,不须要开发者显示的执行persist()操做。
  对那些经过网络接收到的streams数据(好比Kafka, Flume, Socket等),默认的持久化等级是将数据持久化到两个节点上,以保证其容错能力。
  注意,不一样于RDDs,默认状况下DStream的持久化等级是将数据序列化保存在内存中。这一特性会在后面的性能调优中进一步分析。有关持久化级别的介绍,能够参考rdd-persistence

十一、检查点

  当Streaming应用运行起来时,基本上须要7 * 24的处于运行状态,因此须要有必定的容错能力。检查点的设置就是可以支持Streaming应用程序快速的从失败状态进行恢复的。检查点保存的数据主要有两种:  

1 . 元数据(Metadata)检查点:保存Streaming应用程序的定义信息。主要用于恢复运行Streaming应用程序的driver节点上的应用。元数据包括:
  a、配置信息:建立Streaming应用程序的配置信息
  b、DStream操做:在DStream上进行的一系列操做方法
  c、未处理的batch:记录进入等待队列可是还未处理完成的批次

2 . 数据(Data)检查点:将计算获得的RDD保存起来。在一些跨批次计算并保存状态的操做时,必须设置检查点。由于在这些操做中基于其余批次数据计算获得的RDDs,随着时间的推移,计算链路会愈来愈长,若是发生错误重算的代价会特别高。

  元数据检查点信息主要用于恢复driver端的失败,数据检查点主要用于计算的恢复。

(1)何时须要使用检查点

  当应用程序出现如下两种状况时,须要配置检查点。
  
- 使用到状态相关的操做算子-好比updateStateByKey或者reduceByKeyAndWindow等,这种状况下必须为应用程序设置检查点,用于按期的对RDD进行检查点设置。
- Driver端应用程序恢复-当应用程序从失败状态恢复时,须要从检查点中读取相关元数据信息。

(2)检查点设置

  通常是在具备容错能力,高可靠的文件系统上(好比HDFS, S3等)设置一个检查点路径,用于保存检查点数据。设置检查点能够在应用程序中使用streamingContext.checkpoint(checkpointDirectory)来指定路径。
  若是想要应用程序在失败重启时使用到检查点存储的元数据信息,须要应用程序具备如下两个特性,须要使用StreamingContext.getOrCreate代码在失败时从新建立StreamingContext对象:

  • 当应用程序是第一次运行时,建立一个新的StreamingContext对象,而后开始执行程序处理DStream。
  • 当应用程序失败重启时,能够从设置的检查点路径获取元数据信息,建立一个StreamingContext对象,并恢复到失败前的状态。

      下面用Scala代码实现上面的要求。

def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // 建立一个新的StreamingContext对象
    val lines = ssc.socketTextStream(...) // 获得DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // 设置checkpoint路径
    ssc
}

// 用checkpoint元数据建立StreamingContext对象或根据上面的函数建立新的对象
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// 设置context的其余参数
context. ...

// 启动context
context.start()
context.awaitTermination()

  若是checkpointDirectory路径存在,会使用检查点元数据恢复一个StreamingContext对象。若是路径不存在,或者程序是第一次运行,则会使用functionToCreateContext来建立一个新的StreamingContext对象。
  RecoverableNetWorkWordCount示例代码演示了一个从检查点恢复应用程序的示例。
  
  须要注意的是,想要用到上面的getOrCreate功能,须要在应用程序运行时使其支持失败自动重跑的功能。这一功能,在接下来一节中有分析。

  另外,在往检查点写入数据这一过程,是会增长系统负荷的。所以,须要合理的设置写入检查点数据的时间间隔。对于小批量时间间隔(好比1秒)的应用,若是每个batch都执行检查点写入操做,会显著的下降系统的吞吐性能。相反的,若是写入检查点数据间隔过久,会致使lineage过长。对那些状态相关的须要对RDD进行检查点写入的算子,检查点写入时间间隔最好设置成batch时间间隔的整数倍。好比对于1秒的batch间隔,设置成10秒。有关检查点时间间隔,可使用dstream.checkpoint(checkpointInterval)。通常来讲,检查点时间间隔设置成5~10倍滑动时间间隔是比较合理的。

十二、部署应用程序

  这一节主要讨论如何将一个Spark Streaming应用程序部署起来。
  
(1)需求
  运行一个Spark Streaming应用程序,须要知足一下要求。

  • 须要有一个具备集群管理器的集群 - 能够参考Spark应用部署文档
  • 应用程序打成JAR包 - 须要将应用程序打成JAR包。接下来使用spark-submit命令来运行应用程序的话,在该JAR包中无需打入Spark和Spark Streaming相关JAR包。然而,若是应用程序中使用到了好比Kafka或者Flume等高阶数据源的话,须要将这些依赖的JAR包,以及这些依赖进一步的依赖都打入到应用JAR包中。好比,应用中使用到了KafkaUtils的话,须要将spark-streaming-kafka-0.8_2.11以及其依赖都打入到应用程序JAR包中。
  • 为Executor设置足够的内存 - 因为接收到的数据必须保存在内存中,必须为Executor设置足够的内存能容纳这些接收到的数据。注意,若是在应用程序中作了10分钟长度的窗口操做,系统会保存最少10分钟的数据在内存中。因此应用程序须要的内存除了由接收的数据决定以外,还须要考虑应用程序中的操做类型。
  • 设置检查点 - 若是应用程序须要用到检查点,则须要在文件存储系统上设置好检查点路径。
  • 为应用程序的Driver设置自动重启 - 为了实现driver失败后自动重启的功能,应用程序运行的系统必须可以监控driver进程,而且若是发现driver失败时可以自动重启应用。不一样的集群使用不一样的方式实现自动重启功能。
    • Spark Standalone - 在这种模式下,driver程序运行在某个wroker节点上。而且,Standalone集群管理器会监控driver程序,若是发现driver中止运行,而且其状态码为非零值或者因为运行driver程序的节点失败致使driver失败,就会自动重启该应用。具体的监控和失败重启能够进一步参考Spark Standalone guide
    • YARN - Yarn支持相似的自动重启应用的机制。更多的细节能够进一步参考YARN的相关文档
    • Mesos - Mesos使用Marathon实现了自动重启功能
  • 设置write ahead logs - 从Spark-1.2版本开始,引入了一个write ahead log机制来实现容错。若是设置了WAL功能,全部接收到的数据会写入write ahead log中。这样能够避免driver重启时出现数据丢失,所以能够保证数据的零丢失,这一点能够参考前面有关介绍。经过将spark.streaming.receiver.writeAheadLog.enable=true来开启这一功能。然而,这一功能的开启会下降数据接收的吞吐量。这是能够经过同时并行运行多个接收进程(这一点在后面的性能调优部分会有介绍)进行来抵消该负面影响。另外,若是已经设置了输入数据流的存储级别为Storagelevel.MEMORY_AND_DISK_SET,因为接收到的数据已经会在文件系统上保存一份,这样就能够关闭WAL功能了。当使用S3以及其余任何不支持flushng功能的文件系统来write ahead logs时,要记得设置spark.streaming.driver.writeAheadLog.closeFileAfterWrite以及spark.streaming.receiver.writeAheadLog.closeFileAfterWrite两个参数。
  • 设置Spark Streaming最大数据接收率 - 若是运行Streaming应用程序的资源不是不少,数据处理能力跟不上接收数据的速率,能够为应用程序设置一个每秒最大接收记录数进行限制。对于Receiver模式的应用,设置spark.streaming.receiver.maxRate,对于Direct Kafka模式,设置spark.streaming.kafka.maxRatePerPartition限制从每一个Kafka的分区读取数据的速率。假如某个Topic有8个分区,spark.streaming.kafka.maxRatePerpartition=100,那么每一个batch最大接收记录为800。从Spark-1.5版本开始,引入了一个backpressure的机制来避免设置这个限制阈值。Spark Streaming会自动算出当前的速率限制,而且动态调整这个阈值。经过将spark.streaming.backpressure.enabledtrue开启backpressure功能。

(2)升级应用代码
  若是运行中的应用程序有更新,须要运行更新后的代码,有如下两种机制。

  • 升级后的应用程序直接启动,与现有的应用程序并行执行。在新旧应用并行运行的过程当中,会接收和处理一部分相同的数据。
  • Gracefully停掉正在运行的应用,而后启动升级后的应用程序,新的应用程序会从旧的应用程序中止处开始继续处理数据。须要注意的是,使用这种方式,须要其数据源具备缓存数据的能力,不然在新旧应用程序衔接的间歇期内,数据没法被处理。好比Kafka和Flume都具备数据缓存的能力。而且,在这种状况下,再从旧应用程序的检查点从新构造SparkStreamingContext对象再也不合适,由于检查点中的信息可能不包含更新的代码逻辑,这样会致使程序出现错误。在这种状况下,要么从新指定一个检查点,要么删除以前的检查点。

1三、监控应用程序

  在Spark Streaming应用程序运行时,Spark Web UI页面上会多出一个Streaming的选项卡,在这里面能够显示一些Streaming相关的参数,好比Receiver是否在运行,接收了多少记录,处理了多少记录等。以及Batch相关的信息,包括batch的执行时间,等待时间,完成的batch数,运行中的batch数等等。这里面有两个时间参数须要注意理解一些:

  • Processing Time - 每个batch中数据的处理时间
  • Scheduling Delay - 当前batch从进入队列到开始执行的延迟时间

      若是处理时间一直比batch时间跨度要长,或者延迟时间逐渐增加,表示系统已经没法处理当前的数据量了,这时候就须要考虑如何去下降每个batch的处理时间。如何下降batch处理时间,能够参考第四节。

      除了监控页面以外,Spark还提供了StreamingListener接口,经过这个接口能够获取到receiver以及batch的处理时间等信息。

4、性能调优

  为了使Spark Streaming应用可以更好的运行,须要进行一些调优设置,这一节会分析一些性能调优中的参数和设置规则。在性能调优方面,主要须要考虑如下两个问题:

  • 如何充分利用集群资源下降每一个Batch的处理时间
  • 如何设置合理的Batch大小,以便应用可以及时处理接收到的这些数据

一、下降每一个Batch的处理时间

  接下来的内容在Spark性能调优中已有介绍,这里再次强调一下在Streaming中须要注意的一些地方。
  
(1)接收数据进程的并行度
  经过网络(好比Kafka, Flume, socket等)接收到的数据,首先须要反序列化而后保存在Spark中。当数据接收成为系统的瓶颈时,就须要考虑如何提升系统接收数据的能力了。每个输入的DStream会在一个Worker节点上运行一个接收数据流的进程。若是建立了多个接收数据流进程,就能够生成多个输入DStream了。好比说,对于Kafka数据源,若是使用的是一个DStream接收来自两个Topic中的数据的话,就能够将这两个Topic拆开,由两个数据接收进程分开接收。当用两个receiver接收到DStream后,能够在应用中将这两个DStream再进行合并。好比下面代码中所示

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

  须要注意一个参数spark.streaming.blockInterval。对于Receiver来讲,接收到的数据在保存到Spark内存中以前,会以block的形式汇聚到一块儿。每一个Batch中block的个数决定了程序运行时处理这些数据的task的个数。每个receiver的每个batck对应的task个数大体为(batch时间间隔 / block时间间隔)。好比说对于一个2m的batch,若是block时间间隔为200ms那么,将会有10个task。若是task的数量太少,对数据的处理就不会很高效。在batch时间固定的状况下,若是想增大task个数,那么就须要下降blockInterval参数了,这个参数默认值为200ms,官方建议的该参数下限为50ms,若是低于50ms可能会引发其余问题。
  另外一个提升数据并发处理能力的方法是显式的对接收数据从新分区,inputStream.repartition(<number of partitions>)
  
(2)数据处理的并行度
  对于reduceByKeyreduceByKeyAndWindow操做来讲,并行task个数由参数spark.default.parallelism来控制。若是想要提升数据处理的并行度,能够在调用这类方法时,指定并行参数,或者将spark.default.parallelism参数根据集群实际状况进行调整。

(3)数据序列化
  能够经过调整序列化相关的参数,来提升数据序列化性能。在Streaming应用中,有两类数据须要序列化操做。

  • 输入数据:默认状况下,Receiver接收到的数据以StorageLevel.MEMORY_AND_DIS_SER_2的形式保存在Executor的内存中。也就是说,为了下降GC开销,这些数据会被序列化成bytes形式,而且还考虑到executor失败的容错。这些数据首先会保存在内存中,当内存不足时会spill到磁盘上。使用这种方式的一个明显问题是,Spark接收到数据后,首先须要反序列化这些数据,而后再按照Spark的方式对这些数据从新序列化。
  • Streaming操做中持久化的RDD:Streaming计算产生的RDD可能也会持久化到内存中。好比窗口操做函数会将数据缓存起来以便后续屡次使用。而且Streaming应用中,这些数据的存储级别是StorageLevel.MEMORY_ONLY_SET(Spark Core的默认方式是StorageLevel.MEMORY_ONLY)。Streaming对这些数据多了一个序列化操做,这主要也是为了下降GC开销。

      在上面这两种状况中,可使用Kyro方式对数据进行序列化,同时下降CPU和内存的开销。有关序列化能够进一步参考Spark调优。对于Kyro方式的参数设置,请参考Spark Kyro参数设置
      通常状况下,若是须要缓存的数据量不大,能够直接将数据以非序列化的形式进行存储,这样不会明显的带来GC的开销。好比说,batch时间只有若干秒,而且没有使用到窗口函数操做,那么能够在持久化时显示的指定存储级别,避免持久化数据时对数据的序列化操做。
      
    (4)提升task启动性能
      若是每秒启动的task个数太多(通常指50个以上),那么对task的频繁启动也是一个不容忽视的损耗。遇到这种状况时,须要考虑一下Execution模式了。通常来讲,在Spark的Standalone模式以及coarse-grained Mesos模式下task的启动时间会比fine-grained Mesos模式要低。

二、如何正确设置Batch时间间隔

  为了使一个Spark Streaming应用在集群上稳定运行,须要保证应用在接收到数据时可以及时处理。若是处理速率不匹配,随着时间的积累,等待处理的数据将会愈来愈多,最终致使应用没法正常运行。最好的状况是batch的处理时间小于batch的间隔时间。因此,正确合理的设置Batch时间间隔是很重要的。
  

三、内存调整

  有关Spark内存的使用以及Spark应用的GC性能调节的更多细节在Spark调优中已经有了更加详细的描述。这里简单分析一些Spark Streaming应用程序会用到的参数。
  
  一个Spark Streaming应用程序须要使用集群多少内存资源,很大程度上是由该应用中的具体逻辑来决定的,即须要看应用程序中的transformations的类型。好比代码中使用到长达10分钟的窗口操做时,就须要使用到可以把10分钟的数据都保存到内存中的内存量。若是使用updateStateByKey这种操做,而数据中不一样key特别多,也会使用更多的内存。若是应用的逻辑比较简单,仅仅是接收-过滤-存储等一系列操做时,消耗的内存量会明显减小。
  
  默认状况下,receivers接收到的数据会以StorageLevel.MEMORY_AND_DISK_SER_2级别进程存储,当内存中容纳不下时会spill到磁盘上,可是这样会下降应用的处理性能,因此为了应用可以更高效的运行,最好仍是多分配一些内存以供使用。通常能够经过在少许数据的状况下,评估一下数据使用的内存量,继而计算出应用正式部署时须要分配的总内存量大小。
  
  内存调节的另外一方面是垃圾回收的设置。对一个低延迟的应用系统来讲,JVM在垃圾回收时致使应用长时间暂停运行是一个很讨厌的场景。

  下面有一些可用于调节内存使用量和GC性能的方面:

  • DStreams的持久化级别:在前面已经提到,输入数据在默认状况下会以序列化的字节形式进行持久化。与非序列化存储相比,这样会下降内存使用率和下降垃圾回收的负担。使用Kryo方式进行序列化可以进一步下降序列化后数据大小和内存的使用。想要进一步下降内存的使用量,能够在数据上再增长一个压缩功能,经过参数spark.rdd.compress来设置。
  • 清除旧数据:默认状况下,全部输入数据和DStream经过不一样的transformations持久化的数据都会自动进行清理。Spark Streaming根据transformations的不一样来决定哪些数据须要被清理掉。例如,当使用10分钟的窗口函数时,Spark Streaming会保存最少10分钟的数据。想要数据保存更长时间,能够设置streamingContext.remenber参数。
  • 使用CMS垃圾回收算法:特别建议使用CMS垃圾回收机制来下降GC压力。driver上经过设置spark-submit命令的--driver-java-options参数来指定,executor上经过设置spark.executor.extraJavaOptions参数来指定。
  • 其余建议:进一步下降GC负担,可使用如下一些方法。
    • 使用Tachyon提供的OFF_HEAP存储级别来持久化RDDs,能够参考RDD Persistence
    • 下降heap大小,使用更多executors。这样能够下降每一个JVM堆的GC压力。

5、容错性

  本节主要讨论Spark Streaming应用程序失败后的处理办法。

一、背景

二、定义

三、基本概念

四、数据接收方式

(1)Files输入

(2)基于Receiverd 数据源

(3)Kafka Direct输入方式

五、输出操做

6、Spark Streaming的升级

7、继续

相关文章
相关标签/搜索