Spark 学习 (十二) Spark Streaming详解

一,简介node

  1.1 概述算法

  1.2 术语定义shell

  1.3 Storm和Spark Streaming比较数据库

二,运行原理apache

  2.1 Streaming架构编程

  2.2 容错,持久化和性能调优服务器

    2.2.1 容错网络

    2.2.2 持久化架构

    2.2.3 性能调优框架

三,编程模型

  3.1 如何使用Spark Streaming

  3.2 DStream的输入源

    3.2.1 基础来源

    3.2.2 高级来源

  3.3 DStream的操做

    3.3.1 普通的转换操做

    3.3.2 窗口转换操做

    3.3.3 输出操做

 

 

 

 

  

正文

一,简介

  1.1 概述

  是一个基于Spark Core之上的实时计算框架,能够从不少数据源消费数据并对数据进行处理.Spark Streaming 是Spark核心API的一个扩展,能够实现高吞吐量的、具有容错机制的实时流数据的处理。支持从多种数据源获取数据,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,从数据源获取数据以后,可使用诸如map、reduce、join和window等高级函数进行复杂算法的处理。最后还能够将处理结果存储到文件系统,数据库和现场仪表盘。在“One Stack rule them all”的基础上,还可使用Spark的其余子框架,如集群学习、图计算等,对流数据进行处理。

  Spark Streaming处理的数据流图:

  

  Spark的各个子框架,都是基于核心Spark的,Spark Streaming在内部的处理机制是,接收实时流的数据,并根据必定的时间间隔拆分红一批批的数据,而后经过Spark Engine处理这些批数据,最终获得处理后的一批批结果数据。

  对应的批数据,在Spark内核对应一个RDD实例,所以,对应流数据的DStream能够当作是一组RDDs,即RDD的一个序列。通俗点理解的话,在流数据分红一批一批后,经过一个先进先出的队列,而后 Spark Engine从该队列中依次取出一个个批数据,把批数据封装成一个RDD,而后进行处理,这是一个典型的生产者消费者模型,对应的就有生产者消费者模型的问题,即如何协调生产速率和消费速率。

  1.2 术语定义

  离散流(discretized stream)或DStream

  本质上就是一系列连续的RDD,DStream其实就是对RDD的封装DStream能够任务是一个RDD的工厂,该DStream里面生产都是相同业务逻辑的RDD,只不过是RDD里面要读取数据的不相同

  他是sparkStreaming中的一个最基本的抽象,表明了一下列连续的数据流,本质上是一系列连续的RDD,你对DStream进行操做,就是对RDD进行操做

  DStream每隔一段时间生成一个RDD,你对DStream进行操做,本质上是对里面的对应时间的RDD进行操做

  DSteam和DStream之间存在依赖关系,在一个固定的时间点,对个存在依赖关系的DSrteam对应的RDD也存在依赖关系,
