Spark Streaming 是 Spark Core API 的扩展, 它支持弹性的, 高吞吐的, 容错的实时数据流的处理. 数据能够经过多种数据源获取, 例如 Kafka, Flume, Kinesis 以及 TCP sockets, 也能够经过例如 map
, reduce
, join
, window
等的高级函数组成的复杂算法处理. 最终, 处理后的数据能够输出到文件系统, 数据库以及实时仪表盘中. 事实上, 你还能够在 data streams(数据流)上使用 机器学习 以及 图计算算法.html
运行原理算法
初始化注意点:数据库
stop()
的可选参数,名叫 stopSparkContext
为 false.sparkContext.textFileStream(dir)
sparkContext.socketTextStream()
KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
Batch durationapache
对于源源不断的数据,Spark Streaming是经过切分的方式,先将连续的数据流进行离散化处理。数据流每被切分一次,对应生成一个RDD,每一个RDD都包含了一个时间间隔内所获取到的全部数据。数组
批处理时间间隔的设置会伴随Spark Streaming应用程序的整个生命周期,没法在程序运行期间动态修改网络
new StreamingContext(sparkConf,Seconds(1))
转换操做app
dstream.transform(fun)
windowDuration/batchDuration
操做规约机器学习
普通规约是每次把window里面每一个RDD都计算一遍,增量规约是每次只计算新进入window的数据,而后减去离开window的数据,获得的就是window数据的大小,在使用上,增量规约须要提供一个规约函数的逆函数,好比+
对应的逆函数为-
socket
普通规约:val wordCounts=words.map(x=>(x,1)).reduceByKeyAndWindow(_+_,Seconds(5s),seconds(1))
ide
增量规约:val wordCounts=words.map(x=>(x+1)).reduceByKeyAndWindow(_+_,_-_,Seconds(5s),seconds(1))
// 1. con't not create before foreachPartition function(cont't create in driver) // 2. use foreachPartition instead of foreach // 3. use connect pool instead of create connect every time dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } }
MEMORY_ONLY_SER
MEMORY_AND_DISK_SER_2
MEMORY_ONLY
持久化sparkStreaming 周期性的把应用数据存储到HDFS等可靠的存储系统中能够供回复时使用的机制叫作检查点机制,
做用:
数据类型:
Metadata(元数据): streaming计算逻辑,主要来恢复driver。
Configuration
:配置文件,用于建立该streaming application的全部配置
DStream operations
:对DStream进行转换的操做集合
Incomplete batches
:未完成batchs,那些提交了job在队列等待还没有完成的job信息。
Data checkpointing
: 已经生成的RDD但还未保存到HDFS或者会影响后续RDD的生成。
注意点
Checkpoint类
checkpoint的形式是将类CheckPoint的实例序列化后写入外部内存
缺点
SparkStreaming 的checkpoint机制是对CheckPoint对象进行序列化后的数据进行存储,那么SparkStreaming Application从新编译后,再去反序列化checkpoint数据就会失败,这个时候必须新建StreamingContext
针对这种状况,在结合SparkStreaming+kafka的应用中,须要自行维护消费offsets,这样即便从新编译了application,仍是能够从须要的offsets来消费数据。对于其余状况须要结合实际的需求进行处理。
使用
checkpoint的时间间隔正常状况下应该是sliding interval的5-10倍,可经过dstream.checkpoint(checkpointInterval)
配置每一个流的interval。
若是想要application能从driver失败中恢复,则application须要知足
def createStreamingContext()={ ... val sparkConf=new SparkConf().setAppName("xxx") val ssc=new StreamingContext(sparkConf,Seconds(1)) ssc.checkpoint(checkpointDir) } ... val ssc=StreamingContext.getOrCreate(checkpointDir,createSreamingContext _)
Accumulators, Broadcast Variables, and Checkpoints
在sparkStreaming中累加器和广播变量不可以在checkpoints中恢复,广播变量是在driver上执行的,可是当driver重启后并无执行广播,当slaves调用广播变量时报Exception: (Exception("Broadcast variable '0' not loaded!",)
能够为累加器和广播变量建立延迟实例化的单例实例,以便在驱动程序从新启动失败后从新实例化它们
问题参考:https://issues.apache.org/jira/browse/SPARK-5206
系统的容错主要从三个方面,接收数据,数据处理和输出数据,在sparkStreaming中,接收数据和数据来源有关系,处理数据能够保证exactly once,输出数据能够保证at least once。
sparStreaming并不能彻底的像RDD那样实现lineage,由于其有的数据源是经过网络传输的,不可以重复获取。
接收数据根据数据源不一样容错级别不一样
with file
:经过hdfs等文件系统中读取数据时能够保证exactly-oncewith reciever-base-source
:
reliable reciever
:当reciever接收失败时不给数据源答复接收成功,在reciever重启后继续接收unreliable reciever
:接收数据后不给数据源返回接收结果,则数据源也不会再次下发数据sparkStreaming经过write-ahead-logs 提供了at least once的保证。在spark1.3版本以后,针对kafka数据源,能够作到exactly once ,更多内容
相似于foreachRdd操做,能够保证at least once,若是输出时想实现exactly once可经过如下两种方式:
Idempotent updates
:幂等更新,屡次尝试将数据写入同一个文件Transactional updates
:事物更新,实现方式:经过batch time和the index of rdd实现RDD的惟一标识,经过惟一标识去更新外部系统,即若是已经存在则跳过更新,若是不存在则更新。eg:dstream.foreachRDD { (rdd, time) => rdd.foreachPartition { partitionIterator => val partitionId = TaskContext.get.partitionId() val uniqueId = generateUniqueId(time.milliseconds, partitionId) // use this uniqueId to transactionally commit the data in partitionIterator } }
sparkStreaming调优主要从两方面进行:开源节流——提升处理速度和减小输入数据。
详情参考:http://spark.apache.org/docs/latest/tuning.html#level-of-parallelism