SparkStreaming

SparkStreaming(1) ~ SparkStreaming编程指南

之因此写这部份内容的缘由是, 不管是网络上能够直接找到的资料, 仍是出版的书籍种种, 版本大都在1.6~2.0不等, 且资源零零散散, 须要处处百度, 搜罗资源.html

但根据我的开发了一段时间的感受来看, 会遇到的绝大多数问题, 均可以在官方文档中找到答案.java

所以也能够理解为这是官方文档的部分翻译.node

我的英文水平有限, 若有错漏欢迎指正.mysql

就目前来看, 主要分为这样几个板块.git

  1. Spark Streaming Programming Guide 也即SparkStreaming编程指南.github

  2. Submitting Applications Spark部署发布相关算法

  3. Tuning Spark Spark调优sql

  4. Spark Configuration Spark可用配置, 可选参数.数据库

目前已经有了Spark Streaming的中文翻译. 参考:apache

Spark Streaming编程指南

Spark编程指南

内容自己会比较多, 所以会拆开来, 分多篇介绍.

在这里就不从word count的简单示例开始了, 而是直接从基础概念开始.

Maven依赖

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.3</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.3</version>
    <scope>provided</scope>
</dependency>

而一样当前版本对应的中间件:

Source  Artifact
Kafka   spark-streaming-kafka-0-10_2.12
Flume   spark-streaming-flume_2.12
Kinesis spark-streaming-kinesis-asl_2.12 [Amazon Software License]

而更完整的, 更新的中间件 Maven 仓库路径为:

Maven repository

若是以为欠缺什么, 不妨找找试试.

初始化Streaming Context

为了初始化一个 Spark Streaming 程序,一个 StreamingContext 对象必需要被建立出来,它是全部的 Spark Streaming 功能的主入口点.

有两种建立方式:

import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

其中:

appName 是在Spark UI上展现所使用的名称.

master 是一个 Spark, Mesos or YARN cluster URL, 不了解也不要紧, 这部分会在Spark-submit时介绍到.

master 这个指的是Spark项目运行所在的集群. 若是想在本地启动SparkStreaming项目: 可使用一个特殊的 “local[*]” , 启动Spark的本地模式, *表示会自动检测系统的内核数量.

然而在集群环境下, 通常不采用硬编码的方式使用spark, 即 setMaster. 咱们有更好的方式 经过 spark-submit 在提交时指定master参数便可.

须要注意到的是, 这句代码会在内部建立一个 SparkContext, 能够经过 ssc.sparkContext 访问使用.

batch interval 也即 new Duration(1000) 在这里指的是毫秒值, 还能够采用Durations来建立.

Durations.seconds(5)

这个时间, 必须根据您的应用程序和可用的集群资源的等待时间要求进行设置.

另外一种建立 SparkStreamingContext的方式为:

import org.apache.spark.streaming.api.java.*;

JavaSparkContext sc = ...   //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

在已经有了Context以后, 咱们须要作的是:

  1. 建立Input DStreams, 如kafka就有相应的方法能够建立DStream

  2. 对输入流作 转换 处理, 也即咱们的功能部分.

  3. 开始接收输入而且使用 streamingContext.start() 来处理数据.

  4. 使用 streamingContext.awaitTermination() 等待处理被终止(手动或者因为任何错误).

  5. 使用 streamingContext.stop() 来手动的中止处理.

同时, 有几点须要注意的地方:

  • 一旦一个 context 已经启动,将不会有新的数据流的计算能够被建立或者添加到它.

  • 一旦一个 context 已经中止,它不会被从新启动.

  • 同一时间内在 JVM 中只有一个 StreamingContext 能够被激活. 也即假设在使用SparkStreaming的同时, 须要依赖 SparkContext 或 SparkSQL等作一些操做, 此时不能从新建立 SparkContext 或是 SparkSQL(由于SparkSQL依然会建立Context.) 须要直接使用ssc.sparkContext.

  • 调用 ssc.stop() 会在中止 SparkStreamingContext的同时 中止 SparkContext. 若是须要仅中止 StreamingContext,须要使用 ssc.stop(false);

  • 一个 SparkContext 就能够被重用以建立多个 StreamingContexts,只要前一个 StreamingContext 在下一个StreamingContext 被建立以前中止(不中止 SparkContext, 即便用 ssc.stop(false)).

在这里额外添加一条说明, ssc.stop 能够接收第二个参数, 是指定是否执行完当前批次的剩余数据.

Discretized Streams

Discretized Stream or DStream 是 Spark Streaming 提供的基本抽象.

有且仅有两种方式建立一个DStream, 第一种是经过 Spark的API去建立流, 第二种是从一个流转换成另外一个流.

在内部,一个 DStream 被表示为一系列连续的 RDDs,它是 Spark 中一个不可改变的抽象,distributed dataset.在一个 DStream 中的每一个 RDD 包含来自必定的时间间隔的数据,以下图所示.

应用于 DStream 的任何操做转化为对于底层的 RDDs 的操做.例如,在 先前的示例,转换一个行(lines)流成为单词(words)中,flatMap 操做被应用于在行离散流(lines DStream)中的每一个 RDD 来生成单词离散流(words DStream)的 RDDs.以下所示.

所以对于RDD支持的操做, DStream也基本都支持.

Input DStreams 和 Receivers(接收器)

输入 DStreams 是表明输入数据是从流的源数据(streaming sources)接收到的流的 DStream.每个 input DStream(除了 file stream 以外)与一个 Receiver 对象关联,它从 source(数据源)中获取数据,而且存储它到 Spark 的内存中用于处理.

receiver的java代码以下:

class MyReceiver extends Receiver<String> {

    public MyReceiver(StorageLevel storageLevel) {
        //StorageLevel表示存储级别
        super(storageLevel);
    }

    public void onStart() {
        //1. 启动线程, 打开Socket链接, 准备开始接收数据
        //2. 启动一个非阻塞线程去接收数据.
        //3. 调用Store方法将数据存储到 Spark的内存中, store方法有多种实现,支持将多种多样的数据进行存储.
        //4. 在发生错误或异常时根据自身的处理策略调用stop, restart, reportError 方法.
    }

    public void onStop() {
        //清理各类线程,未关闭的连接等等
    }
}

Spark Streaming 提供了两种内置的 streaming source(流的数据源).

  • Basic sources(基础的数据源):在 StreamingContext API 中直接可使用的数据源.例如:file systems 和 socket connections.

  • Advanced sources(高级的数据源):像 Kafka,Flume,Kinesis,等等这样的数据源.能够经过对应的maven repository 找到依赖.