每一个一个固定的时间,其实生产了一个小的DAG,周期性的将生成的小DAG提交到集群中运行

  DStream图例:

  

 

  批数据(batch data):这是化整为零的第一步,将实时流数据以时间片为单位进行分批,将流处理转化为时间片数据的批处理。随着持续时间的推移,这些处理结果就造成了对应的结果数据流了。

  时间片或批处理时间间隔( batch interval):这是人为地对流数据进行定量的标准,以时间片做为咱们拆分流数据的依据。一个时间片的数据对应一个RDD实例。

  窗口长度(window length):一个窗口覆盖的流数据的时间长度。必须是批处理时间间隔的倍数,

  滑动时间间隔:前一个窗口到后一个窗口所通过的时间长度。必须是批处理时间间隔的倍数

  Input DStream :一个input DStream是一个特殊的DStream,将Spark Streaming链接到一个外部数据源来读取数据。

  1.3 Storm和Spark Streaming比较

  处理模型以及延迟

  虽然两框架都提供了可扩展性(scalability)和可容错性(fault tolerance),可是它们的处理模型从根本上说是不同的。Storm能够实现亚秒级时延的处理,而每次只处理一条event,而Spark Streaming能够在一个短暂的时间窗口里面处理多条(batches)Event。因此说Storm能够实现亚秒级时延的处理,而Spark Streaming则有必定的时延。

  容错和数据保证

  然而二者的代价都是容错时候的数据保证,Spark Streaming的容错为有状态的计算提供了更好的支持。在Storm中,每条记录在系统的移动过程当中都须要被标记跟踪,因此Storm只能保证每条记录最少被处理一次,可是容许从错误状态恢复时被处理屡次。这就意味着可变动的状态可能被更新两次从而致使结果不正确。

  任一方面,Spark Streaming仅仅须要在批处理级别对记录进行追踪,因此他能保证每一个批处理记录仅仅被处理一次,即便是node节点挂掉。虽说Storm的 Trident library能够保证一条记录被处理一次,可是它依赖于事务更新状态,而这个过程是很慢的,而且须要由用户去实现。

  实现和编程API

  Storm主要是由Clojure语言实现,Spark Streaming是由Scala实现。若是你想看看这两个框架是如何实现的或者你想自定义一些东西你就得记住这一点。Storm是由BackType和Twitter开发,而Spark Streaming是在UC Berkeley开发的。

  Storm提供了Java API,同时也支持其余语言的API。 Spark Streaming支持Scala和Java语言(其实也支持Python)。

  批处理框架集成

  Spark Streaming的一个很棒的特性就是它是在Spark框架上运行的。这样你就能够想使用其余批处理代码同样来写Spark Streaming程序,或者是在Spark中交互查询。这就减小了单独编写流批量处理程序和历史数据处理程序。

  生产支持

  Storm已经出现好多年了,并且自从2011年开始就在Twitter内部生产环境中使用,还有其余一些公司。而Spark Streaming是一个新的项目,而且在2013年仅仅被Sharethrough使用(据做者了解)。

  Storm是 Hortonworks Hadoop数据平台中流处理的解决方案,而Spark Streaming出如今 MapR的分布式平台和Cloudera的企业数据平台中。除此以外,Databricks是为Spark提供技术支持的公司,包括了Spark Streaming。

  虽说二者均可以在各自的集群框架中运行,可是Storm能够在Mesos上运行, 而Spark Streaming能够在YARN和Mesos上运行。

  

二,运行原理

  2.1 Streaming架构

  SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,能够对多种数据源(如Kafka、Flume、Twitter、Zero和TCP 套接字)进行相似Map、Reduce和Join等复杂操做,并将结果保存到外部文件系统、数据库或应用到实时仪表盘。

  计算流程Spark Streaming是将流式计算分解成一系列短小的批处理做业。这里的批处理引擎是Spark Core,也就是把Spark Streaming的输入数据按照batch size(如1秒)分红一段一段的数据(Discretized Stream),每一段数据都转换成Spark中的RDD(Resilient Distributed Dataset),而后将Spark Streaming中对DStream的Transformation操做变为针对Spark中对RDD的Transformation操做,将RDD通过操做变成中间结果保存在内存中。整个流式计算根据业务的需求能够对中间的结果进行叠加或者存储到外部设备。下图显示了Spark Streaming的整个流程。

  

  2.2 容错,持久化和性能调优

    2.2.1 容错

    对于流式计算来讲,容错性相当重要。首先咱们要明确一下Spark中RDD的容错机制。每个RDD都是一个不可变的分布式可重算的数据集,其记录着肯定性的操做继承关系(lineage),因此只要输入数据是可容错的,那么任意一个RDD的分区(Partition)出错或不可用,都是能够利用原始输入数据经过转换操做而从新算出的。  

    对于Spark Streaming来讲,其RDD的传承关系以下图所示,图中的每个椭圆形表示一个RDD,椭圆形中的每一个圆形表明一个RDD中的一个Partition,图中的每一列的多个RDD表示一个DStream(图中有三个DStream),而每一行最后一个RDD则表示每个Batch Size所产生的中间结果RDD。咱们能够看到图中的每个RDD都是经过lineage相链接的,因为Spark Streaming输入数据能够来自于磁盘,例如HDFS(多份拷贝)或是来自于网络的数据流(Spark Streaming会将网络输入数据的每个数据流拷贝两份到其余的机器)都能保证容错性,因此RDD中任意的Partition出错,均可以并行地在其余机器上将缺失的Partition计算出来。这个容错恢复方式比连续计算模型(如Storm)的效率更高。

    以下图:Spark Streaming中RDD的lineage关系图

    

    2.2.2 持久化

  与RDD同样,DStream一样也能经过persist()方法将数据流存放在内存中,默认的持久化方式是MEMORY_ONLY_SER,也就是在内存中存放数据同时序列化的方式,这样作的好处是遇到须要屡次迭代计算的程序时,速度优点十分的明显。而对于一些基于窗口的操做,如reduceByWindow、reduceByKeyAndWindow,以及基于状态的操做,如updateStateBykey,其默认的持久化策略就是保存在内存中。

  对于来自网络的数据源(Kafka、Flume、sockets等),默认的持久化策略是将数据保存在两台机器上,这也是为了容错性而设计的。

