import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1)) // 能够经过 ssc.sparkContext 来访问 SparkContext // 或者经过已经存在的 SparkContext 来建立 StreamingContext import org.apache.spark.streaming._ val sc = ... // existing SparkContext val ssc = new StreamingContext(sc, Seconds(1))
初始化完Context以后:html
1) 定义消息输入源来建立DStreamsjava
2) 定义DStreams的转化操做和输出操做sql
3) 经过streamingContext.start()来启动消息采集和处理数据库
4) 等待程序终止,能够经过streamingContext.awaitTermination()来设置apache
5) 经过stramingContext.stop()来手动终止处理程序bootstrap
StreamingContext和SparkContext什么关系?windows
import org.apache.spark.streaming._ val sc = ... // existing SparkContext val ssc = new StreamingContext(sc, Seconds(1))
注意:api
StreamingContext一旦启动,对DStreams的操做就不能修改了缓存
在同一时间一个JVM中只有一个StreamingContext能够启动服务器
stop()方法将同时中止SparkContext,能够传入参数stopSparkContext用于只中止StreamingContext
在Spark1.4版本后,如何优雅的中止SparkStreaming而不丢失数据,经过设置 sparkConf.set("spark.streaming.stopGracefullyOnShutdown","true") 便可。在StreamingContext的start方法中已经注册了Hook方法
Discretized Stream是Spark Streaming的基础抽象,表明持续性的数据流和通过各类Spark原语操做后的结果数据流。在内部实现上,DStream是一系列连续的RDD来表示。每一个RDD含有一段时间间隔内的数据,以下图:
对数据的操做也是按照RDD为单位来进行的
计算过程由Spark engine来完成
Spark Streaming原生支持一些不一样的数据源。一些“核心”数据源已经被打包到Spark Streaming的Maven工件中,而其余的一些则能够经过spark- streaming-kafka等附加工件获取。每一个接收器都以Spark执行器程序中一个长期运行的任务的形式运行,所以会占据分配给应用的CPU核心。此外,咱们还须要有可用的CPU核心来处理数据。这意味着若是要运行多个接收器,就必须至少有和接收器数目相同的核心数,还要加上用来完成计算所须要的核心数。例如,若是咱们想要在流计算应用中运行10个接收器,那么至少须要为应用分配11个CPU核心。因此若是在本地模式运行,不要使用local或者local[1]
Socket数据流前面的例子已经看到过
文件数据流:可以读取全部HDFS API兼容的文件系统文件,经过fileStream方法进行读取
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming将会监控dataDirectory目录并不断处理移动进来的文件,记住目前不支持嵌套目录
1) 文件须要有相同的数据格式
2) 文件进入dataDirectory的方式须要经过移动或者重命名来实现
3) 一旦文件移动进目录,则不能再修改,即使修改了也不会读取新数据
若是文件比较简单,则可使用streamingContext.textFileStream(dataDirectory)方法来读取文件。文件流不须要接收器,不须要单独分配CPU核
经过继承Receiver,并实现onStart、onStop方法来自定义数据源采集
class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging { def onStart() { // Start the thread that receives data over a connection new Thread("Socket Receiver") { override def run() { receive() } }.start() } def onStop() { // There is nothing much to do as the thread calling receive() // is designed to stop by itself if isStopped() returns false } /** Create a socket connection and receive data until receiver is stopped */ private def receive() { var socket: Socket = null var userInput: String = null try { // Connect to host:port socket = new Socket(host, port) // Until stopped or connection broken continue reading val reader = new BufferedReader( new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8)) userInput = reader.readLine() while(!isStopped && userInput != null) { store(userInput) userInput = reader.readLine() } reader.close() socket.close() // Restart in an attempt to connect again when server is active again restart("Trying to connect again") } catch { case e: java.net.ConnectException => // restart if could not connect to server restart("Error connecting to " + host + ":" + port, e) case t: Throwable => // restart if there is any other error restart("Error receiving data", t) } } }
能够经过 streamingContext.receiverStream(<instance of custom receiver>)来使用自定义的数据采集源
// Assuming ssc is the StreamingContext val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port)) val words = lines.flatMap(_.split(" ")) ...
测试过程当中,能够经过使用streamingContext.queueStream(queueOfRDDs)来建立DStream,每个推送到这个队列中的RDD,都会做为一个 DStream处理
import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable object QueueRdd { def main(args: Array[String]) { val conf = new SparkConf().setMaster("local[2]").setAppName("QueueRdd") val ssc = new StreamingContext(conf, Seconds(1)) // Create the queue through which RDDs can be pushed to // a QueueInputDStream //建立 RDD 队列 val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]() // Create the QueueInputDStream and use it do some processing // 建立 QueueInputDStream val inputStream = ssc.queueStream(rddQueue) //处理队列中的 RDD 数据 val mappedStream = inputStream.map(x => (x % 10, 1)) val reducedStream = mappedStream.reduceByKey(_ + _) //打印结果 reducedStream.print() //启动计算 ssc.start() // Create and push some RDDs into for (i <- 1 to 30) { rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10) Thread.sleep(2000) //经过程序中止 StreamingContext 的运行 //ssc.stop() } } }
除核心数据源外,还能够用附加数据源接收器来从一些知名数据获取系统中接收的数据,这些接收器都做为Spark Streaming的组件进行独立打包了。 它们仍然是Spark的一部分,不过你须要在构建文件中添加额外的包才能使用它们。现有的接收器包括Twitter、Apache Kafka、Amazon Kinesis、Apache Flume,以及ZeroMQ。能够经过添加与Spark版本匹配的Maven工件spark-streaming-[projectname]_2.10来引入这些附加接收器
在工程中须要引入Maven工件spark-streaming-kafka_2.10来使用它。包内提供的KafkaUtils对象能够在StreamingContext和JavaStreamingContext 中以你的Kafka消息建立出DStream。因为KafkaUtils能够订阅多个主题,所以它建立出的DStream由成对的主题和消息组成。要建立出一个流数据,须要使用 StreamingContext实例、一个由逗号隔开的ZooKeeper主机列表字符串、消费者组的名字(惟一名字),以及一个从主题到针对这个主题的接收器线程数的映射表来调用createStream()方法
import org.apache.spark.streaming.kafka._ ... // 建立一个从主题到接收器线程数的映射表 val topics = List(("pandas", 1), ("logs", 1)).toMap val topicLines = KafkaUtils.createStream(ssc, zkQuorum, group, topics) topicLines.map(_._2)
演示SparkStreaming如何从Kafka读取消息,若是经过链接池方法把消息处理完成后再写回Kafka
kafka Connection Pool程序:
mport java.util.Properties import org.apache.commons.pool2.impl.DefaultPooledObject import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject} import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord} case class KafkaProducerProxy(brokerList: String, producerConfig: Properties = new Properties, defaultTopic: Option[String] = None, producer: Option[KafkaProducer[String, String]] = None) { type Key = String type Val = String require(brokerList == null || !brokerList.isEmpty, "Must set broker list") private val p = producer getOrElse { var props:Properties= new Properties(); props.put("bootstrap.servers", brokerList); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); new KafkaProducer[String,String](props) } private def toMessage(value: Val, key: Option[Key] = None, topic: Option[String] = None): ProducerRecord[Key, Val] = { val t = topic.getOrElse(defaultTopic.getOrElse(throw new IllegalArgumentException("Must provide topic or default topic"))) require(!t.isEmpty, "Topic must not be empty") key match { case Some(k) => new ProducerRecord(t, k, value) case _ => new ProducerRecord(t, value) } } def send(key: Key, value: Val, topic: Option[String] = None) { p.send(toMessage(value, Option(key), topic)) } def send(value: Val, topic: Option[String]) { send(null, value, topic) } def send(value: Val, topic: String) { send(null, value, Option(topic)) } def send(value: Val) { send(null, value, None) } def shutdown(): Unit = p.close() } abstract class KafkaProducerFactory(brokerList: String, config: Properties, topic: Option[String] = None) extends Serializable { def newInstance(): KafkaProducerProxy } class BaseKafkaProducerFactory(brokerList: String, config: Properties = new Properties, defaultTopic: Option[String] = None) extends KafkaProducerFactory(brokerList, config, defaultTopic) { override def newInstance() = new KafkaProducerProxy(brokerList, config, defaultTopic) } class PooledKafkaProducerAppFactory(val factory: KafkaProducerFactory) extends BasePooledObjectFactory[KafkaProducerProxy] with Serializable { override def create(): KafkaProducerProxy = factory.newInstance() override def wrap(obj: KafkaProducerProxy): PooledObject[KafkaProducerProxy] = new DefaultPooledObject(obj) override def destroyObject(p: PooledObject[KafkaProducerProxy]): Unit = { p.getObject.shutdown() super.destroyObject(p) } }
KafkaStreaming main:
import org.apache.commons.pool2.impl.{GenericObjectPool, GenericObjectPoolConfig} import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.api.java.function.VoidFunction import org.apache.spark.rdd.RDD import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext} object createKafkaProducerPool{ def apply(brokerList: String, topic: String): GenericObjectPool[KafkaProducerProxy] = { val producerFactory = new BaseKafkaProducerFactory(brokerList, defaultTopic = Option(topic)) val pooledProducerFactory = new PooledKafkaProducerAppFactory(producerFactory) val poolConfig = { val c = new GenericObjectPoolConfig val maxNumProducers = 10 c.setMaxTotal(maxNumProducers) c.setMaxIdle(maxNumProducers) c } new GenericObjectPool[KafkaProducerProxy](pooledProducerFactory, poolConfig) } } object KafkaStreaming{ def main(args: Array[String]) { val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(1)) //建立 topic val brobrokers = "172.16.148.150:9092,172.16.148.151:9092,172.16.148.152:9092" val sourcetopic="source"; val targettopic="target"; //建立消费者组 var group="con-consumer-group" //消费者配置 val kafkaParam = Map( "bootstrap.servers" -> brobrokers,//用于初始化连接到集群的地址 "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], //用于标识这个消费者属于哪一个消费团体 "group.id" -> group, / //若是没有初始化偏移量或者当前的偏移量不存在任何服务器上,可使用这个配置属性 //可使用这个配置,latest 自动重置偏移量为最新的偏移量 "auto.offset.reset" -> "latest", //若是是 true,则这个消费者的偏移量会在后台自动提交 "enable.auto.commit" -> (false: java.lang.Boolean) ); //ssc.sparkContext.broadcast(pool) //建立 DStream,返回接收到的输入数据 var stream=KafkaUtils.createDirectStream[String,String](ssc, LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String ](Array(sourcetopic),kafkaParam)) //每个 stream 都是一个 ConsumerRecord stream.map(s =>("id:" + s.key(),">>>>:"+s.value())).foreachRDD(rdd => { rdd.foreachPartition(partitionOfRecords => { // Get a producer from the shared pool val pool = createKafkaProducerPool(brobrokers, targettopic) val p = pool.borrowObject() partitionOfRecords.foreach {message => System.out.println(message._2); p.send(message._2,Option(targettopic))} // Returning the producer to the pool also shuts it down pool.returnObject(p) }) }) ssc.start() ssc.awaitTermination() } }
Spark对于Kafka的链接主要有两种方式,一种是DirectKafkaInputDStream,另一种是KafkaInputDStream。DirectKafkaInputDStream只在driver端接收数据,因此继承了InputDStream,是没有receivers的
主要经过KafkaUtils#createDirectStream以及KafkaUtils#createStream这两个API来建立,除了要传入的参数不一样外,接收kafka数据的节点、拉取数据的时机也彻底不一样
KafkaUtils#createStream【Receiver-based】
这种方法使用一个Receiver来接收数据。在该Receiver的实现中使用了Kafka high-level consumer API。Receiver从kafka接收的数据将被存储到Spark executor中,随后启动的job将处理这些数据
在默认配置下,该方法失败后会丢失数据(保存在executor内存里的数据在application失败后就没了),若要保证数据不丢失,须要启用WAL(即预写日志至HDFS、S3等),这样再失败后能够从日志文件中恢复数据
在该函数中,会新建一个KafkaInputDStream对象,KafkaInputDStream继承于ReceiverInputDStream。KafkaInputDStream实现了getReceiver方法,返回接收器的实例:
def getReceiver(): Receiver[(K, V)] = { if (!useReliableReceiver) { //< 不启用 WAL new KafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) } else { //< 启用 WAL new ReliableKafkaReceiver[K, V, U, T](kafkaParams, topics, storageLevel) } }
根据是否启用 WAL,receiver分为KafkaReceiver和ReliableKafkaReceiver
下图描述了KafkaReceiver接收数据的具体流程:
须要注意的点:
Kafka Topic的partitions与RDD的partitions没有直接关系,不能一一对应。若是增长topic的partition个数的话仅仅会增长单个Receiver接收数据的线程数。事实上,使用这种方法只会在一个executor上启用一个Receiver,该Receiver包含一个线程池,线程池的线程个数与全部topics的partitions个数总和一致,每条线程接收一个topic的一个partition的数据。而并不会增长处理数据时的并行度。
对于一个 topic,可使用多个groupid相同的input DStream来使用多个Receivers来增长并行度,而后union他们;对于多个topics,除了能够用上个办法增长并行度外,还能够对不一样的topic使用不一样的input DStream而后union他们来增长并行度
若是启用了WAL,为能将接收到的数据将以log的方式在指定的存储系统备份一份,须要指定输入数据的存储等级为StorageLevel.MEMORY_AND_DISK_SER或StorageLevel.MEMORY_AND_DISK_SER_2
KafkaUtils#createDirectStream【WithOut Receiver】
自Spark-1.3.0起,提供了不须要Receiver的方法。替代了使用receivers来接收数据,该方法按期查询每一个topic+partition的lastest offset,并据此决定每一个batch要接收的offsets范围
KafkaUtils#createDirectStream调用中,会新建DirectKafkaInputDStream,DirectKafkaInputDStream#compute(validTime: Time)会从kafka拉取数据并生成RDD,流程以下:
如上图所示,该函数主要作了如下三个事情:
1. 肯定要接收的partitions的offsetRange,以做为第2步建立的RDD的数据来源
2. 建立RDD并执行count操做,使RDD真实具备数据
3. 以streamId、数据条数,offsetRanges信息初始化inputInfo并添加到JobScheduler中
进一步看KafkaRDD的getPartitions实现:
override def getPartitions: Array[Partition] = { offsetRanges.zipWithIndex.map {case (o, i) => val (host, port) = leaders(TopicAndPartition(o.topic, o.partition)) new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port) }.toArray }
从上面的代码能够很明显看到,KafkaRDD的partition数据与Kafka topic的某个partition的o.fromOffset至o.untilOffset数据是相对应的,也就是说KafkaRDD的partition与Kafka partition是一一对应的
该方式相比使用Receiver的方式有如下好处:
简化并行:再也不须要建立多个kafka input DStream而后再union这些input DStream。使用directStream,Spark Streaming会建立与Kafka partitions相同数量的paritions的RDD,RDD的partition与Kafka的partition一一对应,这样更易于理解及调优
高效:在方式一中要保证数据零丢失须要启用WAL(预写日志),这会占用更多空间。而在方式二中,能够直接从Kafka指定的topic的指定offsets处恢复数据,不须要使用WAL
刚好一次语义保证:基于Receiver方式使用了Kafka的high level API来在Zookeeper中存储已消费的offsets。这在某些状况下会致使一些数据被消费两次,好比streaming app在处理某个batch内已接受到的数据的过程当中挂掉,可是数据已经处理了一部分,但这种状况下没法将已处理数据的offsets更新到 Zookeeper中,下次重启时,这批数据将再次被消费且处理。基于direct的方式,使用kafka的简单api,Spark Streaming本身就负责追踪消费的offset,并保存在 checkpoint中。Spark本身必定是同步的,所以能够保证数据是消费一次且仅消费一次。这种方式中,只要将output操做和保存offsets操做封装成一个原子操做就能避免失败后的重复消费和处理,从而达到刚好一次的语义(Exactly-once)
经过以上分析,咱们能够对这两种方式的区别作一个总结:
1. createStream会使用Receiver;而createDirectStream不会
2. createStream使用的Receiver会分发到某个executor上去启动并接受数据;而createDirectStream直接在driver上接收数据
3. createStream 使用 Receiver 源源不断的接收数据并把数据交给ReceiverSupervisor 处理最终存储为 blocks 做为 RDD 的输入,从 kafka拉取数据与计算消费数据相互独立;而 createDirectStream 会在每一个 batch拉取数据并就地消费,到下个 batch 再次拉取消费,周而复始,从 kafka拉取数据与计算消费数据是连续的,没有独立开
4. createStream中建立的KafkaInputDStream每一个batch所对应的RDD的partition不与Kafka partition一一对应;而createDirectStream中建立的 DirectKafkaInputDStream每一个batch所对应的RDD的partition与Kafka partition一一对应
Spark提供两个不一样的接收器来使用Apache Flume。 两个接收器简介以下
推式接收器该接收器以Avro数据池的方式工做,由Flume向其中推数据
拉式接收器该接收器能够从自定义的中间数据池中拉数据,而其余进程可使用Flume把数据推动该中间数据池
两种方式都须要从新配置Flume,并在某个节点配置的端口上运行接收器(不是已有的 Spark 或者 Flume 使用的端口)。要使用其中任何一种方法,都须要在工程中引入Maven工件spark-streaming-flume_2.10
推式接收器的方法设置起来很容易,可是它不使用事务来接收数据。在这种方式中,接收器以Avro数据池的方式工做,咱们须要配置Flume来把数据发到 Avro数据池。咱们提供的FlumeUtils对象会把接收器配置在一个特定的工做节点的主机名及端口号上。这些设置必须和Flume配置相匹配
虽然这种方式很简洁,但缺点是没有事务支持。这会增长运行接收器的工做节点发生错误时丢失少许数据的概率。不只如此,若是运行接收器的工做节点发生故障,系统会尝试从另外一个位置启动接收器,这时须要从新配置Flume才能将数据发给新的工做节点。这样配置会比较麻烦
较新的方式是拉式接收器(在 Spark 1.1 中引入),它设置了一个专用的Flume数据池供Spark Streaming读取,并让接收器主动从数据池中拉取数据。这种方式的优势在于弹性较好,Spark Streaming经过事务从数据池中读取并复制数据。在收到事务完成的通知前,这些数据还保留在数据池中
咱们须要先把自定义数据池配置为Flume的第三方插件。安装插件的最新方法请参考Flume文档的相关部分(https://flume.apache.org/FlumeUserGuide.html#installing-third-party-plugins)。 因为插件是用Scala写的,所以须要把插件自己以及Scala库都添加到Flume插件中
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-flume-sink_2.11</artifactId> <version>1.2.0</version> </dependency> <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.11.11</version> </dependency>
当把自定义Flume数据池添加到一个节点上以后,就须要配置Flume来把数据推送到这个数据池中
a1.sinks = spark a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink a1.sinks.spark.hostname = receiver-hostname a1.sinks.spark.port = port-used-for-sync-not-spark-port a1.sinks.spark.channel = memoryChannel
等到数据已经在数据池中缓存起来,就能够调用FlumeUtils来读取数据了
val events = FlumeUtils.createPollingStream(ssc,receiverHostname,receiverPort)
DStream上的原语与RDD的相似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操做中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各类Window相关的原语
DStream的转化操做能够分为无状态(stateless)和有状态(stateful)两种
在无状态转化操做中,每一个批次的处理不依赖于以前批次的数据。常见的RDD转化操做,例如 map()、filter()、reduceByKey() 等,都是无状态转化操做
相对地,有状态转化操做须要使用以前批次的数据或者是中间结果来计算当前批次的数据。有状态转化操做包括基于滑动窗口的转化操做和追踪状态变化的转化操做
无状态转化操做就是把简单的RDD转化操做应用到每一个批次上,也就是转化DStream中的每个RDD。部分无状态转化操做列在了下表中。注意,针对键值对的DStream转化操做(好比reduceByKey())要添加import StreamingContext._才能在Scala中使用
Transformation |
Meaning |
map(func) |
将源DStream中的每一个元素经过一个函数func从而获得新的DStreams |
flatMap(func) |
和map相似,可是每一个输入的项能够被映射为0或更多项 |
filter(func) |
选择源DStream中函数func判为true的记录做为新DStreams |
repartition(numPartitions) |
经过建立更多或者更少的partition来改变此DStream的并行级别 |
union(otherStream) |
联合源DStreams和其余DStreams来获得新DStream |
count() |
统计源DStreams中每一个RDD所含元素的个数获得单元素RDD的新DStreams |
reduce(func) |
经过函数func(两个参数一个输出)来整合源DStreams中每一个RDD元素获得单元素RDD的DStreams。这个函数须要关联从而能够被并行计算 |
countByValue() |
对于DStreams中元素类型为K调用此函数,获得包含(K,Long)对的新DStream,其中Long值代表相应的K在源DStream中每一个RDD出现的频率 |
reduceByKey(func, [numTasks]) |
对(K,V)对的DStream调用此函数,返回一样(K,V)对的新DStream,可是新DStream中的对应V为使用reduce函数整合而来。Note:默认状况下,这个操做使用Spark默认数量的并行任务(本地模式为2,集群模式中的数量取决于配置参数spark.default.parallelism)。也能够传入可选的参数numTaska来设置不一样数量的任务 |
join(otherStream, [numTasks]) |
两DStream分别为(K,V)和(K,W)对,返回(K,(V,W))对的新DStream |
cogroup(otherStream, [numTasks]) |
两DStream分别为(K,V)和(K,W)对,返回(K,(Seq[V],Seq[W])对新DStreams |
transform(func)
|
将RDD到RDD映射的函数func做用于源 DStream中每一个RDD上获得新DStream。 这个可用于在DStream的RDD上作任意操做 |
updateStateByKey(func) |
获得”状态”DStream,其中每一个key状态的更新是经过将给定函数用于此key的上一个状态和新值而获得。这个可用于保存每一个key值的任意状态数据 |
特殊的Transformations
UpdateStateByKey原语用于记录历史记录,有时,咱们须要在DStream中跨批次维护状态(例如流计算中累加 wordcount)。针对这种状况,updateStateByKey()为咱们提供了对一个状态变量的访问,用于键值对形式的DStream。给定一个由(键,事件)对构成的DStream,并传递一个指定如何根据新的事件更新每一个键对应状态的函数,它能够构建出一个新的DStream,其内部数据为(键,状态)对
updateStateByKey()的结果会是一个新的DStream,其内部的RDD序列是由每一个时间区间对应的(键,状态)对组成的
updateStateByKey操做使得咱们能够在用新信息进行更新时保持任意的状态。为使用这个功能,须要作下面两步:
1. 定义状态,状态能够是一个任意的数据类型
2. 定义状态更新函数,用此函数阐明如何使用以前的状态和来自输入流的新值对状态进行更新
使用updateStateByKey须要对检查点目录进行配置,会使用检查点来保存状态
更新版的 wordcount:
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object WorldCount { def main(args: Array[String]) { // 定义更新状态方法,参数 values 为当前批次单词频度,state 为以往批次单词频度 val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint(".") // Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("master01", 9999) // Split each line into words val words = lines.flatMap(_.split(" ")) //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) // 使用 updateStateByKey 来更新状态,统计从运行开始以来单词总的次数 val stateDstream = pairs.updateStateByKey[Int](updateFunc) stateDstream.print() //val wordCounts = pairs.reduceByKey(_ + _) // Print the first ten elements of each RDD generated in this DStream to the console //wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate //ssc.stop() } }
Window Operations有点相似于Storm中的State,能够设置窗口的大小和滑动窗口的间隔来动态的获取当前Steaming的容许状态
基于窗口的操做会在一个比StreamingContext的批次间隔更长的时间范围内,经过整合多个批次的结果,计算出整个窗口的结果
全部基于窗口的操做都须要两个参数,分别为窗口时长以及滑动步长,两 者都必须是StreamContext的批次间隔的整数倍。窗口时长控制每次计算最近的多少个批次的数据,其实就是最近的windowDuration/batchInterval个批次。 若是有一个以10秒为批次间隔的源DStream,要建立一个最近30秒的时间窗口(即最近3个批次),就应当把windowDuration设为30秒。而滑动步长的默认值与批次间隔相等,用来控制对新的DStream进行计算的间隔。若是源DStream批次间隔为10秒,而且咱们只但愿每两个批次计算一次窗口结果,就应该把滑动步长设置为20秒
假设,你想拓展前例从而每隔十秒对持续30秒的数据生成word count。为作到这个,咱们须要在持续30秒数据的(word,1)对DStream上应用reduceByKey。使用操做reduceByKeyAndWindow
# reduce last 30 seconds of data, every 10 second windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x -y, 30, 20)
Transformation |
Meaning |
window(windowLength, slideInterval) |
基于对源DStream窗化的批次进行计算返回一个新的DStream |
countByWindow(windowLength, slideInterval) |
返回一个滑动窗口计数流中的元素 |
reduceByWindow(func, windowLength, slideInterval) |
经过使用自定义函数整合滑动区间流元素来建立一个新的单元素流 |
reduceByKeyAndWindow(func, windowLength, slideI nterval, [numTasks])
|
当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处经过对滑动窗口中批次数据使用reduce 函数来整合每一个key的value值。Note:默认状况下,这个操做使用Spark的默认数量并行任务(本地是2),在集群模式中依据配置属性(spark.default.parallelism)来作grouping。能够经过设置可选参数numTasks来设置不一样数量的tasks |
reduceByKeyAndWindow(func, invFunc, windowLeng th, slideInterval, [numTasks]) |
这个函数是上述函数的更高效版本,每一个窗口的reduce值都是经过用前一个窗的reduce值来递增计算。经过reduce进入到滑动窗口数据并”反向reduce”离开窗口的旧数据来实现这个操做。一个例子是随着窗口滑动对keys的“加”“减”计数。经过前边介绍能够想到,这个函数只适用于”可逆的 reduce函数”,也就是这些reduce函数有相应的”反 reduce”函数(以参数invFunc形式传入)。如前述函数,reduce 任务的数量经过可选参数来配置。注意:为了使用这个操做,检查点必须可用 |
countByValueAndWindow(windowLength,slideInterv al, [numTasks])
|
对(K,V)对的DStream调用,返回(K,Long)对的新DStream,其中每一个key的值是其在滑动窗口中频率。如上,可配置 reduce任务数量 |
reduceByWindow()和reduceByKeyAndWindow()让咱们能够对每一个窗口更高效地进行归约操做。它们接收一个归约函数,在整个窗口上执行,好比+。 除此之外,它们还有一种特殊形式,经过只考虑新进入窗口的数据和离开窗口的数据,让Spark增量计算归约结果。这种特殊形式须要提供归约函数的一个逆函数,好比+对应的逆函数为-。对于较大的窗口,提供逆函数能够大大提升执行效率
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1)) val ipCountDStream = ipDStream.reduceByKeyAndWindow( {(x, y) => x + y}, {(x, y) => x - y}, Seconds(30), Seconds(10)) // 加上新进入窗口的批次中的元素 // 移除离开窗口的老批次中的元素 // 窗口时长 // 滑动步长
countByWindow()和countByValueAndWindow()做为对数据进行计数操做的简写。countByWindow()返回一个表示每一个窗口中元素个数的DStream,而countByValueAndWindow()返回的DStream则包含窗口中每一个值的个数
val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()} val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10)) val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
WordCount 第三版:3 秒一个批次,窗口 12 秒,滑步 6 秒
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object WorldCount { def main(args: Array[String]) { // 定义更新状态方法,参数 values 为当前批次单词频度,state 为以往批次单词频度 val updateFunc = (values: Seq[Int], state: Option[Int]) => { val currentCount = values.foldLeft(0)(_ + _) val previousCount = state.getOrElse(0) Some(currentCount + previousCount) } val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount") val ssc = new StreamingContext(conf, Seconds(3)) ssc.checkpoint(".") // Create a DStream that will connect to hostname:port, like localhost:9999 val lines = ssc.socketTextStream("master01", 9999) // Split each line into words val words = lines.flatMap(_.split(" ")) //import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3 // Count each word in each batch val pairs = words.map(word => (word, 1)) val wordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b),Seconds(12), Seconds(6)) // Print the first ten elements of each RDD generated in this DStream to the console wordCounts.print() ssc.start() // Start the computation ssc.awaitTermination() // Wait for the computation to terminate //ssc.stop() } }
Transform原语容许DStream上执行任意的RDD-to-RDD函数。即便这些函数并无在DStream的API中暴露出来,经过该函数能够方便的扩展Spark API
该函数每一批次调度一次
好比下面的例子,在进行单词统计的时候,想要过滤掉spam的信息,其实也就是对DStream中的RDD应用转换
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information val cleanedDStream = wordCounts.transform { rdd => rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning ... }
链接操做(leftOuterJoin, rightOuterJoin, fullOuterJoin也能够),能够链接Stream-Stream,windows-stream to windows-stream、stream-dataset
Stream-Stream Joins
val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2) val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2)
Stream-dataset joins
val dataset: RDD[String, String] = ... val windowedStream = stream.window(Seconds(20))... val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
输出操做指定了对流数据经转化操做获得的数据所要执行的操做(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值相似,若是一个DStream及其派生出的DStream都没有被执行输出操做,那么这些DStream就都不会被求值。若是 StreamingContext中没有设定输出操做,整个context就都不会启动
Output Operation |
Meaning |
print() |
在运行流程序的驱动结点上打印DStream 中每一批次数据的最开始 10个元素。这用于开发和调试。在 Python API 中,一样的操做叫 pprint() |
saveAsTextFiles(prefix, [suffix]) |
以 text 文件形式存储这个DStream 的内容。每一批次的存储文 件名基于参数中的 prefix 和 suffix。”prefix-Time_IN_MS[.suffix]”. |
saveAsObjectFiles(prefix, [suffix]) |
以 Java 对象序列化的方式将Stream 中的数据保存为SequenceFiles . 每一批次的存储文件 名 基 于 参 数 中 的 为 "prefix- TIME_IN_MS[.suffix]". Python 中 目 前不可用。 |
saveAsHadoopFiles(prefix, [suffix]) |
将 Stream 中的数据保存为Hadoop files. 每一批次的存储文件名 基 于 参 数 中 的 为 "prefix- TIME_IN_MS[.suffix]". |
foreachRDD(func) |
这是最通用的输出操做,即将函 数 func 用于产生于 stream 的每个RDD。其中参数传入的函数 func 应该 实现将每个 RDD 中数据推送到外 部系统,如将 RDD 存入文件或者经过 网络将其写入数据库。注意:函数 func在运行流应用的驱动中被执行,同时 其中通常函数 RDD 操做从而强制其 对于流 RDD 的运算 |
通用的输出操做foreachRDD(),它用来对DStream中的RDD运行任意计算。这和transform()有些相似,均可以让咱们访问任意RDD。在foreachRDD()中,能够重用咱们在Spark中实现的全部行动操做。好比,常见的用例之一是把数据写到诸如MySQL的外部数据库中
须要注意的是:
1) 链接不能写在driver层面
2) 若是写在foreach则每一个RDD都建立,得不偿失
3) 增长foreachPartition,在分区建立
4) 能够考虑使用链接池优化
dstream.foreachRDD { rdd => // error val connection = createNewConnection() // executed at the driver 序列化错误 rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record) // executed at the worker ) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } }
累加器(Accumulators)和广播变量(Broadcast variables)不能从Spark Streaming的检查点中恢复。若是你启用检查并也使用了累加器和广播变量,那么你必须建立累加器和广播变量的延迟单实例从而在驱动因失效重启后他们能够被从新实例化。以下例述:
object WordBlacklist { @volatile private var instance: Broadcast[Seq[String]] = null def getInstance(sc: SparkContext): Broadcast[Seq[String]] = { if (instance == null) { synchronized { if (instance == null) { val wordBlacklist = Seq("a", "b", "c") instance = sc.broadcast(wordBlacklist) } } } instance } } object DroppedWordsCounter { @volatile private var instance: LongAccumulator = null def getInstance(sc: SparkContext): LongAccumulator = { if (instance == null) { synchronized { if (instance == null) { instance = sc.longAccumulator("WordsInBlacklistCounter") } } } instance } } wordCounts.foreachRDD{ (rdd: RDD[(String, Int)], time: Time) => // Get or register the blacklist Broadcast val blacklist = WordBlacklist.getInstance(rdd.sparkContext) // Get or register the droppedWordsCounter Accumulator val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext) // Use blacklist to drop words and use droppedWordsCounter to count them val counts = rdd.filter { case (word, count) => if (blacklist.value.contains(word)) { droppedWordsCounter.add(count) false } else { true } }.collect().mkString("[", ", ", "]") val output = "Counts at time " + time + " " + counts }
咱们能够很容易地在流数据上使用DataFrames和SQL。可是必须使用SparkContext来建立StreamingContext要用的SQLContext。此外,这一过程能够在驱动失效后重启。咱们经过建立一个实例化的SQLContext单实例来实现这个工做。以下例所示。咱们对前例 word count进行修改从而使用DataFrames和SQL来产生word counts。每一个RDD被转换为DataFrame,以临时表格配置并用SQL进行查询
val words: DStream[String] = ... words.foreachRDD { rdd => // Get the singleton instance of SparkSession val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate() import spark.implicits._ // Convert RDD[String] to DataFrame val wordsDataFrame = rdd.toDF("word") // Create a temporary view wordsDataFrame.createOrReplaceTempView("words") // Do word count on DataFrame using SQL and print it val wordCountsDataFrame = spark.sql("select word, count(*) as total from words group by word") wordCountsDataFrame.show() }
咱们也能够从不一样的线程在定义于流数据的表上运行SQL查询(也就是说,异步运行StreamingContext)。仅肯定你设置StreamingContext记住了足够数量的流数据以使得查询操做能够运行。不然,StreamingContext不会意识到任何异步的SQL查询操做,那么其就会在查询完成以后删除旧的数据。例如,若是你要查询最后一批次,可是你的查询会运行5分钟,那么你须要调用streamingContext.remember(Minutes(5))(in Scala, 或者其余语言的等价操做)
和RDDs相似,DStreams一样容许开发者将流数据保存在内存中。也就是说,在DStream上使用persist()方法将会自动把DStreams中的每一个RDD保存在内存中。当DStream中的数据要被屡次计算时,这个很是有用(如在一样数据上的屡次操做)。对于像 reduceByWindow和reduceByKeyAndWindow以及基于状态的(updateStateByKey)这种操做,保存是隐含默认的。所以,即便开发者没有调用persist(),由基于窗操做产生的DStreams会自动保存在内存中
检查点机制是咱们在Spark Streaming中用来保障容错性的主要机制。与应用程序逻辑无关的错误(即系统错位,JVM崩溃等)有迅速恢复的能力
它可使Spark Streaming阶段性地把应用数据存储到诸如HDFS或Amazon S3这样的可靠存储系统中,以供恢复时使用。具体来讲,检查点机制主要为如下两个目的服务
1) 控制发生失败时须要重算的状态数。SparkStreaming能够经过转化图的谱系图来重算状态,检查点机制则能够控制须要在转化图中回溯多远
2) 提供驱动器程序容错。若是流计算应用中的驱动器程序崩溃了,你能够重启驱动器程序并让驱动器程序从检查点恢复,这样Spark Streaming就能够读取以前运行的程序处理数据的进度,并从那里继续
为了实现这个,Spark Streaming须要为容错存储系统checkpoint足够的信息从而使得其能够从失败中恢复过来。有两种类型的数据设置检查点
Metadata checkpointing:将定义流计算的信息存入容错的系统如HDFS
元数据包括:
配置-用于建立流应用的配置
DStreams操做-定义流应用的DStreams操做集合
不完整批次-批次的工做已进行排队可是并未完成
Data checkpointing:将产生的RDDs存入可靠的存储空间。对于在多批次间合并数据的状态转换,这个颇有必要。在这样的转换中,RDDs的产生基于以前批次的RDDs,这样依赖链长度随着时间递增。为了不在恢复期这种无限的时间增加(和链长度成比例),状态转换中间的RDDs周期性写入可靠地存储空间(如HDFS)从而切短依赖链
总而言之,元数据检查点在由驱动失效中恢复是首要须要的。而数据或者RDD检查点甚至在使用了状态转换的基础函数中也是必要的
出于这些缘由,检查点机制对于任何生产环境中的流计算应用都相当重要。咱们能够经过向ssc.checkpoint()方法传递一个路径参数(HDFS、S3或者本地路径都可)来配置检查点机制,同时咱们的应用应该可以使用检查点的数据
1. 当程序首次启动,其将建立一个新的StreamingContext,设置全部的流并调用start()
2. 当程序在失效后重启,其将依据检查点目录的检查点数据从新建立一个StreamingContext。经过使用StraemingContext.getOrCreate很容易得到这个性能
ssc.checkpoint("hdfs://...") # 建立和设置一个新的StreamingContext def functionToCreateContext(): sc = SparkContext(...) # new context ssc = new StreamingContext(...) lines = ssc.socketTextStream(...) # create DStreams ... ssc.checkpoint(checkpointDirectory) # 设置检查点目录 return ssc # 从检查点数据中获取 StreamingContext 或者从新建立一个 context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext) # 在须要完成的 context 上作额外的配置 # 不管其有没有启动 context ... # 启动context context.start() contaxt.awaitTermination()
若是检查点目录(checkpointDirectory)存在,那么context将会由检查点数据从新建立。若是目录不存在(首次运行),那么函数functionToCreateContext将会被调用来建立一个新的context并设置DStreams
注意RDDs的检查点引发存入可靠内存的开销。在RDDs须要检查点的批次里,处理的时间会所以而延长。因此,检查点的间隔须要很仔细地设置。在小尺寸批次(1 秒钟)。每一批次检查点会显著减小操做吞吐量。反之,检查点设置的过于频繁致使“血统”和任务尺寸增加,这会有很很差的影响对于须要 RDD检查点设置的状态转换,默认间隔是批次间隔的乘数通常至少为10秒钟。能够经过dstream.checkpoint(checkpointInterval)。一般,检查点设置间隔是5-10个DStream的滑动间隔
WAL即write ahead log(预写日志),是在1.2 版本中就添加的特性。做用就是,将数据经过日志的方式写到可靠的存储,好比HDFS、s3,在driver或worker failure时能够从在可靠存储上的日志文件恢复数据。WAL在driver端和executor端都有应用
WAL在driver端的应用
用于写日志的对象writeAheadLogOption:WriteAheadLog。在StreamingContext中的JobScheduler中的ReceiverTracker的ReceivedBlockTracker构造函数中被建立,ReceivedBlockTracker用于管理已接收到的blocks信息。须要注意的是,这里只须要启用 checkpoint就能够建立该driver端的WAL管理实例,而不须要将spark.streaming.receiver.writeAheadLog.enable设置为true
写什么、什么时候写
首选须要明确的是,ReceivedBlockTracker经过WAL写入log文件的内容是3种事件(固然,会进行序列化):
case class BlockAdditionEvent(receivedBlockInfo: ReceivedBlockInfo);即新增了一个block及该block的具体信息,包括streamId、blockId、数据条数等
case class BatchAllocationEvent(time: Time, allocatedBlocks: AllocatedBlocks);即为某个batchTime分配了哪些blocks做为该batch RDD的数据源
case class BatchCleanupEvent(times: Seq[Time]);即清理了哪些batchTime对应的block
知道了写了什么内容,结合源码,也不难找出是何时写了这些内容。须要再次注意的是,写上面这三种事件,也不须要将spark.streaming.receiver.writeAheadLog.enable设置为true
WAL在executor端的应用
Receiver接收到的数据会源源不断的传递给ReceiverSupervisor,是否启用WAL机制(便是否将spark.streaming.receiver.writeAheadLog.enable设置为true)会影响ReceiverSupervisor在存储block时的行为:
不启用 WAL:你设置的StorageLevel是什么,就怎么存储。好比MEMORY_ONLY只会在内存中存一份,MEMORY_AND_DISK会在内存和磁盘上各存一份等
启用 WAL:在StorageLevel指定的存储的基础上,写一份到WAL中。存储一份在WAL上,更不容易丢数据但性能损失也比较大
关因而否要启用WAL,要视具体的业务而定:
若能够接受必定的数据丢失,则不须要启用 WAL,由于对性能影响较大
若彻底不能接受数据丢失,那就须要同时启用checkpoint和WAL,checkpoint保存着执行进度(好比已生成但未完成的jobs),WAL中保存着blocks及blocks元数据(好比保存着未完成的jobs对应的blocks信息及block文件)。同时,这种状况可能要在数据源和 Streaming Application中联合来保证exactly once语义
预写日志功能的流程是:
1) 一个SparkStreaming应用开始时(也就是driver开始时),相关的StreamingContext使用SparkContext启动接收器成为长驻运行任务。这些接收器接收并保存流数据到Spark内存中以供处理
2) 接收器通知driver
3) 接收块中的元数据(metadata)被发送到driver的StreamingContext。
这个元数据包括:
(a) 定位其在executor内存中数据的块referenceid
(b) 块数据在日志中的偏移信息(若是启用了)
用户传送数据的生命周期以下图所示:
默认状况下,Spark Streaming经过Receiver以生产者生产数据的速率接收数据,计算过程当中会出现batch processing time > batch interval的状况,其中batch processing time为实际计算一个批次花费时间,batch interval为Streaming应用设置的批处理间隔。这意味着Spark Streaming的数据接收速率高于Spark从队列中移除数据的速率,也就是数据处理能力低,在设置间隔内不能彻底处理当前接收速率接收的数据。若是这种状况持续过长的时间,会形成数据在内存中堆积,致使Receiver所在Executor内存溢出等问题(若是设置StorageLevel包含disk,则内存存放不下的数据会溢写至disk,加大延迟)。Spark 1.5 之前版本,用户若是要限制Receiver的数据接收速率,能够经过设置静态配制参数“spark.streaming.receiver.maxRate”的值来实现,此举虽然能够经过限制接收速率,来适配当前的处理能力,防止内存溢出,但也会引入其它问题。好比:producer数据生产高于maxRate,当前集群处理能力也高于maxRate,这就会形成资源利用率降低等问题。为了更好的协调数据接收速率与资源处理能力,Spark Streaming从 v1.5 开始引入反压机制(back-pressure),经过动态控制数据接收速率来适配集群数据处理能力
Spark Streaming Backpressure: 根据JobScheduler反馈做业的执行信息来动态调整Receiver数据接收率。经过属性“spark.streaming.backpressure.enabled”来控制是否启用backpressure机制,默认值false,即不启用
Streaming架构以下图所示
在原架构的基础上加上一个新的组件RateController,这个组件负责监听“OnBatchCompleted”事件,而后从中抽取processingDelay及schedulingDelay信息。Estimator依据这些信息估算出最大处理速度(rate),最后由基于Receiver的Input Stream将rate经过ReceiverTracker与 ReceiverSupervisorImpl转发给 BlockGenerator(继承自RateLimiter)
流量控制点
当Receiver开始接收数据时,会经过supervisor.pushSingle()方法将接收的数据存入currentBuffer等待BlockGenerator定时将数据取走,包装成block。在将数据存放入currentBuffer之时,要获取许可(令牌)。若是获取到许可就能够将数据存入buffer, 不然将被阻塞,进而阻塞Receiver从数据源拉取数据
其令牌投放采用令牌桶机制进行,原理以下图所示:
令牌桶机制:大小固定的令牌桶可自行以恒定的速率源源不断地产生令牌。若是令牌不被消耗,或者被消耗的速度小于产生的速度,令牌就会不断地增多,直到把桶填满。后面再产生的令牌就会从桶中溢出。最后桶中能够保存的最大令牌数永远不会超过桶的大小。当进行某操做时须要令牌时会从令牌桶中取出相应的令牌数,若是获取到则继续操做,不然阻塞。用完以后不用放回
驱动器程序的容错要求咱们以特殊的方式建立StreamingContext。咱们须要把检查点目录提供给StreamingContext。与直接调用new StreamingContext不一样,应该使用StreamingContext.getOrCreate()函数
配置过程以下:
一、 启动 Driver 自动重启功能
standalone: 提交任务时添加 --supervise 参数
yarn:设置yarn.resourcemanager.am.max-attempts 或者spark.yarn.maxAppAttempts
mesos: 提交任务时添加 --supervise 参数
二、 设置 checkpoint
StreamingContext.setCheckpoint(hdfsDirectory)
三、支持从 checkpoint 中重启配置
def createContext(checkpointDirectory: String): StreamingContext = { val ssc = new StreamingContext ssc.checkpoint(checkpointDirectory) ssc } val ssc = StreamingContext.getOrCreate(checkpointDirectory, createContext(checkpointDirectory))
为了应对工做节点失败的问题,Spark Streaming使用与Spark的容错机制相同的方法。全部从外部数据源中收到的数据都在多个工做节点上备份。全部从备份数据转化操做的过程当中建立出来的RDD都能容忍一个工做节点的失败,由于根据RDD谱系图,系统能够把丢失的数据从幸存的输入数据备份中重算出来。对于reduceByKey等Stateful操做重作的lineage较长的,强制启动checkpoint,减小重作概率
运行接收器的工做节点的容错也是很重要的。若是这样的节点发生错误,SparkStreaming会在集群中别的节点上重启失败的接收器。然而,这种状况会不会致使数据的丢失取决于数据源的行为(数据源是否会重发数据)以及接收器的实现(接收器是否会向数据源确认收到数据)。举个例子,使用Flume做 为数据源时,两种接收器的主要区别在于数据丢失时的保障。在“接收器从数 据池中拉取数据”的模型中,Spark只会在数据已经在集群中备份时才会从数据池中移除元素。而在“向接收器推数据”的模型中,若是接收器在数据备份以前失败,一些数据可能就会丢失。总的来讲,对于任意一个接收器,你必须同时考 虑上游数据源的容错性(是否支持事务)来确保零数据丢失
通常主要是经过将接收到数据后先写日志(WAL)到可靠文件系统中,后才写入实际的RDD。若是后续处理失败则成功写入WAL的数据经过WAL进行恢复,未成功写入WAL的数据经过可回溯的Source进行重放
总的来讲,接收器提供如下保证
1) 全部从可靠文件系统中读取的数据(好比经过StreamingContext.hadoopFiles读取的)都是可靠的,由于底层的文件系统是有备份的。Spark Streaming会记住哪些数据存放到了检查点中,并在应用崩溃后从检查点处继续执行
2) 对于像Kafka、推式Flume、Twitter这样的不可靠数据源,Spark会把输入数据复制到其余节点上,可是若是接收器任务崩溃,Spark仍是会丢失数据。在Spark 1.1以及更早的版本中,收到的数据只被备份到执行器进程的内存中,因此一旦驱动器程序崩溃(此时全部的执行器进程都会丢失链接), 数据也会丢失。在Spark 1.2 中,收到的数据被记录到诸如HDFS这样的可靠的文件系统中,这样即便驱动器程序重启也不会致使数据丢失
综上所述,确保全部数据都被处理的最佳方式是使用可靠的数据源(例如HDFS、拉式 Flume 等)。若是咱们还要在批处理做业中处理这些数据,使用可靠数据源是最佳方式,由于这种方式确保了咱们的批处理做业和流计算做业能读取到相同的数据,于是能够获得相同的结果
操做过程以下:
启用checkpoint
-ssc.setCheckpoint(checkpointDir)
启用 WAL
-sparkConf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
对Receiver使用可靠性存储StoreageLevel.MEMORY_AND_DISK_SER or StoreageLevel.MEMORY_AND_DISK_SER2
因为Spark Streaming工做节点的容错保障,Spark Streaming能够为全部的转化操做提供 “精确一次”执行的语义,即便一个工做节点在处理部分数据时发生失败,最终的转化结果(即转化操做获得的RDD)仍然与数据只被处理一次获得的结果同样
然而,当把转化操做获得的结果使用输出操做推入外部系统中时,写结果的任务可能因故障而执行屡次,一些数据可能也就被写了屡次。因为这引入了外部系统,所以咱们须要专门针对各系统的代码来处理这样的状况。咱们可使用事务操做来写入外部系统(即原子化地将一个RDD分区一次写入), 或者设计幂等的更新操做(即屡次运行同一个更新操做仍生成相同的结果)。好比Spark Streaming的saveAs...File操做会在一个文件写完时自动将其原子化地移动到最终位置上,以此确保每一个输出文件只存在一份
最多见的问题是Spark Streaming可使用的最小批次间隔是多少。总的来讲,500毫秒已经被证明为对许多应用而言是比较好的最小批次大小。寻找最小批次大小的最佳实践是从一个比较大的批次大小(10秒左右)开始,不断使用更小的批次大小。若是Streaming用户界面中显示的处理时间保持不变,咱们就能够进一步减少批次大小。若是处理时间开始增长,咱们可能已经达到了应用的极限
类似地,对于窗口操做,计算结果的间隔(也就是滑动步长)对于性能也有巨大的影响。当计算代价巨大并成为系统瓶颈时,就应该考虑提升滑动步长了。减小批处理所消耗时间的常见方式还有提升并行度。有如下三种方式能够提升并行度:
1) 增长接收器数目有时若是记录太多致使单台机器来不及读入并分发的话,接收器会成为系统瓶颈。这时咱们就须要经过建立多个输入DStream(这样会建立多个接收器)来增长接收器数目,而后使用union来把数据合并为一个数据源
2) 将收到的数据显式地从新分区若是接收器数目没法再增长,咱们能够经过使用DStream.repartition来显式从新分区输入流(或者合并多个流获得的数据流)来从新分配收到的数据
3) 提升聚合计算的并行度 对于像reduceByKey()这样的操做,咱们能够在第二个参数中指定并行度,咱们在介绍RDD时提到过相似的手段