须要注意到的是, 若是你想要在你的流处理程序中并行的接收多个数据流,你能够建立多个 input DStreams.这将建立同时接收多个数据流的多个 receivers(接收器),然而,一个 Spark 的 worker/executor 是一个长期运行的任务(task),所以它将占用分配给 Spark Streaming 的应用程序的全部核中的一个核(core).

所以,须要记住,一个 Spark Streaming 应用须要分配足够的核(core)(或线程(threads),若是本地运行的话)来处理所接收的数据,以及来运行接收器(receiver(s)).

所以相应的就须要在建立master的时候 不要使用local[1] 或 local 仅分配一个线程, 这将会使得receiver获得一个线程 而对应的程序则没有线程能够处理.

在集群模式下, 则须要分配适当的核心数.

而在使用中, 个人数据源是来自于kafka, 使用的是 kafkaUtils.createDirectStream. 而使用的 core数只有1, 或采用 local[1] 也可以正常运行, 这是否是说明上面的说法是错误的呢?

并非, 由KafkaUtils.createDirectStream 建立的是DStream, 而并不是单纯的使用 receiver的方式实现.

若是采用了自定义的 receiver, 那么此时经过 javaSparkContext.receiveData() 的方式建立流, 就至少须要两个线程, 或两个核心才可以正常运行.

Basic Sources

  • ssc.socketTextStream() 经过Socket来读取数据.

  • 经过文件读取数据, 须要注意的是,文件系统必须是与 HDFS API 兼容的文件系统中(即,HDFS,S3,NFS 等),一个 DStream 能够像下面这样建立:

    声明: 对于下面所描述的这种方式,我我的并无通过验证, 因为我的使用的数据源主要是kafka, mysql, es. 在尝试过程当中fileStream并不能直接使用, 所以有如下猜测.

    streamingContext.textFileStream(dataDirectory);

    而windows的文件系统是 NTFS,这就要求咱们有对应的文件环境才行.

    参考:Hadoop入门系列(一)Window环境下搭建hadoop和hdfs的基本操做

    Spark Streaming 将监控dataDirectory 目录 以及 该目录中任何新建的文件 (写在嵌套目录中的文件是不支持的)

    须要注意的几个地方有:

    • 能够直接监控路径, 也能够经过目录的通配符来监控,hdfs://namenode:8040/logs/ 或 hdfs://namenode:8040/logs/2017/* 都是能够的.

    • 文件必须具备相同的数据格式.

    • 在读取文件的时候,参考的时间是 文件的更新时间,而非建立时间.

    • 一旦被加载,即便文件被更新了 在当前窗口内 也不会被从新读取, 所以即便文件不停被追加新的内容, 可是并不会读入.

    • 文件越多,检索文件变动状况所须要的时间也就越多,即便大多数文件都没有变动.

    • 若是使用的是通配符的方式 去识别文件目录,如: hdfs://namenode:8040/logs/2016-*, 在这种状况下, 经过重命名文件夹 也能够将对应文件夹下的文件 加入被监控的文件列表中, 固然须要修改时间在对应的window内.

    • 经过调用 FileSystem.setTimes() (hadoop api) 能够更改文件的更新时间.

    如下部分是对文件流的一个说明, 如何将文件转换成DStream的概念?

    参考: org.apache.spark.streaming.dstream.FileInputDStream scala api

    *                      |<----- remember window ----->|
     * ignore threshold --->|                             |<--- current batch time
     *                      |____.____.____.____.____.____|
     *                      |    |    |    |    |    |    |
     * ---------------------|----|----|----|----|----|----|-----------------------> Time
     *                      |____|____|____|____|____|____|
     *                             remembered batches

    依然是按照时间批次来将数据转换成RDDs,整合成 DStream.

    在每一个时间批次中,检测 被监控的文件列表 若是修改时间在 current batch范围内的, 在归入列表, 转换成DStream, 在excutor的执行期间 新加入的文件,放入下一批次进行处理.

    而文件的修改时间 在 ignore threshold 以后的,则会被忽略掉.

    要求:

    • 运行Spark的系统时间 要与对应的 文件系统时间保持一致.

    • 文件必须在相同的文件系统下经过 atomically(原子的)moving(移动) 或 renaming(重命名) 到数据目录.

    而duration的定义则是经过:

    spark.streaming.fileStream.minRememberDuration

    默认是一分钟, 即读取修改时间在一分钟之内的文件.

    更细节的能够自行解读代码实现.

  • Queue of RDDs as a Stream(RDDs 队列做为一个流)

    为了使用测试数据测试 Spark Streaming 应用程序,还可使用 streamingContext.queueStream(queueOfRDDs) 建立一个基于 RDDs 队列的 DStream,每一个进入队列的 RDD 都将被视为 DStream 中的一个批次数据,而且就像一个流进行处理.

Advanced Sources(高级数据源)

Kafka: Spark Streaming 2.4.3 要求 Kafka versions 0.8.2.1 以上版本.

官方参考连接: Kafka Integration Guide

我的参考连接: SparkStreaming-Kafka集成

Custom Sources(自定义数据源)

DStreams 可使用经过自定义的 receiver(接收器)接收到的数据来建立.

Spark Streaming Custom Receivers

receiver的大体建立过程在上面已经提到过了.

案例代码实现:

JavaCustomReceiver.java.

经过以下方式使用 自定义的 Receiver

// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = customReceiverStream.flatMap(s -> ...);

Receiver Reliability(接收器的可靠性)

有两种receiver, 可靠性, 不可靠性, 区别就在于对于数据的失败处理上, 可靠receiver并不会丢失数据,而不可靠receiver则不对 数据安全性 提供任何保障.

从数据源上来讲, 自己就存在两种数据源, 如 kafka flume所提供的可靠性数据源.可以对 下发处理数据的 响应 作出相应的处理. 而不可靠数据源, 只负责下发数据.

若是想要实现一个 可靠的 receiver, 须要注意的是, 即便采用的是可靠数据源, 也不必定就是可靠的receiver.

若是你想实现一个可靠的数据接收器,必须用store方法,这是一个阻塞的方法,在它存入spark内部时才会返回值.若是接受器用的Storage Level 是复制(也可使用默认),那么在复制完后才会获得返回值.所以,在肯定完数据可靠存储以后,再适当的发送确认信息.这就避免了若是在存储中途失败,也不会致使数据丢失.由于缓冲区的数据没有被确认,那么数据源将会从新发送.

若是是不可靠接收器,那么无须以上逻辑,他只是简单地接收数据并存储到Spark内存中而已,但并不是说不可靠的 接收器就毫无优势:

  • 系统会自动把数据分割为 大小合适的 块
  • 若是限制速率已经被指定, 那么系统会自动控制接收速率
  • 因为上面提到的优势, 所以实现起来更为简单.