另外,对于窗口和有状态的操做必须checkpoint,经过StreamingContext的checkpoint来指定目录,经过 Dtream的checkpoint指定间隔时间,间隔必须是滑动间隔(slide interval)的倍数。

    2.2.3 性能调优

  1.  优化运行时间

    增长并行度 确保使用整个集群的资源,而不是把任务集中在几个特定的节点上。对于包含shuffle的操做,增长其并行度以确保更为充分地使用集群资源;

    减小数据序列化,反序列化的负担 Spark Streaming默认将接受到的数据序列化后存储,以减小内存的使用。可是序列化和反序列话须要更多的CPU时间,所以更加高效的序列化方式(Kryo)和自定义的系列化接口能够更高效地使用CPU;

    设置合理的batch duration(批处理时间间) 在Spark Streaming中,Job之间有可能存在依赖关系,后面的Job必须确保前面的做业执行结束后才能提交。若前面的Job执行的时间超出了批处理时间间隔,那么后面的Job就没法按时提交,这样就会进一步拖延接下来的Job,形成后续Job的阻塞。所以设置一个合理的批处理间隔以确保做业可以在这个批处理间隔内结束时必须的;

    减小因任务提交和分发所带来的负担 一般状况下,Akka框架可以高效地确保任务及时分发,可是当批处理间隔很是小(500ms)时,提交和分发任务的延迟就变得不可接受了。使用Standalone和Coarse-grained Mesos模式一般会比使用Fine-grained Mesos模式有更小的延迟。

  2.  优化内存使用

    控制batch size(批处理间隔内的数据量) Spark Streaming会把批处理间隔内接收到的全部数据存放在Spark内部的可用内存区域中,所以必须确保当前节点Spark的可用内存中少能容纳这个批处理时间间隔内的全部数据,不然必须增长新的资源以提升集群的处理能力;

    及时清理再也不使用的数据 前面讲到Spark Streaming会将接受的数据所有存储到内部可用内存区域中,所以对于处理过的再也不须要的数据应及时清理,以确保Spark Streaming有富余的可用内存空间。经过设置合理的spark.cleaner.ttl时长来及时清理超时的无用数据,这个参数须要当心设置以避免后续操做中所须要的数据被超时错误处理;

    观察及适当调整GC策略 GC会影响Job的正常运行,可能延长Job的执行时间,引发一系列不可预料的问题。观察GC的运行状况,采用不一样的GC策略以进一步减少内存回收对Job运行的影响。

三,编程模型

  3.1 如何使用Spark Streaming

  DStream(Discretized Stream)做为Spark Streaming的基础抽象,它表明持续性的数据流。这些数据流既能够经过外部输入源赖获取,也能够经过现有的Dstream的transformation操做来得到。在内部实现上,DStream由一组时间序列上连续的RDD来表示。每一个RDD都包含了本身特定时间间隔内的数据流。如图7-3所示。

  

   DStream中在时间轴下生成离散的RDD序列:

  

  对DStream中数据的各类操做也是映射到内部的RDD上来进行的,如图7-4所示,对Dtream的操做能够经过RDD的transformation生成新的DStream。这里的执行引擎是Spark。

  做为构建于Spark之上的应用框架,Spark Streaming承袭了Spark的编程风格,对于已经了解Spark的用户来讲可以快速地上手。接下来以Spark Streaming官方提供的WordCount代码为例来介绍Spark Streaming的使用方式。

  

import org.apache.spark._

import org.apache.spark.streaming._

import org.apache.spark.streaming.StreamingContext._


// Create a local StreamingContext with two working thread and batch interval of 1 second.

// The master requires 2 cores to prevent from a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

val ssc = new StreamingContext(conf, Seconds(1))


// Create a DStream that will connect to hostname:port, like localhost:9999

val lines = ssc.socketTextStream("localhost", 9999)

// Split each line into words

