从图上能够看到,Batch Interval的间隔是5s,也就是说每通过5s,SparkStreaming会将这5s内的信息封装成一个DStream,而后提交到Spark集群进行计算
这里模拟了一份黑名单,SparkStreaming监控服务器中指定端口,时间设定为每5秒处理一次数据。每当job触发时,用户输入的数据与黑名单中的数据进行左外链接,而后过滤
2.4Window窗口操做
window operation
普通的 每隔多长时间切割RDD
基于窗口的操做:每隔多长时间切割RDD,每隔多长时间计算一次,每次计算的量是多少
为何须要有窗口操做?
好比
别人要求可以实时看到此刻以前一段时间的数据状况,若是使用批处理的话,那么咱们只能固定一个整段时间而后对这个整段时间进行spark core的计算,可是别人的要求是每个时刻都须要有结果,那么就须要窗口操做?可是窗口操做确定会有不少的
重复计算,这里有一个优化的地方(
这个优化也不是必须的,
视具体状况而定,好比说咱们要查看最近30分钟最热门的头条,咱们在设计的时候不可能每隔30分钟计算一次,这里定义了滑动窗口时间是1分钟,然而计算量是30分钟内的数据,那么确定会有29分钟重复的数据计算);可是
优化的话就会有一个前提,必需要checkpoint
每次计算都是最近15s的数据,基于这个特性(微博热点:最近30分钟内最热门的头条)
问题一:batch interval 5s,窗口大小能够是8s么?
不行,有的batch就不能被窗口所包含,必须是batch interval的整数倍
问题二:滑动窗口时间 8s 能够么?
必须是batch interval的整数倍
优化:如何避免time3被重复计算(图中time3在两个window中都被计算了),能够没有,可是有的话,就须要这种优化
Batch Interval 1s ||
窗口大小 5s ||
滑动窗口 1s
思考:计算一个趋势的时候,须要基于滑动窗口的操做,是否必需要优化,避免重复计算?(未必)
For example:
1.查看微博中每小时的热门微博,每隔1分钟计算一次,至关于重复计算了59分钟的内容
2.商家想看前5分钟的销售额,每隔30秒看一次,也须要基于窗口的操做
2.5UpdateStateByKey
updateStateByKey
的使用须要checkpoint,隔几回记录一次到磁盘中
UpdateStateByKey的主要功能
一、Spark Streaming中为
每个Key维护一份state状态,这个
state类型能够是任意类型的的, 能够是一个自定义的对象,那么更新函数也能够是任意类型的。
二、经过更新函数对该key的状态不断更新,
对于每一个新的batch而言,Spark Streaming会在使用updateStateByKey的时候为已经存在的key进行state的状态更新(对于每一个新出现的key,会一样的执行state的更新函数操做)
三、若是要
不断的更新每一个key的state,就
涉及到了状态的保存和容错,这个时候
就须要开启checkpoint机制和功能 (
one.checkpoint(Durations.seconds(10)) //容错更好就须要牺牲性能,容错不须要过高,时间能够设置的长一些,(好比这里将checkpoint设置为10s,也就是每隔10s会将标记有checkpoint的RDD计算的结果持久化到磁盘,若是咱们设置的Batch Interval = 5s, 在第15s时触发job进行计算, 可是在第17s时down, 这时候只能恢复到10s的数据,那么10s至17s的数据就丢失了,具体设置多少,视具体状况而定))
UpdateStateByKey用处:统计广告点击流量,统计这一天的车流量...
案例:全面的广告点击分析
UpdateStateByKeyOperator
这里作了
checkpoint操做,
jsc.checkpoint("hdfs://ndoe1:8020/user/sscheckpoint");
node1建立一个Socket Server,指定8888端口,SparkStreaming与服务器这个端口创建通讯,那么用户的数据从这里流向SparkStreaming进行计算。
在这个案例中,
用以空格分割的单词来模拟用户点击的广告,每一个单词表明一个广告,
统计每个广告(单词)出现的次数(就是WordCount)
最后的
conunts.print() //output operator类型的算子
result: 利用SparkStreaming作到了微批处理,近似实时计算
查看hdfs,发现设置checkpoint会将SparkStreaming的处理结果进行了持久化
2.6reduceByKeyAndWindow
基于滑动窗口的热点搜索词实时统计--WindowOperator
1.未优化的
2.优化的必需要设置checkpoint的目录
如下是优化的过的reduceByKeyAndWindow
补充:
1.Spark master 8080端口 监控资源
Drive
4040端口 监控任务,能够看到有一个Streaming job(它里面有一个线程,是一直运行的,负责接收咱们的数据)
2.transform 和
foreachRDD 的区别?
transform Transformation类型算子,
transform很是厉害,
能够拿到每个DStream的rdd;这样就
能够利用每个rdd的全部算子来转换;
甚至在里面应用spark core,或者将
rdd转换成
DataFrame来使用
SparkSQL操做
foreachRDD Action类型算子,对于每一个RDD进行操做,何时会用?最后存结果的时候会用
3.
transform取出DStream中的RDD
使用transform将DStream中的RDD抽取出来,调用了RDD的Action类的算子(是能够执行的)是在Driver端执行的,若是不在Driver端执行,调用Action类的算子就不会触发一个job了
对RDD的操做才会发送到Executor端执行,transform是对DStream进行操做(transform中抽取RDD,对这个RDD执行collect类型的数据,在job Generator时执行的,生成了多个job,以jobSet的形式发送给jobSecheduler),这样的话就能够预警:对数据的预警,与标准进行比较,若是超过了这个标准就进行报警(一旦发现某个黑名单就当即进行报警,),总体的代码是在Driver端执行的,可是部分代码对RDD的操做是在Executor段执行的
SparkContext sc = userClickRDD.context();
Object obj = "能够来源于数据库,动态的更改广播变量"
sc.broadCast(obj)
2.6SparkStreaming--Driver HA
2.6.1Driver也有可能挂掉,如何实现它的高可用?
当一个Driver挂掉后,(回忆:当初的Master是由zookeeper进行托管),另外启动一个Driver,它就须要从上一个Driver中得到相关的信息(包括batch的进度,data的位置,job执行进度,
DStream的Graph(基于DStream的业务逻辑))
如何实现Driver的高可用-->基于HDFS上面的元数据(Driver的信息)进行恢复,
注意!不会从新new SparkContext,由于这样至关于又建立了一个全新的Driver
2.6.2Driver HA的代码套路
1.指定了去哪个目录下面寻找Driver的元数据信息
2.
提交Application到集群中执行的时候,必须使用cluster模式,同时必须指定一个参数 --supervise(当某一个Driver挂掉,新的Driver须要另外一个Driver中的信息来继续job的执行)
2.6.3监控HDFS上指定目录下文件数量的变化
示例代码
SparkStreamingOnHDFS
1.为了状态的保存和容错,开启了checkpoint机制,Driver HA
2.
ssc.textFileStream("hdfs://node1:8020/userhdfs/") //监控hdfs上/user/hdfs的变化
命令:hadoop fs -put wc /user/hdfs
2.6.4SparkStreaming 监控 HDFS 上文件数量的变化,并将变化写入到MySql中
示例代码
SparkStreamingOnHDFSToMySQL
1.为了状态的保存和容错,开启了checkpoint机制,Driver HA
2.
ssc.textFileStream("hdfs://node1:8020/userhdfs/") //监控hdfs上/user/hdfs的变化
3.Kafka
3.1Kafka定义
Apache Kafka是一个高吞吐的集发布与订阅与一体的分布式消息系统
流式处理的数据源是kafka,批处理的数据源是hive,也就是hdfs;
3.2消息队列常见的场景
1.系统之间的解耦合
-queue模型
-publish-subscribe模型
2.峰值压力缓冲,若是高峰期日志大量到SparkSreaming,那么会形成计算时间超过BatchInterval),能够在日志服务器和SparkStreaming中间加入Kafka,起到缓冲的做用
3.异步通讯
3.3Kafka的架构
消费者的
消费偏移量存储在
zookeeper中,生产者生产数据,消费者消费数据,kafka并不会生产数据,但kafka默认一周删除一次数据。
broker就是代理,在
kafka cluster这一层这里,其实里面是有不少个broker
topic就至关于
Queue,
Queue里面有
生产者消费者模型
3.4Kafka的消息存储和生产消费模型
topic:一个kafka集群中能够划分n多的topic,一个topic能够分红多个partition(这个是为了作并行的), 每一个partition内部消息强有序,其中的每一个消息都有一个序号叫offset,一个partition对应一个broker,一个broker能够管多个partition,这个partition能够很简单想象为一个文件,当数据发过来的时候它就往这个partition上面append,追加就行,kafka和不少消息系统不同,不少消息系统是消费完了我就把它删掉,而kafka是根据时间策略删除,而不是消费完就删除,在kafka里面没有一个消费完这么个概念,只有过时这样一个概念
生产者本身决定往哪一个partition写消息(轮循的负载均衡或者是基于hash的partition策略)
消费者能够订阅某一个topic,这个topic一旦有数据,会将数据推送给消费者
3.5kafka 组内queue消费模型 || 组间publish-subscribe消费模型
3.7为何Kafka的吞吐量高?
3.7.1 什么是Zero Copy?
零拷贝”是指计算机操做的过程当中,CPU不须要为数据在内存之间的拷贝消耗资源。而
它一般是指计算机在网络上发送文件时,不须要
将文件内容拷贝到用户空间(User Space)而直接在内核空间(Kernel Space)中
传输到网络的方式
3.7.2
kafka采用零拷贝Zero Copy的方式
从上图中能够清楚的看到,
Zero Copy的模式中,避免了数据在内存空间和用户空间之间的拷贝,从而提升了系统的总体性能。Linux中的sendfile()以及Java NIO中
的FileChannel.transferTo()方法都实现了零拷贝的功能,而在Netty中也经过在
FileRegion中包装了NIO的FileChannel.transferTo()方法实现了零拷贝
3.8搭建Kafka集群--leader的均衡机制
Kafka中leader的均衡机制
Kafka中一个topic有多个partition,如上图,kfk有0,1,2共三个partition,每一个partition都有对应的leader来进行管理,对于leader1来讲它来管理partition0,
当leader1挂掉以后,由于partition0配置了副本数(在broker0,broker2还存在partition0的副本),
那么此时会在broker0,broker2上选出一台当作leader继续管理partition0(好比说选取了broker2当作partition0的leader),
这时候
若是咱们配置了leader均衡机制,从新恢复了broker1,那么partition0的leader就会从broker2转移到broker1,减轻了broker2的读取压力,实现了负载均衡。固然若是不开启leader均衡机制的话,从新恢复broker1,那么partition0的leader仍就是broker2。
Kafka中leader的均衡机制在哪里配置?
在server.properties添加以下一句话
auto.leader.rebalance.enable=true
3.9 Kafka_code注意事项
注意一:
向kafka中写数据的时候咱们
必需要指定所配置kafka的全部brokers节点,而不能只配置一个节点,由于咱们写的话,是不知道这个topic最终存放在什么地方,因此必须指定全,
读取Kafka中的数据的时候是须要指定zk的节点,只须要指定一个节点就能够了;目前咱们使用的在代码中直接写上这些节点,之后所有要写到配置文件中
注意二:
kafka中存储的是键值对,即便咱们没有明确些出来key,
获取的时候也是须要利用tuple的方式获取值的;而对于放到一个kafka中的数据,
这个数据到底存放到那个partition中呢?这个就须要使用hashPartition方式或者普通的轮询方式存放;对于没有明确指定key的发往kafka的数据,使用的就是轮询方式;
4.SparkStreaming + Kafka 两种模式--Receive模式 || Direct模式
Receive模式--SparkStreaming + Kafka 总体架构
注意!每一步都是阻塞的,上一步完成以后才能进行下一步
流程:
1.接收数据(
SparkStreaming做为
消费者,若是订阅了一个topic,那么topic一有数据就会主动推送给SparkSreaming)
2.Executor将接收来的数据备份到其余Executor中(Executor中执行的job做为一个receiver,里面的task一直在接收kafka推送来的数据,而后将接收来的数据进行持久化,
默认的持久化级别是
MEMORY_DISK_SER_2)
3.Executor备份完成以后,向
Driver中的
ReceiverTracker汇报数据存储在哪一些的
Executor中(
Driver{ReceiverTracker,DAGScheduler,TaskScheduler})
4.在zookeeper中更新消费偏移量
5.Driver负责分发
task到数据所在的
Executor中执行(达到移动计算,而不是移动数据)
注意!
1.在SparkStreaming中Driver一旦挂掉,它下面的Executor也会挂掉
若是在第四步完成后,Driver挂掉了,会有什么问题?
其实数据并无被处理,数据就丢了,所以kafka的事务机制并不完善
所以对于如上这种状况,提供了一个解决方案,就是WAL机制(
WriteAheadLog--预写机制)
可是WAL机制有什么问题?(每一次接收来的数据,都要往HDFS上写一份,性能会有所降低)
代码示例
SparkStreamingOnKafkaReceiver
SparkStreamingDataManuallyProducerForKafka
须要启动HDFS
Direct模式
SparkStreaming和Kafka直接链接,SparkStreaming去Kafka去pull数据,这个消费偏移量由SparkStreaming本身来维护(实际上经过checkpoint来管理的,checkpoint和job是异步的,总的来时SparkStreaming的事务机制并非很完善),避免了数据的丢失(相对而言,不是绝对的)
并行度:
1.
linesDStream里面封装的是RDD,RDD里面有partition,RDD里面的partition与这个topic的partition是一一对应的
2.从kafka中读来的数据,封装到一个DStream中,能够
对这个DStream重分区,lines.repartition(10),增长partition的数量,提升并行度。
并行度:
batch->rdd->DStream
batchInterval 5s
blockInterval = 200ms
batch = 25block
将一个blockInterval设置的小一些,有更多的block,对应更多的split,也就有更多的partition,从而提升并行度
官方建议:blockInterval不要低于50ms,不然
batchInterval/
blockInterval
获得的block过多,partition就过多,启动多个线程并行计算,影响执行job的性能
Receive模式 || Direct模式
最大的不一样:消费偏移量谁来管理