而与之相对的, 可靠的接收器就须要本身实现数据分块, 以及速率控制, 而实现方式主要取决于数据源.

DStreams 上的 Transformations(转换)

DStreams 支持不少在 RDD 中可用的 transformation 算子, 至于transformation 和 action 算子的区别, 能够自行百度了解.

参考连接: spark算子

而更详细的须要查看官方API

参考连接: http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html

其中返回值为各类RDD的通常都是 transformation算子, 不然为 action算子.

在茫茫多的 transformation中 选择几个比较特别的来详细说明下:

UpdateStateByKey

updateStateByKey 操做容许你维护任意状态,同时不断更新新信息.你须要经过两步来使用它:

  1. 定义 state - state 能够是任何的数据类型.
  2. 定义 state update function(状态更新函数)- 使用函数指定如何使用先前状态来更新状态,并从输入流中指定新值.

在每一个 batch 中,Spark 会使用状态更新函数为全部已有的 key 更新状态,无论在 batch 中是否含有新的数据.若是这个更新函数返回一个 none,这个 key-value pair 也会被消除.

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
(values, state) -> {
    Integer newSum = ...  // add the new values with the previous running count to get the new count
    return Optional.of(newSum);
};

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

请注意,使用 updateStateByKey 须要配置的 checkpoint(检查点)的目录.

可是, updateStateByKey 有不可避免的缺点.

参考: [spark streaming] 状态管理 updateStateByKey&mapWithState

总结来讲:

updateStateByKey底层是将preSateRDD和parentRDD进行co-group,而后对全部数据都将通过自定义的mapFun函数进行一次计算,即便当前batch只有一条数据也会进行这么复杂的计算,大大的下降了性能,而且计算时间会随着维护的状态的增长而增长.

mapWithstate底层是建立了一个MapWithStateRDD,存的数据是MapWithStateRDDRecord对象,一个Partition对应一个MapWithStateRDDRecord对象,该对象记录了对应Partition全部的状态,每次只会对当前batch有的数据进行跟新,而不会像updateStateByKey同样对全部数据计算.

所以才会有 mapWithstate 的性能远远高于 updateStateByKey

Transform 算子

这是Spark中 自由度最高的一个算子, Spark官方API提供的算子毕竟是有限的, 可能确实不可以知足你的要求, 所以才会有了这个 transform算子.

其核心做用是:

容许在 DStream 运行任何 RDD-to-RDD 函数.它可以被用来应用任何没在 DStream API 中提供的 RDD 操做.例如,链接数据流中的每一个批(batch)和另一个数据集的功能并无在 DStream API 中提供,然而你能够简单的利用 transform 方法作到.

import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);

JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
    rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
    ...
});

transform方法在每一个批次都会进行调用, 所以能够根据不一样时间进行相应的处理.

但须要注意的一点是:

虽然transform是 transformation算子, 可是 并不意味着其中的方法必然是在job分配完, 真实提交以后才执行.

缘由也正是在于这个算子的灵活性至关高. 能够在其中嵌入任何RDD操做.

参考连接: Spark Streaming 误用.transform(func)函数致使的问题解析

致使其问题的根本缘由就在于 在 transform中执行的 action操做, 是会在 生成job的时候执行的.

通过测试,还会有这样一点问题:

首先transform 确确实实是会在 job生成的时候执行相关代码,若是有action的话, 而且使用的线程也是 job generator线程。 其次, 在transform以前的算子 执行次序是会在transform以前的, 如在transform 以前有过filter, 那么filter必定是在transform以前执行的。

在这一点上, 我更倾向于 transform中的 action 提早引起了 transform算子以前的 算子 执行运算。而并无等到 后续的 真正的 dstream的action触发时再执行。

然而这并不意味着咱们能够省掉后续的action算子。 若是没有后续的 dstream的action算子, 生成job的举动也不会有, 所以更不会触发transform中的action。

Window

Spark Streaming 也支持 windowed computations(窗口计算),它容许你在数据的一个滑动窗口上应用 transformation(转换).下图说明了这个滑动窗口.

如上图显示,窗口在源 DStream 上 slides(滑动),合并和操做落入窗内的源 RDDs,产生窗口化的 DStream 的 RDDs.在这个具体的例子中,程序在三个时间单元的数据上进行窗口操做,而且每两个时间单元滑动一次.这说明,任何一个窗口操做都须要指定两个参数.

window length(窗口长度) - 窗口的持续时间.

sliding interval(滑动间隔) - 执行窗口操做的间隔.

这两个参数必须是 source DStream 的 batch interval(批间隔)的倍数.

// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));

经常使用的 window操做以下:

Transformation(转换) Meaning(含义)
window(windowLength, slideInterval) 返回一个新的 DStream,它是基于 source DStream 的窗口 batch 进行计算的.
countByWindow(windowLength, slideInterval) 返回 stream(流)中滑动窗口元素的数
reduceByWindow(func, windowLength, slideInterval) 返回一个新的单元素 stream(流),它经过在一个滑动间隔的 stream 中使用 func 来聚合以建立.该函数应该是 associative(关联的)且 commutative(可交换的),以便它能够并行计算
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 在一个 (K, V) pairs 的 DStream 上调用时,返回一个新的 (K, V) pairs 的 Stream,其中的每一个 key 的 values 是在滑动窗口上的 batch 使用给定的函数 func 来聚合产生的.Note(注意): 默认状况下,该操做使用 Spark 的默认并行任务数量(local model 是 2,在 cluster mode 中的数量经过 spark.default.parallelism 来肯定)来作 grouping.您能够经过一个可选的 numTasks 参数来设置一个不一样的 tasks(任务)数量.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 上述 reduceByKeyAndWindow() 的更有效的一个版本,其中使用前一窗口的 reduce 值逐渐计算每一个窗口的 reduce值.这是经过减小进入滑动窗口的新数据,以及 “inverse reducing(逆减)” 离开窗口的旧数据来完成的.一个例子是当窗口滑动时”添加” 和 “减” keys 的数量.然而,它仅适用于 “invertible reduce functions(可逆减小函数)”,即具备相应 “inverse reduce(反向减小)” 函数的 reduce 函数(做为参数 invFunc </ i>).像在 reduceByKeyAndWindow 中的那样,reduce 任务的数量能够经过可选参数进行配置.请注意,针对该操做的使用必须启用 checkpointing.
countByValueAndWindow(windowLength, slideInterval, [numTasks]) 在一个 (K, V) pairs 的 DStream 上调用时,返回一个新的 (K, Long) pairs 的 DStream,其中每一个 key 的 value 是它在一个滑动窗口以内的频次.像 code>reduceByKeyAndWindow 中的那样,reduce 任务的数量能够经过可选参数进行配置.

