本期内容:数据库
一、Exactly once缓存
二、输出不重复安全
事务概念:好比银行转帐,数据必定会被处理,且只被处理一次,可以输出,且只能输出一次,A转帐给B只输出一次,B接收且只接收一次,双方要么同时成功或者同时失败!微信
Spark Core是怎么处理一个Job的?具体过程以下图所示:架构
流程说明:InputStream不断的输入数据,Executor中的Reciver不停的接收数据,为了保证接收到的数据不丢失,Receiver把接收到数据经过BlockManager写入内存和磁盘,或者经过WAL机制记录日志,而后把元数据metedata信息汇报给Driver,Driver收到数据后,为了数据安全,在运行前会把元数据写入磁盘或者HDFS上(checkpointer),最后真正执行Job是在Executor,固然能够有多个Executor;性能
可能出现的问题:spa
问题:Receiver中接收到的数据达到必定的阈值后,才会触发WAL,若是数据收到一半,Receiver挂掉了,也会存在数据丢失的可能?日志
解决:经过Kafka发送数据给Receiver,若是Receiver挂掉,数据会缓存在Kafka中,待Receiver恢复后,会从新接收到Kafka中的数据;orm
数据丢失及其具体的解决方式:blog
在Receiver收到数据且经过Driver的调度Executor开始计算数据的时候若是Driver忽然崩溃,则此时Executor会被Kill掉,那么Executor中的数据就会丢失,此时就必须经过例如WAL的方式让全部的数据都经过例如HDFS的方式首先进行安全性容错处理,此时若是Executor中的数据丢失的话就能够经过WAL恢复回来;
Exactly Once的事务处理:
1,数据零丢失:必须有可靠的数据来源和可靠的Receiver,且整个应用程序的metadata必须进行checkpoint,且经过WAL来保证数据安全;
2,Spark Streaming 1.3的时候为了不WAL的性能损失和实现Exactly Once而提供了Kafka Direct API,把Kafka做为文件存储系统!!!此时兼具备流的优点和文件系统的优点,至此,Spark Streaming+Kafka就构建了完美的流处理世界!!!全部的Executors经过Kafka API直接消费数据,直接管理Offset,因此也不会重复消费数据;实务实现啦!!!此时数据在Kafka中因此必定会被处理,且只被处理一次;
为何说Spark Streaming+Kafka的方式是完美的呢?
1.数据不须要拷贝副本;
2.不须要进行WAL,避免了没必要要的性能损耗;
3.Kafka比Hdfs高效不少,由于Kafka内部会进行memory-copy;
为了解决第一点中出现的各种问题,演变出了以下的架构图:
数据重复读取的状况:
在Receiver收到数据且保存到了HDFS等持久化引擎可是没有来得及进行updateOffsets,此时Receiver崩溃后从新启动就会经过管理Kafka的ZooKeeper中元数据再次重复读取数据,可是此时SparkStreaming认为是成功的,可是Kafka认为是失败的(由于没有更新offset到ZooKeeper中),此时就会致使数据从新消费的状况。
如今Zookeeper和Kafka都很成熟了,因此数据重复读取的状况出现的几率较小;
问题:数据重复消费的问题怎么解决?
答:能够在程序中处理,程序读取到元数据后,放入内存数据库中,每次处理的时候读取内存数据库,处理事后打个标记,下次再处理的时候经过标记判断,已处理的就再也不处理
第一点中的“Exactly Once的事务处理”第二小点,Spark Streaming 1.3的时候为了不WAL的性能损失和实现Exactly Once而提供了Kafka Direct API,自然的解决了数据重复消费的问题;
性能损失:
1,经过WAL方式会极大的损伤Spark Streaming中Receivers接受数据的性能;(实际生产环境中,用Receiver的状况并很少,更多的是直接基于Kafka API进行处理)
2,若是经过Kafka的做为数据来源的话,Kafka中有数据,而后Receiver接受的时候又会有数据副本,这个时候实际上是存储资源的浪费;
关于Spark Streaming数据输出屡次重写及其解决方案:
1,为何会有这个问题,由于Spark Streaming在计算的时候基于Spark Core,Spark Core天生会作如下事情致使Spark Streaming的结果(部分)重复输出:
Task重试;
慢任务推测
Stage重复;
Job重试;
2,具体解决方案:
设置spark.task.maxFailures次数为1;最大容许失败的次数,设为1就没有task、stage、job等的重试;
设置spark.speculation为关闭状态(由于慢任务推测其实很是消耗性能,因此关闭后能够显著提升Spark Streaming处理性能)
Spark Streaming on Kafka的话,Job失败会致使任务失败,Job失败后能够设置auto.offset.reset为“largest”的方式;
最后再次强调能够经过transform和foreachRDD基于业务逻辑代码进行逻辑控制来实现数据不重复消费和输出不重复!这两个方式相似于Spark Streaming的后门,能够作任意想象的控制操做!
特别感谢王家林老师的独具一格的讲解:
王家林老师名片:
中国Spark第一人
新浪微博:http://weibo.com/ilovepains
微信公众号:DT_Spark
博客:http://blog.sina.com.cn/ilovepains
QQ:1740415547
YY课堂:天天20:00现场授课频道68917580