val words = lines.flatMap(_.split(" "))

import org.apache.spark.streaming.StreamingContext._

// Count each word in each batch

val pairs = words.map(word => (word, 1))

val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console

wordCounts.print()

ssc.start()              // Start the computation

ssc.awaitTermination()  // Wait for the computation to terminate

  1.建立StreamingContext对象 同Spark初始化须要建立SparkContext对象同样,使用Spark Streaming就须要建立StreamingContext对象。建立StreamingContext对象所需的参数与SparkContext基本一致,包括指明Master,设定名称(如NetworkWordCount)。须要注意的是参数Seconds(1),Spark Streaming须要指定处理数据的时间间隔,如上例所示的1s,那么Spark Streaming会以1s为时间窗口进行数据处理。此参数须要根据用户的需求和集群的处理能力进行适当的设置;

  2.建立InputDStream如同Storm的Spout,Spark Streaming须要指明数据源。如上例所示的socketTextStream,Spark Streaming以socket链接做为数据源读取数据。固然Spark Streaming支持多种不一样的数据源,包括Kafka、 Flume、HDFS/S三、Kinesis和Twitter等数据源;

  3.操做DStream对于从数据源获得的DStream,用户能够在其基础上进行各类操做,如上例所示的操做就是一个典型的WordCount执行流程:对于当前时间窗口内从数据源获得的数据首先进行分割,而后利用Map和ReduceByKey方法进行计算,固然最后还有使用print()方法输出结果;

  4.启动Spark Streaming以前所做的全部步骤只是建立了执行流程,程序没有真正链接上数据源,也没有对数据进行任何操做,只是设定好了全部的执行计划,当ssc.start()启动后程序才真正进行全部预期的操做。

  至此对于Spark Streaming的如何使用有了一个大概的印象,在后面的章节咱们会经过源代码深刻探究一下Spark Streaming的执行流程。

  3.2 DStream的输入源

   在Spark Streaming中全部的操做都是基于流的,而输入源是这一系列操做的起点。输入 DStreams 和 DStreams 接收的流都表明输入数据流的来源,在Spark Streaming 提供两种内置数据流来源:

  基础来源 在 StreamingContext API 中直接可用的来源。例如:文件系统、Socket(套接字)链接和 Akka actors;

  高级来源 如 Kafka、Flume、Kinesis、Twitter 等,能够经过额外的实用工具类建立。

    3.2.1 基础来源

  在前面分析怎样使用Spark Streaming的例子中咱们已看到ssc.socketTextStream()方法,能够经过 TCP 套接字链接,从从文本数据中建立了一个 DStream。除了套接字,StreamingContext 的API还提供了方法从文件和 Akka actors 中建立 DStreams做为输入源。

  Spark Streaming提供了streamingContext.fileStream(dataDirectory)方法能够从任何文件系统(如:HDFS、S三、NFS 等)的文件中读取数据,而后建立一个DStream。Spark Streaming 监控 dataDirectory 目录和在该目录下任何文件被建立处理(不支持在嵌套目录下写文件)。须要注意的是:读取的必须是具备相同的数据格式的文件;建立的文件必须在dataDirectory 目录下,并经过自动移动或重命名成数据目录;文件一旦移动就不能被改变,若是文件被不断追加,新的数据将不会被阅读。对于简单的文本文,可使用一个简单的方法streamingContext.textFileStream(dataDirectory)来读取数据。

  Spark Streaming也能够基于自定义 Actors 的流建立DStream ,经过 Akka actors 接受数据流,使用方法streamingContext.actorStream(actorProps, actor-name)。Spark Streaming使用 streamingContext.queueStream(queueOfRDDs)方法能够建立基于 RDD 队列的DStream,每一个RDD 队列将被视为 DStream 中一块数据流进行加工处理。

    3.2.2 高级来源

  这一类的来源须要外部 non-Spark 库的接口,其中一些有复杂的依赖关系(如 Kafka、Flume)。所以经过这些来源建立 DStreams 须要明确其依赖。例如,若是想建立一个使用Twitter tweets 的数据的DStream 流,必须按如下步骤来作:

  1)在 SBT 或 Maven工程里添加 spark-streaming-twitter_2.10 依赖。

  2)开发:导入 TwitterUtils 包,经过 TwitterUtils.createStream 方法建立一个DStream。

  3)部署:添加全部依赖的 jar 包(包括依赖的spark-streaming-twitter_2.10 及其依赖),而后部署应用程序。

  须要注意的是,这些高级的来源通常在Spark Shell中不可用,所以基于这些高级来源的应用不能在Spark Shell中进行测试。若是你必须在Spark shell中使用它们,你须要下载相应的Maven工程的Jar依赖并添加到类路径中。

  其中一些高级来源以下:

  Twitter Spark Streaming的TwitterUtils工具类使用Twitter4j,Twitter4J 库支持经过任何方法提供身份验证信息,你能够获得公众的流,或获得基于关键词过滤流。

  Flume Spark Streaming能够从Flume中接受数据。

  Kafka Spark Streaming能够从Kafka中接受数据。

  Kinesis Spark Streaming能够从Kinesis中接受数据。

  须要重申的一点是在开始编写本身的 SparkStreaming 程序以前,必定要将高级来源依赖的Jar添加到SBT 或 Maven 项目相应的artifact中。常见的输入源和其对应的Jar包以下图所示。

  

  另外,输入DStream也能够建立自定义的数据源,须要作的就是实现一个用户定义的接收器。

  3.3 DStream的操做

    与RDD相似,DStream也提供了本身的一系列操做方法,这些操做能够分红三类:普通的转换操做、窗口转换操做和输出操做。

    3.3.1 普通的转换操做

    普通的转换操做以下表所示:

  

    在上面列出的这些操做中,transform()方法和updateStateByKey()方法值得咱们深刻的探讨一下:

   transform(func)操做

  该transform操做(转换操做)连同其其相似的 transformWith操做容许DStream 上应用任意RDD-to-RDD函数。它能够被应用于未在 DStream API 中暴露任何的RDD操做。例如,在每批次的数据流与另外一数据集的链接功能不直接暴露在DStream API 中,但能够轻松地使用transform操做来作到这一点,这使得DStream的功能很是强大。例如,你能够经过链接预先计算的垃圾邮件信息的输入数据流(可能也有Spark生成的),而后基于此作实时数据清理的筛选,以下面官方提供的伪代码所示。事实上,也能够在transform方法中使用机器学习和图形计算的算法。

  updateStateByKey操做

  该 updateStateByKey 操做可让你保持任意状态,同时不断有新的信息进行更新。要使用此功能,必须进行两个步骤 :

  (1)  定义状态 - 状态能够是任意的数据类型。

  (2)  定义状态更新函数 - 用一个函数指定如何使用先前的状态和从输入流中获取的新值 更新状态。

  让咱们用一个例子来讲明,假设你要进行文本数据流中单词计数。在这里,正在运行的计数是状态并且它是一个整数。咱们定义了更新功能以下:

    3.3.2 窗口转换操做

  Spark Streaming 还提供了窗口的计算,它容许你经过滑动窗口对数据进行转换,窗口转换操做以下:

转换

描述

window(windowLengthslideInterval)

返回一个基于源DStream的窗口批次计算后获得新的DStream。

countByWindow(windowLength,slideInterval)

返回基于滑动窗口的DStream中的元素的数量。

reduceByWindow(funcwindowLength,slideInterval)

基于滑动窗口对源DStream中的元素进行聚合操做,获得一个新的DStream。

reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks])

基于滑动窗口对(K,V)键值对类型的DStream中的值按K使用聚合函数func进行聚合操做,获得一个新的DStream。

reduceByKeyAndWindow(func,invFunc,windowLengthslideInterval, [numTasks])

一个更高效的reduceByKkeyAndWindow()的实现版本,先对滑动窗口中新的时间间隔内数据增量聚合并移去最先的与新增数据量的时间间隔内的数据统计量。例如,计算t+4秒这个时刻过去5秒窗口的WordCount,那么咱们能够将t+3时刻过去5秒的统计量加上[t+3,t+4]的统计量,在减去[t-2,t-1]的统计量,这种方法能够复用中间三秒的统计量,提升统计的效率。

countByValueAndWindow(windowLength,slideInterval, [numTasks])

基于滑动窗口计算源DStream中每一个RDD内每一个元素出现的频次并返回DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素频次。与countByValue同样,reduce任务的数量能够经过一个可选参数进行配置。

      

  

  在Spark Streaming中,数据处理是按批进行的,而数据采集是逐条进行的,所以在Spark Streaming中会先设置好批处理间隔(batch duration),当超过批处理间隔的时候就会把采集到的数据汇总起来成为一批数据交给系统去处理。

  对于窗口操做而言,在其窗口内部会有N个批处理数据,批处理数据的大小由窗口间隔(window duration)决定,而窗口间隔指的就是窗口的持续时间,在窗口操做中,只有窗口的长度知足了才会触发批数据的处理。除了窗口的长度,窗口操做还有另外一个重要的参数就是滑动间隔(slide duration),它指的是通过多长时间窗口滑动一次造成新的窗口,滑动窗口默认状况下和批次间隔的相同,而窗口间隔通常设置的要比它们两个大。在这里必须注意的一点是滑动间隔和窗口间隔的大小必定得设置为批处理间隔的整数倍。

  如批处理间隔示意图所示,批处理间隔是1个时间单位,窗口间隔是3个时间单位,滑动间隔是2个时间单位。对于初始的窗口time 1-time 3,只有窗口间隔知足了才触发数据的处理。这里须要注意的一点是,初始的窗口有可能流入的数据没有撑满,可是随着时间的推动,窗口最终会被撑满。当每一个2个时间单位,窗口滑动一次后,会有新的数据流入窗口,这时窗口会移去最先的两个时间单位的数据,而与最新的两个时间单位的数据进行汇总造成新的窗口(time3-time5)。

  对于窗口操做,批处理间隔、窗口间隔和滑动间隔是很是重要的三个时间概念,是理解窗口操做的关键所在。

    3.3.3 输出操做

  Spark Streaming容许DStream的数据被输出到外部系统,如数据库或文件系统。因为输出操做实际上使transformation操做后的数据能够经过外部系统被使用,同时输出操做触发全部DStream的transformation操做的实际执行(相似于RDD操做)。如下表列出了目前主要的输出操做:

  