若是能够, 而且通常均可以调用 reduceByKeyAndWindow 的第二个版本, 描述了那么多,其实旨在说明, 当采用滑动窗口的时候, 有两种对数据的处理方式, 其一是 每次都去统计最新的 窗口中的 全部数据. 其二则是, 在原有数据基础上作出必定更新,这就要求 对已经离开窗口的数据作 ‘减量’ 操做, 对新进入窗口的数据作 ‘增量’ 操做.也即须要提供的 参数:

func 对数据作reduce操做, 完成统计更新

invFunc 对数据作‘减量’ 操做 在原有基础上进行更新.

Join

join 与 Sql中的 Join极为相似, 在 SQL中 join操做,要以 join on “on”后面的参数为准, 而在 Spark中, 则要求 进行join 的两个 RDD 或 DStream 都必须是 PairRDD 或是 PairDStream.

这样在进行join操做的时候, SQL中的 “on” 在Spark中就变成了, Tuple2中的第一个参数.

须要注意的是 join操做最终合并成一个流, 所以也会将多个分区的数据进行合并, 是一次窄依赖变换, 所以最终会造成新的分区.同时通常能够自行制定分区数, 若是不指定, 则使用Spark默认的分区数.

至于Spark默认的分区数: 参考连接:Spark RDD的默认分区数:(spark 2.1.0)

其核心就是:

sc.defaultParallelism = spark.default.parallelism
sc.defaultMinPartitions = min(spark.default.parallelism,2)

而RDD的分区数,则是 若是在从新分区时指定了分区数, 则采用分区数, 不然,就使用默认值.

另外, 分区的默认方式/规则 是 HashPartition

  1. Join

    Join:Join相似于SQL的inner join操做,返回结果是前面和后面集合中配对成功的,过滤掉关联不上的.

  2. leftOuterJoin

    leftOuterJoin相似于SQL中的左外关联left outer join,返回结果之前面的RDD为主,关联不上的记录为空.从返回类型上就可略知一二.

    JavaPairDStream[K, (V, Optional[W])]

    其第二个参数是 Optional, 能够接收空值.

  3. rightOuterJoin

    rightOuterJoin相似于SQL中的有外关联right outer join,返回结果以参数也就是右边的RDD为主,关联不上的记录为空.

    JavaPairDStream[K, (Optional[V], W)]
  4. fullOuterJoin

    fullOuterJoin相比前几种来讲并不常见,是 左外 右外链接的 结合. 最终的结果 是两个 流的并集. 返回的数据类型是:

    JavaPairDStream[K, (Optional[V], Optional[W])]

对于Join还有一种不错的用法:

JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));

能够将RDD 与 流数据进行join操做, 进而完成流数据与 固定数据集的合并.

实际上,您也能够动态更改要加入的 dataset.提供给 transform 的函数是每一个 batch interval(批次间隔)进行评估,所以能够将 dataset 引用指向当前的 dataset.

偶然间看到的案例:

Spark Streaming 流计算优化记录(2)-不一样时间片数据流的Join

主要解决的就是 来自 HDFS的数据 与 流数据合并.

但在这个案例中是否是有一种更合理的处理方式? 即每隔固定时间去 更新dataset, 在transform中 将流的RDD 与 HDFS的数据合并.

若是是不一样的 batch interval 之间的数据能够合并吗?

目前来看并无看到这种可能性, 对于 SparkContext, 同时只可以启动一个 SparkStreamingContext, 而在SparkStreamingContext启动时就须要指定 batch interval, 所以好像不太可能出现 多个 batchInterval.

倒不如说是 JavaPairRDD 与 JavaPairDStream 之间的合并.

DStreams 上的输出操做

输出操做 解释
print() 在运行流应用程序的 driver 节点上的DStream中打印每批数据的前十个元素.这对于开发和调试颇有用.
saveAsTextFiles(prefix, [suffix]) 将此 DStream 的内容另存为文本文件.每一个批处理间隔的文件名是根据 前缀 和 后缀:"prefix-TIMEIN_MS[.suffix]"_ 生成的.
saveAsObjectFiles(prefix, [suffix]) 将此 DStream 的内容另存为序列化 Java 对象的 SequenceFiles.每一个批处理间隔的文件名是根据 前缀 和 后缀:"prefix-TIMEIN_MS[.suffix]"_ 生成的.
saveAsHadoopFiles(prefix, [suffix]) 将此 DStream 的内容另存为 Hadoop 文件.每一个批处理间隔的文件名是根据 前缀 和 后缀:"prefix-TIMEIN_MS[.suffix]"_ 生成的.
foreachRDD(func) 对从流中生成的每一个 RDD 应用函数 func 的最通用的输出运算符.此功能应将每一个 RDD 中的数据推送到外部系统,例如将 RDD 保存到文件,或将其经过网络写入数据库.请注意,函数 func 在运行流应用程序的 driver 进程中执行,一般会在其中具备 RDD 动做,这将强制流式传输 RDD 的计算.

须要注意的是: foreachRDD 自己是运行在 Driver节点的, 而一般要对 RDD作相应的 action 操做. 而这部分操做则是在 各自的 work 上执行的.

foreachRDD 设计模式的使用

错误操做:

dstream.foreachRDD(rdd -> {
    Connection connection = createNewConnection(); // 在driver节点执行
    rdd.foreach(record -> {
        connection.send(record); // 在 worker节点执行.
    });
});

而Connection每每又没法被序列化, 所以在 worker节点上依然拿不到链接.

dstream.foreachRDD(rdd -> {
    rdd.foreach(record -> {
        Connection connection = createNewConnection();
        connection.send(record);
        connection.close();
    });
});

所以每每须要采用以下这种方式:

dstream.foreachRDD(rdd -> {
    rdd.foreachPartition(partitionOfRecords -> {
        // ConnectionPool is a static, lazily initialized pool of connections
        Connection connection = ConnectionPool.getConnection();
        while (partitionOfRecords.hasNext()) {
        connection.send(partitionOfRecords.next());
        }
        ConnectionPool.returnConnection(connection); // return to the pool for future reuse
    });
});

若是仅仅是想单纯的处理数据, 但并不在这一就须要进行foreachRDD 用以完成流的执行计算呢?

伪代码以下:

dstreamTemp = dstream.foreachRDD(rdd -> {
    rdd.foreachPartition(partitionOfRecords -> {
        Connection connection = ConnectionPool.getConnection();
        while (partitionOfRecords.hasNext()) {
            //进行查询数据, 填充 补全, 过滤 当前stream中的数据, 用如下一步继续处理.
        }
        ConnectionPool.returnConnection(connection); // return to the pool for future reuse
    });
});

dstreamTemp.nextAction;

但foreachRDD 是不会返回流的, 所以能够采用

