第4课:Spark Streaming的Exactly-One的事务处理和不重复输出完全掌握

本期内容:sql

1Exactly once 事务数据库

什么事Exactly once 事务?安全

数据仅处理一次而且仅输出一次,这样才是完整的事务处理。微信

Spark在运行出错时不能保证输出也是事务级别的。在Task执行一半的时候出错了,虽然在语义上作了事务处理,数据仅被处理一次,可是若是是输出到数据库中,那有空能将结果屡次保存到数据库中。Spark在任务失败时会进行重试,这样会致使结果屡次保存到数据库中。数据结构

以下图,当运行在Executor上的Receiver接收到数据经过BlockManager写入内存和磁盘,或者经过WAL机制写记录日志,而后把metedata信息汇报给Driver。在Driver端按期进行checkpoint操做。Job的执行仍是基于Spark Core的调度模式在Executor上执行Task性能

Exactly once 事务的处理:大数据

1,数据零丢失:必须有可靠的数据来源和可靠的Receiver,且整个应用程序的metadata必须进行checkpoint,且经过WAL来保证数据安全。spa

咱们以数据来自Kafka为例,运行在Executor上的Receiver在接收到来自Kafka的数据时会向Kafka发送ACK确认收到信息并读取下一条信息,kafkaupdateOffset来记录Receiver接收到的偏移,这种方式保证了在Executor数据零丢失。日志


Driver端,按期进行checkpoint操做,出错时从Checkpoint的文件系统中把数据读取进来进行恢复,内部会从新构建StreamingContext(也就是构建SparkContext)并启动,恢复出元数据metedata,再次产生RDD,恢复的是上次的Job,而后再次提交到集群执行。orm

那么数据可能丢失的地方有哪些呢和相应的解决方式?

    在Receiver收到数据且经过Driver的调度Executor开始计算数据的时候,若是Driver忽然奔溃,则此时Executor会被杀死,那么Executor中的数据就会丢失(若是没有进行WAL的操做)

解决方式:此时就必须经过例如WAL的方式,让全部的数据都经过例如HDFS的方式首先进行安全性容错处理。此时若是Executor中的数据丢失的话,就能够经过WAL恢复回来。

这种方式的弊端是经过WAL的方式会极大额损伤SparkStreamingReceivers接收数据的性能。

数据重复读取的状况:

    在Receiver收到数据保存到HDFS等持久化引擎可是没有来得及进行updateOffsets(Kafka为例),此时Receiver崩溃后从新启动就会经过管理KafkaZookeeper中元数据再次重复读取数据,可是此时SparkStreaming认为是成功的,可是kafka认为是失败的(由于没有更新offsetZooKeeper),此时就会致使数据从新消费的状况。

    解决方式:以Receiver基于ZooKeeper的方式,当读取数据时去访问Kafka的元数据信息,在处理代码中例如foreachRDDtransform时,将信息写入到内存数据库中(memorySet),在计算时读取内存数据库信息,判断是否已处理过,若是以处理过则跳过计算。这些元数据信息能够保存到内存数据结构或者memsqlsqllite中。

 

若是经过Kafka做为数据来源的话,Kafka中有数据,而后Receiver接收的时候又会有数据副本,这个时候实际上是存储资源的浪费。

Spark1.3的时候为了不WAL的性能损失和实现Exactly Once而提供了Kafka Direct API,把Kafka做为文件存储系统。此时兼具备流的优点和文件系统的优点,至此Spark Streaming+Kafka就构建了完美的流处理世界(1,数据不须要拷贝副本;2,不须要WAL对性能的损耗;3Kafka使用ZeroCopyHDFS更高效)。全部的Executors经过Kafka API直接消息数据,直接管理Offset,因此也不会重复消费数据。

 

2,输出不重复

关于Spark Streaming数据输出屡次重写及其解决方案:

1,为何会有这个问题,由于Spark Streaming在计算的时候基于Spark Core天生会作如下事情致使Spark Streaming的结果(部分)重复输出。Task重试,慢任务推测,Stage重试,Job重试。

2,具体解决方案

设置spark.task.maxFailures次数为1,这样就不会有Task重试了。设置spark.speculation为关闭状态,就不会有慢任务推测了,由于慢任务推测很是消耗性能,因此关闭后能够显著提升Spark Streaming处理性能。

Spark Streaming On Kafka的话,Job失败后能够设置Kafka的参数auto.offset.resetlargest方式。

    最后再次强调能够经过transformforeachRDD基于业务逻辑代码进行逻辑控制来实现数据不重复消费和输出不重复。这两个方法相似于Spark Streaming的后门,能够作任意想象的控制操做。

备注:

一、DT大数据梦工厂微信公众号DT_Spark 二、IMF晚8点大数据实战YY直播频道号:68917580三、新浪微博: http://www.weibo.com/ilovepains

相关文章
相关标签/搜索