转换

描述

print()

在Driver中打印出DStream中数据的前10个元素。

saveAsTextFiles(prefix, [suffix])

将DStream中的内容以文本的形式保存为文本文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

saveAsObjectFiles(prefix, [suffix])

将DStream中的内容按对象序列化而且以SequenceFile的格式保存。其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

saveAsHadoopFiles(prefix, [suffix])

将DStream中的内容以文本的形式保存为Hadoop文件,其中每次批处理间隔内产生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

foreachRDD(func)

最基本的输出操做,将func函数应用于DStream中的RDD上,这个操做会输出数据到外部系统,好比保存RDD到文件或者网络数据库等。须要注意的是func函数是在运行该streaming应用的Driver进程里执行的。

  

  dstream.foreachRDD是一个很是强大的输出操做,它允将许数据输出到外部系统。可是 ,如何正确高效地使用这个操做是很重要的,下面展现了如何去避免一些常见的错误。

  一般将数据写入到外部系统须要建立一个链接对象(如 TCP链接到远程服务器),并用它来发送数据到远程系统。出于这个目的,开发者可能在不经意间在Spark driver端建立了链接对象,并尝试使用它保存RDD中的记录到Spark worker上,以下面代码:

  

  这是不正确的,这须要链接对象进行序列化并从Driver端发送到Worker上。链接对象不多在不一样机器间进行这种操做,此错误可能表现为序列化错误(链接对不可序列化),初始化错误(链接对象在须要在Worker 上进行须要初始化) 等等,正确的解决办法是在 worker上建立的链接对象。

  一般状况下,建立一个链接对象有时间和资源开销。所以,建立和销毁的每条记录的链接对象可能招致没必要要的资源开销,并显著下降系统总体的吞吐量 。一个更好的解决方案是使用rdd.foreachPartition方法建立一个单独的链接对象,而后使用该链接对象输出的全部RDD分区中的数据到外部系统。

   这缓解了建立多条记录链接的开销。最后,还能够进一步经过在多个RDDs/ batches上重用链接对象进行优化。一个保持链接对象的静态池能够重用在多个批处理的RDD上将其输出到外部系统,从而进一步下降了开销。

   须要注意的是,在静态池中的链接应该按需延迟建立,这样能够更有效地把数据发送到外部系统。另外须要要注意的是:DStreams延迟执行的,就像RDD的操做是由actions触发同样。默认状况下,输出操做会按照它们在Streaming应用程序中定义的顺序一个个执行。

相关文章
相关标签/搜索