dstreamTemp = dstream.mapPairtition();
dstreamTemp = dstream.mapToPairPartition();

另外关于 foreachRDD 须要注意的是:

DStreams 经过输出操做进行延迟执行,就像 RDD 由 RDD 操做懒惰地执行.具体来讲,DStream 输出操做中的 RDD 动做强制处理接收到的数据.所以,若是您的应用程序没有任何输出操做,或者具备 dstream.foreachRDD() 等输出操做,而在其中没有任何 RDD 操做,则不会执行任何操做.系统将简单地接收数据并将其丢弃.

默认状况下,输出操做是 one-at-a-time 执行的, 且按照它们在应用程序中定义的顺序执行.

DataFrame and SQL

Spark SQL在之后的篇章在详细介绍,在流数据上使用 DataFrame 和 SQL也是一件很简单的事情.

首先须要经过 SparkStreaming 获取对应的 SparkContext, 然后经过SparkContext建立 SparkSession. 而且必须建立, 这样就能够在 driver出现故障的时候, 从新启动. 咱们能够将 Spark RDD 转换成 DataFrame,注册为临时表,然后进行SQL查询. 案例代码以下:

/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
    private String word;

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }
}

...

/** DataFrame operations inside your streaming program */

JavaDStream<String> words = ... 

words.foreachRDD((rdd, time) -> {
    // Get the singleton instance of SparkSession
    SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();

    // Convert RDD[String] to RDD[case class] to DataFrame
    JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
        JavaRow record = new JavaRow();
        record.setWord(word);
        return record;
    });
    DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);

    //能够理解为 定义表名
    wordsDataFrame.createOrReplaceTempView("words");

    // Do word count on table using SQL and print it
    DataFrame wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word");
    wordCountsDataFrame.show();
});

代码全链接: source code

并不只仅是这些, SparkSQL的另外一大优势是, 可以跨线程 访问 其余Streaming Data定义的 DataFrame.

须要主动将 SparkStreaming 设置为记住足量数据. 由于 对于 当前线程的 StreamingData 并不知道此时还有来自其余线程的SQL查询, 会自动删除清理 已经不须要的数据.

例如,若是要查询最后一个批次,可是您的查询可能须要5分钟才能运行,则能够调用 streamingContext.remember(Minutes(5))

缓存 / 持久性

与 RDD 相似,DStreams 还容许开发人员将流的数据保留在内存中.也就是说,在 DStream 上使用 persist() 方法会自动将该 DStream 的每一个 RDD 保留在内存中.若是 DStream 中的数据将被屡次计算(例如,相同数据上的多个操做),这将很是有用.对于基于窗口的操做,如 reduceByWindow 和 reduceByKeyAndWindow 以及基于状态的操做,如 updateStateByKey,这是隐含的.所以,基于窗口的操做生成的 DStream 会自动保存在内存中,而不须要开发人员调用 persist().

对于经过网络接收数据(例如:Kafka,Flume,sockets 等)的输入流,默认持久性级别被设置为将数据复制到两个节点进行容错.

请注意,与 RDD 不一样,DStreams 的默认持久性级别将数据序列化在内存中.

spark的计算是lazy的,只有在执行action时才真正去计算每一个RDD的数据.要使RDD缓存,必须在执行某个action以前定义RDD.persist().

而在使用完毕以后, 最好也可以主动调用 unpersist() 释放内存, 固然, 并不意味着, 若是不主动调用, 就不会释放内存, 它会遵循 LRU原则, 进行内存的释放, 无效cache的删除.

参考:Spark cache的用法及其误区分析

在参考文档中,有一点我有不一样的意见:

cache以后必定不能当即有其它算子,不能直接去接算子.由于在实际工做的时候,cache后有算子的话,它每次都会从新触发这个计算过程.

测试代码以下:

JavaSparkContext jsc = createJavaSparkContext();
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 16; i++) {
    list.add(i);
}

JavaRDD<Integer> rdd = jsc.parallelize(list);
JavaRDD<Integer> rdd1 = rdd.filter(item -> {
    System.out.println("rdd1:" + item);
    return item > 3;
}).cache();
JavaRDD<Integer> rdd2 = rdd1.filter(item -> {
    System.out.println("rdd2:" + item);
    return item > 6;
}).cache();
rdd2.count();

输出结果并无将 rdd1:1 重复输出两遍,也即意味着 cache以后有算子, 只会将cache以后的算子进行计算, 而已经计算过的并不会致使重复计算. 所以咱们能够放心使用.

CheckPoint机制

鉴于 SparkStreaming 必须永久运行, 所以对于 程序无关的错误, 如系统错误, JVM崩溃等问题.具备可恢复的功能.所以 Spark必须保存部分信息, 到容错存储系统. check point有两种类型:

  • Metadata checkpointing - 将定义 streaming 计算的信息保存到容错存储(如 HDFS)中,元数据包括:

    • Configuration - 用于建立流应用程序的配置.
    • DStream operations - 定义 streaming 应用程序的 DStream 操做集.
    • Incomplete batches - 已经进入队列,可是未完成的 Job的 批次.
  • Data checkpointing - 将生成的 RDD 保存到可靠的存储.这在 基于 批次之间数据具备关联性 上的数据处理是有必要的, 如 reduceStateByKey,在这种转换中,生成的 RDD 依赖于先前批次的 RDD,这致使依赖链的长度随时间而增长.为了不恢复时间的这种无限增长(与依赖关系链成比例),有状态转换的中间 RDD 会按期 checkpoint 到可靠的存储(例如 HDFS)以切断依赖关系链.

    同俗来讲, 就是为了防止 在恢复数据时, 须要从第一个RDD开始从新计算状态,致使计算时间过长, 且保存的依赖链太长, 会在中间进行截断, 保存中间点 RDD的状态, 这样在恢复时就无需从最开始进行恢复处理.

使用CheckPoint的时机

checkPoint 并不是老是必要的,当咱们依赖的是可靠数据源,(又或者是丢失部分数据 也无所谓) 而且有本身的方式可以 查找到上次执行的 offset, 则彻底无需checkpoint, 此时只须要自行再度拉取数据, 处理数据便可.

  1. 若是在应用程序中使用 updateStateByKey或 reduceByKeyAndWindow(具备反向功能),则必须提供 checkpoint 目录以容许按期的 RDD checkpoint.

  2. 从运行应用程序的 driver 的故障中恢复 - 元数据 checkpoint 用于使用进度信息进行恢复.

如何使用checkPoint

首先你须要有一个 容错的 可靠的 文件系统, 好比 HDFS,S3 去保存你的checkpoint信息.

