Spark Streaming的事务处理

本期内容数据库

  1. exactly once安全

  2. 输入不重复并发

  3. 输出不重复性能

 

exactly once :有且仅被执行一次。(很少,很多,一次恰好)atom

首先和你们聊下概念:spa

事务是恢复和并发控制的基本单位。日志

事务应该具备4个属性:原子性、一致性、隔离性、持久性。这四个属性一般称为ACID特性orm

原子性(atomicity)。一个事务是一个不可分割的工做单位,事务中包括的诸操做要么都作,要么都不作。htm

一致性(consistency)。事务必须是使数据库从一个一致性状态变到另外一个一致性状态。一致性与原子性是密切相关的。队列

隔离性(isolation)。一个事务的执行不能被其余事务干扰。即一个事务内部的操做及使用的数据对并发的其余事务是隔离的,并发执行的各个事务之间不能互相干扰。

持久性(durability)。持久性也称永久性(permanence),指一个事务一旦提交,它对数据库中数据的改变就应该是永久性的。接下来的其余操做或故障不该该对其有任何影响。

简单记忆法则(一持原隔)

如:银行转帐,A向B转帐500元,这个步骤能够分为A扣500元,B加500元 两部分。

若是 A减500元成功后,在B加500元的时候失败了,那么A减掉的500就不生效。也就是说。要么两个操做都成功,要么两个操做都失败。

 

先了解下SparkStreaming的数据流转流程

 

数据一致性的要求:

  1. 数据源可靠。数据源产生出来后,万一crash要能够恢复。数据存在kafka是很好的选择,既能够存储,又能够作高吞吐消息队列

  2. Receiver可靠。以数据来自Kafka为例。运行在Executor上的Receiver在接收到来自Kafka的数据时会向Kafka发送ACK确认收到信息并读取下一条信息,kafkaupdateOffset来记录Receiver接收到的偏移,这种方式保证了在Executor数据零丢失。

  3. Driver可靠。checkpoint可解决。

 

下面是几个数据非一致性的场景及解决方案

输入不丢失

 

数据丢失的场景:

  在Receiver收到数据且经过Driver的调度,Executor开始计算数据的时候,若是Driver忽然崩溃,此时Executor也会被Kill掉,那么Executor中的数据就会丢失,此时就必须经过WAL机制让全部的数据经过相似HDFS的方式进行安全性容错处理,从而解决Executor被Kill掉后致使数据丢失的问题。

  数据重复读取的场景:

  在Receiver收到数据且保存到了HDFS时,若是Receiver崩溃,且此时没有来得及更新ZooKeeper上的offsets,那么Receiver从新启动后就会从管理Kafka的ZooKeeper中再次读取元数据从而致使重复读取元数据;从Spark Streaming来看是成功的,可是Kafka认为是失败的(由于Receiver崩溃时没有及时更新offsets到ZooKeeper中)从新恢复时会从新消费一次,此时会致使数据从新消费的状况。

  Spark 1.3以前的版本在这个场景下其实有性能问题:

 

  1. 经过WAL方式保证数据不丢失,但弊端是经过WAL方式会极大的损伤Spark Streaming中的Receiver接收数据的性能(现网生产环境一般会Kafka Direct  Api直接处理)。

  2. 若是经过Kafka做为数据来源的话,Kafka中有数据,而后Receiver接收数据的时候又会有数据副本,这个时候实际上是存储资源的浪费。(重复读取数据解决办法,读取数据时能够将元数据信息放入内存数据库中,再次计算时检查元数据是否被计算过)。

  Spark从1.3版本开始,为了不WAL的性能损失和实现Exactly Once而提供了Kafka Direct Api,把Kafka做为文件存储系统。此时Kafka兼具备流的优点和文件系统的优点,至此,Spark Streaming+Kafka就构建了完美的流处理世界!

  数据不须要拷贝副本,不须要WAL性能损耗,不须要Receiver,而直接经过Kafka Direct Api直接消费数据,全部的Executors经过Kafka Api直接消费数据,直接管理offset,因此也不会重复消费数据;事务实现啦!

 

WAL的弊端:

WAL也不能彻底的解决数据丢失的问题,就像Oracle同样,日志文件的写,也是先写到内存中,而后根据必定的触发条件再将数据写到磁盘。若是尚未来的及写WAL日志,此时数据也会有不一致的状况(数据已经接收,可是尚未写到WAL的这部分数据是恢复不出来的。)。

 

输出不重复

  为何会有这个问题,由于SparkStreaming在计算的时候基于SparkCore,SparkCore天生会在下列场景中致使输出重复:

  1.Task、Stage甚至Job重试;

  2.慢任务推测;

      3.消息偏移量未及时更新;

  会致使数据的丢失。

 

  对应的解决方案:

  1.一个任务失败就是job 失败,设置spark.task.maxFailures次数为1;再也不重试。

  2.设置spark.speculation为关闭状态(由于慢任务推测其实很是消耗性能,因此关闭后能够显著的提升Spark Streaming处理性能)

  3.Spark streaming on kafka的话,假如job失败后能够设置kafka的auto.offset.reset为largest的方式会自动恢复job的执行。

 

 

  最后再次强调:

  能够经过transform和foreachRDD基于业务逻辑代码进行逻辑控制来实现数据不重复消费和输出不重复!这二个方法相似于spark的后门,能够作任意想象的控制操做!

相关文章
相关标签/搜索