然后调用 streamingContext.checkpoint(checkpointDirectory), 经过这种方式 就可使用上面提到的 updateStateByKey 或 reduceByKeyAndWindow(具备反向功能) 这类的状态计算.

另外,若是要使应用程序从 driver 故障中恢复,您应该重写 streaming:

  • 当程序第一次启动时,它将建立一个新的 StreamingContext,设置全部流,而后调用 start().
  • 当程序在失败后从新启动时,它将从 checkpoint 目录中的 checkpoint 数据从新建立一个 StreamingContext.

    // Create a factory object that can create and setup a new JavaStreamingContext
      JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
          @Override public JavaStreamingContext create() {
              JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
              JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
              ...
              jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
              return jssc;
          }
      };
    
      //  这行代码即可以知足上述两种行为.
      JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
    
      // Do additional setup on context that needs to be done,
      // irrespective of whether it is being started or restarted
      context. ...
    
      // Start the context
      context.start();
      context.awaitTermination();

在上面的方法中, 若是是 第一次启动, 会调用 contextFactory 建立 Streaming 环境, 同时须要在 factory中建立对应的流, 以及定义流的处理过程. 若是是第二次启动, 则会经过 checkpoint 恢复程序当时的状态.

这是故障恢复的一部分, 另外就是咱们须要当 driver 挂掉以后, 让其可以自动重启, 这部分会在接下来的部署中讲到.

Accumulators,Broadcast 变量,和 Checkpoint

须要注意的是 Accumulators,Broadcast 没法经过 checkpoint进行恢复, 其惟一处理方式是, 在程序执行时 建立延迟 实例化 的 对象.

class JavaWordBlacklist {

    private static volatile Broadcast<List<String>> instance = null;

    public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
        if (instance == null) {
        synchronized (JavaWordBlacklist.class) {
            if (instance == null) {
            List<String> wordBlacklist = Arrays.asList("a", "b", "c");
            instance = jsc.broadcast(wordBlacklist);
            }
        }
        }
        return instance;
    }
}

class JavaDroppedWordsCounter {

    private static volatile LongAccumulator instance = null;

    public static LongAccumulator getInstance(JavaSparkContext jsc) {
        if (instance == null) {
        synchronized (JavaDroppedWordsCounter.class) {
            if (instance == null) {
            instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
            }
        }
        }
        return instance;
    }
}

wordCounts.foreachRDD((rdd, time) -> {
    // Get or register the blacklist Broadcast
    Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
    // Get or register the droppedWordsCounter Accumulator
    LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
    // Use blacklist to drop words and use droppedWordsCounter to count them
    String counts = rdd.filter(wordCount -> {
        if (blacklist.value().contains(wordCount._1())) {
        droppedWordsCounter.add(wordCount._2());
        return false;
        } else {
        return true;
        }
    }).collect().toString();
    String output = "Counts at time " + time + " " + counts;
}

Spark 部署

我的写的Spark集群部署相关, 详细描述了 standalone模式

Spark集群-Standalone 模式

要运行Spark 应用程序, 须要如下功能:

  1. 集群管理器, 有这样几种

    type mean
    Standalone 一种简单的 Spark 内置 集群管理器
    Apache Mesos 经常使用的集群管理器之一, 能够运行 Hadoop MapReduce 和 service applications.
    Hadoop YARN Hadoop 2 的资源管理器.
    Kubernetes 简称K8S Kubernetes是Google开源的一个容器编排引擎,它支持自动化部署、大规模可伸缩、应用容器化管理
  2. 打包后的应用程序Jar, 要求是一个可以在 Spark环境下直接运行的Jar包,所以须要将引用的各类各样的其余jar包, 如kafkaUtils, ZK, Redis 等都打包到一个jar包中, 同时须要在打包时排除 没必要要的 依赖jar, 如 spark core, scala lib 等,Maven的插件 Shade就能够很好地实现这一点.

  3. 为 executor 配置足够的内存 - 因为接收到的数据必须存储在内存中,因此 executor 必须配置足够的内存来保存接收到的数据.请注意,若是您正在进行10分钟的窗口操做,系统必须至少保留最近10分钟的内存中的数据.所以,应用程序的内存要求取决于其中使用的操做.

  4. 配置 checkpoint - 若是 streaming 应用程序须要它,则 Hadoop API 容错存储(例如:HDFS,S3等)中的目录必须配置为 checkpoint 目录,而且流程应用程序以 checkpoint 信息的方式编写Streaming代码.

  5. 配置应用程序 driver 的自动从新启动

    • standalone

      能够提交 Spark 应用程序 driver 以在Spark Standalone集群中运行即应用程序 driver 自己在其中一个工做节点上运行.此外,能够指示独立的群集管理器来监督 driver,若是因为 非正常退出(non-zero exit code)而致使 driver 发生故障,或因为运行 driver 的节点发生故障,则能够从新启动它.

      能够查看: Spark Standalone Mode

    • YARN

      Yarn支持相似的机制实现自启动,能够查看yarn相关文档

    • Mesos Marathon 在 Mesos上实现了这一点.

  6. 配置预写日志 - 自 Spark 1.2 以来,咱们引入了写入日志来实现强大的容错保证.

    若是启用,则从 receiver 接收的全部数据都将写入配置 checkpoint 目录中的写入日志.这能够防止 driver 恢复时的数据丢失,从而确保零数据丢失.

    能够经过将 配置参数 spark.streaming.receiver.writeAheadLog.enable 设置为 true来启用此功能.

    然而,这些更强的语义可能以单个 receiver 的接收吞吐量为代价.经过 并行运行更多的 receiver (会在稍后提到)能够纠正这一点,以增长总吞吐量.

    另外,建议在启用写入日志时,在日志已经存储在复制的存储系统中时,禁用在 Spark 中接收到的数据的复制.这能够经过将输入流的存储级别设置为 StorageLevel.MEMORY_AND_DISK_SER 来完成.

    使用 S3(或任何不支持刷新的文件系统)写入日志时,请记住启用 spark.streaming.driver.writeAheadLog.closeFileAfterWrite 和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite.

    有关详细信息,请参阅官方的 Spark Streaming配置.

    请注意,启用 I/O 加密时,Spark 不会将写入写入日志的数据加密.若是须要对提早记录数据进行加密,则应将其存储在本地支持加密的文件系统中.

  7. 设置最大接收速率 - 若是集群资源不够大,须要限制最大接收速率 能够经过:

    receiver 的 spark.streaming.receiver.maxRate 和用于 Direct Kafka 方法的 spark.streaming.kafka.maxRatePerPartition 的 配置参数.

    而在Spark 1.5中,咱们引入了一个称为 backpressure (反压)的功能,无需设置此速率限制,由于Spark Streaming会自动计算速率限制,并在处理条件发生变化时动态调整速率限制.

    能够经过将 配置参数 spark.streaming.backpressure.enabled 设置为 true 来启用此 backpressure.

升级应用程序代码

有两种处理策略:

  1. 新旧代码并行运行, 直到你认为能够的时候, 停掉原有代码.

  2. 正常中止,在 JavaStreamingContext.stop() 方法中接收两个参数, 第一个是 是否执行完当前数据, 第二个是是否中止 SparkContext. 在当前任务已经执行完以后, 再中止程序, 新的程序启动以后, 会从上次的checkpoint去启动, 而这也正是 checkpoint的缺点, 即 两次的 代码已经变动, 两次的序列化结果, 反序列化并不一致, 所以必须删除 checkpoint 目录才可以正常启动, 这也就意味着在上次中止是 保存的 application 信息 已经消失.

性能调优

性能调优的核心在于:

  1. 减小数据处理的时间
  2. 设定合理的批处理间隔

当二者基本保持一致, 则就可以快速, 有效地处理数据. 至于如何减小数据处理的时间, 则是仁者见仁智者见智.

基本须要本身的大数据处理经验, 更优良的算法, 再者就是所使用的 大数据处理框架相结合.

在这里, 仅仅介绍框架相关的部分, 更多的 则会在 性能调优方面 详细介绍.

链接以下: Spark调优

Level of Parallelism in Data Receiving(数据接收中的并行级别)

经过网络接收数据(如Kafka,Flume,socket 等)须要 deserialized(反序列化)数据并存储在 Spark 中.若是数据接收成为系统的瓶颈,那么考虑一下 parallelizing the data receiving(并行化数据接收).

注意每一个 input DStream 建立接收 single stream of data(单个数据流)的 single receiver(单个接收器)(在 worker 上运行).所以,能够经过建立多个 input DStreams 来实现 Receiving multiple data streams(接收多个数据流)并配置它们以从 source(s) 接收 data stream(数据流)的 different partitions(不一样分区).

例如,接收 two topics of data(两个数据主题)的单个Kafka input DStream 能够分为两个 Kafka input streams(输入流),每一个只接收一个 topic(主题).这将运行两个 receivers(接收器),容许 in parallel(并行)接收数据,从而提升 overall throughput(整体吞吐量).这些 multiple DStreams 能够 unioned(联合起来)建立一个 single DStream.而后 transformations(转化)为应用于 single input DStream 能够应用于 unified stream.以下这样作:

int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
    kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();

与此同时, 须要关注的另外一个参数是: spark.streaming.blockInterval

spark.streaming.blockInterval:

Spark Receiver 接收到的数据 在存入 Spark以前 被进行 分块操做, 分块的间隔. 最低推荐是50ms.

每一个 receiver 每批次的任务数量将是大约(batch interval(批间隔)/ block interval(块间隔)).例如,200 ms的 block interval(块间隔)每 2 秒 batches(批次)建立 10 个 tasks(任务).若是 tasks(任务)数量太少(即少于每一个机器的内核数量),那么它将无效,由于全部可用的内核都不会被使用处理数据.要增长 given batch interval(给定批间隔)的 tasks(任务)数量,请减小 block interval(块间​​隔). 但不该该低于50ms.

使用 多个输入流/ receivers 接收数据的替代方法是明确 repartition(从新分配)input data stream(输入数据流)(使用 inputStream.repartition(<number of partitions>)).这会在进一步处理以前将收到的批次数据分发到集群中指定数量的计算机.

Level of Parallelism in Data Processing(数据处理中的并行度)

若是任意 stage 的并行度设置的不够, 则会致使 集群资源 得不到充分利用.

至于并行度的设置

参考:Spark性能调优:合理设置并行度

网上相似的文章数不胜数, 就随便找了一篇.

核心点在于, 并行度设置太低, 即便分配的CPU再多, 也用不到那么多资源.

task数量,设置成spark Application 总cpu core数量的2~3倍 ,好比150个cpu core ,基本设置 task数量为 300~ 500, 与理性状况不一样的,有些task 会运行快一点,好比50s 就完了,有些task 可能会慢一点,要一分半才运行完,因此若是你的task数量,恰好设置的跟cpu core 数量相同,可能会致使资源的浪费.

Data Serialization(数据序列化)

能够经过调优 serialization formats(序列化格式)来减小数据 serialization(序列化)的开销.在 streaming 的状况下,有两种类型的数据被 serialized(序列化).

  • Input data(输入数据):默认状况下,经过 Receivers 接收的 input data(输入数据)经过 StorageLevel.MEMORY_AND_DISK_SER_2 存储在 executors 的内存中, 数据首先保留在内存中,而且只有在内存不足以容纳流计算所需的全部输入数据时才会 spilled over(溢出)到磁盘.这个序列化显然具备开销 - receiver(接收器)必须使接收的数据 deserialize(反序列化),并使用 Spark 的 serialization format(序列化格式)从新序列化它.

  • Persisted RDDs generated by Streaming Operations(流式操做生成的持久 RDDs):经过 streaming computations(流式计算)生成的 RDD 可能会持久存储在内存中.例如,window operations(窗口操做)会将数据保留在内存中,由于它们将被处理屡次.可是,与 StorageLevel.MEMORY_ONLY 的 Spark Core 默认状况不一样,经过流式计算生成的持久化 RDD 将以 StorageLevel.MEMORY_ONLY_SER(即序列化),以最小化 GC 开销.

在上述两种状况下, 使用kryo均可以有效减小CPU开销 和 内存开销.

然而, 序列化自己也是一种开销, 在须要保存的数据量不大, 内存足够的状况下:

能够将数据做为 deserialized objects(反序列化对象)持久化,而不会致使过多的 GC 开销.例如,若是你使用几秒钟的 batch intervals(批次间隔)而且没有 window operations(窗口操做),那么能够经过明确地相应地设置 storage level(存储级别)来尝试禁用 serialization in persisted data(持久化数据中的序列化).这将减小因为序列化形成的 CPU 开销,潜在地提升性能,而不须要太多的 GC 开销.

Task Launching Overheads(任务启动开销)

若是每秒启动的任务数量很高(好比每秒 50 个或更多),那么这个开销向 slaves 发送任务多是重要的,而且将难以实现 sub-second latencies(次要的延迟).能够经过如下更改减小开销:

  • Execution mode(执行模式):以 Standalone 模式 或 coarse-grained Mesos 模式运行 Spark 比 fine-grained Mesos 模式更好的任务启动时间.

这些更改可能会将 批处理时间 缩短 100 毫秒,从而容许 sub-second batch size(次秒批次大小)是可行的.

Setting the Right Batch Interval(设置正确的批次间隔)

毫无疑问,若是想要系统 稳定可持续, 咱们必须保证数据的流入流出速率保持均衡, 也即每批次接收的数据 与数据的 处理速度维持平衡.

而调试速率的办法就是 增长batchInterval 和 减小 数据的流入速率, 经过 SparkUI Streaming页签下中能够观测到 处理延时, 也即 total delay列.

保持 total delay 小于等于 batch interval便可.

即便真实环境中, 数据有突发性也无需在意, 只须要保证数据在整个运行期间的速率基本保持均衡便可.

然而须要注意到的一点是, 增大 batchInterval 也意味着 有可能增长了内存开销.

Memory Tuning(内存调优)

Spark Streaming application 所需的集群内存量在很大程度上取决于所使用的 transformations 类型.例如,若是要在最近 10 分钟的数据中使用 window operation(窗口操做),那么您的集群应该有足够的内存来容纳内存中 10 分钟的数据.或者若是要使用大量 keys 的 updateStateByKey,那么必要的内存将会很高.相反,若是你想作一个简单的 map-filter-store 操做,那么所需的内存就会很低.

通常来讲, receiver中接收到的数据, 在内存溢出的时候 会序列化 存储到硬盘中, 这可能会下降 streaming application 的性能,所以建议提供足够的内存以供使用.最好仔细查看内存使用量并相应地进行估算.

内存调优的另外一个方面是 垃圾收集.对于须要低延迟的 streaming application,由 JVM 垃圾回收引发的大量暂停是不但愿的.

就以上几点来讲:

  • DStreams 的持久性级别:如前面在 Data Serialization 部分中所述,input data 和 RDD 默认保持为 serialized bytes(序列化字节).与 deserialized persistence(反序列化持久性)相比,这减小了内存使用量和 GC 开销.

    能够经过启用 Kryo serialization 进一步减小了序列化大小和内存使用.

    以及能够经过 compression(压缩)来实现内存使用的进一步减小(参见Spark配置 spark.rdd.compress),代价是 CPU 时间.

  • Clearing old data(清除旧数据):默认状况下,DStream 转换生成的全部 input data 和 persisted RDDs 将自动清除.Spark Streaming 决定什么时候根据所使用的 transformations 来清除数据.例如,若是您使用 10 分钟的 window operation(窗口操做),则 Spark Streaming 将保留最近 10 分钟的数据,并主动丢弃旧数据.数据能够经过设置 streamingContext.remember 保持更长的持续时间(例如交互式查询旧数据, 以前提到的跨线程 访问StreamingData).

  • CMS Garbage Collector(CMS垃圾收集器):强烈建议使用 concurrent mark-and-sweep GC,以保持 GC 相关的暂停始终如一.CMS的优势就是, 尽可能减小暂停式GC,经过与任务并行执行的方式, 执行GC.即便 concurrent GC 已知能够减小 系统的总体处理吞吐量,但仍然建议实现更多一致的 batch processing times(批处理时间).确保在 driver( 在 spark-submit 中使用 --driver-java-options)和 executors(使用 Spark configuration spark.executor.extraJavaOptions)中设置 CMS GC.

  • Other tips(其余提示):为了进一步下降 GC 开销,如下是一些更多的提示.

    使用 OFF_HEAP 存储级别的保持 RDDs,使用更小的 heap sizes 的 executors.这将下降每一个 JVM heap 内的 GC 压力.

    至于什么是 OFF_HEAP?

    与ON_HEAP对立, 表示 存储在 Java堆内存以外的数据.

    也即 将数据存储在机器内存中.

    参考连接: Spark 内存管理之—OFF_HEAP

小结

总结来讲:

  • 每一个DStream 都与 single receiver相关联.为了得到读取并行性,须要建立多个 receivers,即 multiple DStreams.receiver 在一个 executor 中运行.它占据一个 core(内核).确保在 receiver slots are booked 后有足够的内核进行处理,即 spark.cores.max 应该考虑 receiver slots.receivers 以循环方式分配给 executors.

  • 当从 stream source 接收到数据时,receiver 建立数据 blocks(块).每一个 blockInterval(默认200ms) 毫秒生成一个新的数据块.在 N = batchInterval/blockInterval 的 batchInterval 期间建立 N 个数据块.这些块由当前 executor 的 BlockManager 分发给其余执行程序的 block managers.以后,在驱动程序上运行的 Network Input Tracker(网络输入跟踪器)通知有关进一步处理的块位置

  • 在驱动程序中为在 batchInterval 期间建立的块建立一个 RDD.在 batchInterval 期间生成的块是 RDD 的 partitions.每一个分区都是一个 spark 中的 task.blockInterval == batchinterval 意味着建立 single partition(单个分区),而且可能在本地进行处理.

  • 除非使用non-local(非本地调度)的方式, 不然这些块上的map任务都运行在执行器单元上(一个在接收数据块的位置,另外一个在数据块被备份到的位置),而不会考虑block interval,而更大的 block interval 意味着更大的块.

    增大spark.locality.wait 即增长了处理 local node(本地节点)上的块的可能性.

    所以,须要在这两个参数之间找到平衡,以确保在本地处理较大的块.

  • 除了调整block interval 和 batch interval以外, 您能够经过调用 inputDstream.repartition(n) 来定义分区数.

    这样能够随机从新组合 RDD 中的数据,建立 n 个分区以求更大的并行性.虽然是 shuffle 的代价.

    可是咱们须要注意的是,若是数据自己每一个partition中的数据有较强的关联性, 使用这种方法须要谨慎.

    另外,须要考虑到的问题是,虽然咱们有了更大的并行度, 但自身的集群资源是否支持这样高的并行性?即分配的核心数, executor数量是否足够?

    RDD 的处理由 driver’s jobscheduler 做为一项工做安排.在给定的时间点,只有一个 job 是 active 的.所以,若是一个做业正在执行,则其余做业将排队.

  • 若是您有两个 dstream,将会有两个 RDDs 被建立,而且会建立两个任务,而后被一个接一个的调度.为了不这种状况,你能够对这两个DStream执行union操做.这保证了两个DStream RDD会产生一个unionRDD,这个unionRDD会当作一个单独的job.但 RDD 的 partitioning(分区)不受影响.

    而Spark Job: 每一个Action算子本质上是执行了sc的runJob方法,这是一个重载方法.核心是交给DAGScheduler中的submitJob执行, 进而建立了不一样的job.

  • 若是 批处理时间)超过 batchinterval(批次间隔),那么显然 receiver 的内存将会开始填满,最终会抛出 exceptions(最多是 BlockNotFoundException).目前没有办法暂停 receiver.使用 SparkConf 配置 spark.streaming.receiver.maxRate,receiver 的 rate 能够受到限制.

相关文章
相关标签/搜索