第1章 Spark Streaming 概述1.1 什么是 Spark Streaming1.2 为何要学习 Spark Streaming1.3 Spark 与 Storm 的对比第2章 运行 Spark Streaming第3章 架构与抽象第4章 Spark Streaming 解析4.1 初始化 StreamingContext4.2 什么是 DStreams4.3 DStream 的输入4.3.1 基本数据源4.3.2 高级数据源4.4 DStream 的转换4.4.1 无状态转化操做4.4.2 有状态转化操做4.4.3 重要操做4.5 DStream 的输出4.6 累加器和广播变量4.7 DataFrame ans SQL Operations4.8 Caching / Persistence4.9 不间断运行 7x24 小时4.9.1 检查点机制4.9.2 驱动器程序容错4.9.3 工做节点容错4.9.4 接收器容错4.9.5 处理保证4.10 性能考量php
![]()
Spark Streaming 相似于 Apache Storm,用于流式数据的处理。根据其官方文档介绍,Spark Streaming 有高吞吐量和容错能力强等特色。Spark Streaming 支持的数据输入源不少,例如:Kafka、Flume、Twitter、ZeroMQ 和简单的 TCP 套接字等等。数据输入后能够用 Spark 的高度抽象,如:map、reduce、join、window 等进行运算。而结果也能保存在不少地方,如 HDFS,数据库等。另外 Spark Streaming 也能和 MLlib(机器学习)以及 Graphx 完美融合。
![]()
和 Spark 基于 RDD 的概念很类似,Spark Streaming 使用离散化流(discretized stream)做为抽象表示,叫做 DStream。DStream 是随时间推移而收到的数据的序列。在内部,每一个时间区间收到的数据都做为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(所以得名“离散化”)。
![]()
DStream 能够从各类输入源建立,好比 Flume、Kafka 或者 HDFS。建立出来的 DStream 支持两种操做,一种是转化操做(transformation),会生成一个新的 DStream,另外一种是输出操做(output operation),能够把数据写入外部系统中。DStream 提供了许多与 RDD 所支持的操做相相似的操做支持,还增长了与时间相关的新操做,好比滑动窗口。
Spark Streaming 的关键抽象html
![]()
DStream:Discretized Stream 离散化流
![]()
一、易用
二、容错
三、易整合到 Spark 体系java![]()
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<!-- provided 表示编译期可用,运行期不可用 -->
<!--<scope>provided</scope>-->
</dependency>
示例代码以下:sql
package com.atguigu.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WorldCount {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("hadoop102", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val results = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
results.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
ssc.stop()
}
}
安装 Netcat 后,参考文章连接:https://www.cnblogs.com/chenmingjun/p/10785438.html
先启动 Netcat,而后经过 Netcat 发送数据:数据库
$ nc -l -p 9999 #监听9999端口
hello world #运行 jar 包后,发送测试数据
再按照 Spark Core 中的方式进行打包,并将程序上传到Spark机器上。并运行:apache
/opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class com.atguigu.streaming.WorldCount /opt/software/sparkjars/networdcount-jar-with-dependencies.jar
注意
:若是程序运行时,log 日志太多,能够将 spark 的 conf 目录下的 log4j 文件里面的日志级别改为 WARN。编程
Spark Streaming 使用“微批次”的架构,把流式计算看成一系列连续的小规模批处理来对待。Spark Streaming 从各类输入源中读取数据,并把数据分组为小的批次。新的批次按均匀的时间间隔建立出来。在每一个时间区间开始的时候,一个新的批次就建立出来,在该区间内收到的数据都会被添加到这个批次中。在时间区间结束时,批次中止增加。时间区间的大小是由批次间隔这个参数决定的。批次间隔通常设在 500 毫秒到几秒之间,由应用开发者配置。每一个输入批次都造成一个 RDD,以 Spark 做业的方式处理并生成其余的 RDD。处理的结果能够以批处理的方式传给外部系统。高层次的架构以下图所示:bootstrap
![]()
Spark Streaming 的编程抽象是离散化流,也就是 DStream。它是一个 RDD 序列,每一个 RDD 表明数据流中一个时间片内的数据。windows
![]()
Spark Streaming 在 Spark 的驱动器程序 -- 工做节点的结构的执行过程以下图所示。Spark Streaming 为每一个输入源启动对应的接收器。接收器以任务的形式运行在应用的执行器进程中,从输入源收集数据并保存为 RDD。它们收集到输入数据后会把数据复制到另外一个执行器进程来保障容错性(默认行为)。数据保存在执行器进程的内存中,和缓存 RDD 的方式同样。驱动器程序中的 StreamingContext 会周期性地运行 Spark 做业来处理这些数据,把数据与以前时间区间中的 RDD 进行整合。缓存
![]()
源码:
import org.apache.spark._
import org.apache.spark.streaming._
// 能够经过 ssc.sparkContext 来访问 SparkContext
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
// 或者经过已经存在的 SparkContext 来建立 StreamingContext
import org.apache.spark.streaming._
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
初始化完 Context 以后:
1)定义消息输入源来建立 DStreams。
2)定义 DStreams 的转化操做和输出操做。
3)经过 streamingContext.start() 来启动消息采集和处理.
4)等待程序终止,能够经过 streamingContext.awaitTermination() 来设置。
5)经过 streamingContext.stop() 来手动终止处理程序。
注意:
StreamingContext 一旦启动,对 DStreams 的操做就不能修改了。
在同一时间一个 JVM 中只有一个 StreamingContext 能够启动。
stop() 方法将同时中止 SparkContext,能够传入参数 stopSparkContext 用于只中止 StreamingContext。
Discretized Stream 是 Spark Streaming 的基础抽象,表明持续性的数据流和通过各类 Spark 原语操做后的结果数据流。在内部实现上,DStream 是一系列连续的 RDD 来表示。每一个 RDD 含有一段时间间隔内的数据,以下图:
![]()
对数据的操做也是按照 RDD 为单位来进行的,以下图:
![]()
计算过程由 Spark Engine 来完成,以下图:
![]()
Spark Streaming 原生支持一些不一样的数据源。一些“核心”数据源已经被打包到 Spark Streaming 的 Maven 工件中,而其余的一些则能够经过 spark-streaming-kafka 等附加工件获取。每一个接收器都以 Spark 执行器程序中一个长期运行的任务的形式运行,所以会占据分配给应用的 CPU 核心。此外,咱们还须要有可用的 CPU 核心来处理数据。这意味着若是要运行多个接收器,就必须至少有和接收器数目相同的核心数,还要加上用来完成计算所须要的核心数。例如,若是咱们想要在流计算应用中运行 10 个接收器,那么至少须要为应用分配 11 个 CPU 核心。因此若是在本地模式运行,不要使用 local 或者 local[1]。
文件数据源(实际开发中这种方式用的比较少)
Socket 数据流前面的例子已经看到过。
文件数据流:可以读取全部 HDFS API 兼容的文件系统文件,经过 fileStream 方法进行读取。
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming 将会监控 dataDirectory 目录并不断处理移动进来的文件,注意:目前不支持嵌套目录。
1)文件须要有相同的数据格式。
2)文件进入 dataDirectory 的方式须要经过移动或者重命名来实现。
3)一旦文件移动进目录,则不能再修改,即使修改了也不会读取新的数据。
若是文件比较简单,则可使用 streamingContext.textFileStream(dataDirectory)
方法来读取文件。文件流不须要接收器,不须要单独分配 CPU 核。
Hdfs 读取实例:(须要提早在 HDFS 上建好目录
)
scala> import org.apache.spark.streaming._
import org.apache.spark.streaming._
scala> val ssc = new StreamingContext(sc, Seconds(1))
ssc: org.apache.spark.streaming.StreamingContext = org.apache.spark.streaming.StreamingContext@4eb3b690
scala> val lines = ssc.textFileStream("hdfs://hadoop102:9000/data/")
lines: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.MappedDStream@14c7ab73
scala> val words = lines.flatMap(_.split(" "))
words: org.apache.spark.streaming.dstream.DStream[String] = org.apache.spark.streaming.dstream.FlatMappedDStream@125bc00d
scala> val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Int)] = org.apache.spark.streaming.dstream.ShuffledDStream@4a3363c9
scala> wordCounts.print()
scala> ssc.start()
上传文件到 HDFS 进行测试:
[atguigu@hadoop102 hadoop-2.7.2]$ bin/hdfs dfs -mkdir /data/
[atguigu@hadoop102 hadoop-2.7.2]$ ls
bin data etc include input lib libexec LICENSE.txt logs NOTICE.txt README.txt safemode.sh sbin share wcinput wcoutput
[atguigu@hadoop102 hadoop-2.7.2]$ bin/hdfs dfs -put ./LICENSE.txt /data/
[atguigu@hadoop102 hadoop-2.7.2]$ bin/hdfs dfs -put ./README.txt /data/
获取计算结果:
-------------------------------------------
Time: 1504665716000 ms
-------------------------------------------
-------------------------------------------
Time: 1504665717000 ms
-------------------------------------------
-------------------------------------------
Time: 1504665718000 ms
-------------------------------------------
(227.7202-1,2)
(created,2)
(offer,8)
(BUSINESS,11)
(agree,10)
(hereunder,,1)
(“control”,1)
(Grant,2)
(2.2.,2)
(include,11)
...
-------------------------------------------
Time: 1504665740000 ms
-------------------------------------------
(under,1)
(Technology,1)
(distribution,2)
(http://hadoop.apache.org/core/,1)
(Unrestricted,1)
(740.13),1)
(check,1)
(have,1)
(policies,1)
(uses,1)
...
-------------------------------------------
Time: 1504665741000 ms
-------------------------------------------
自定义数据源(实际开发中用的较多)
经过继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。
// Receiver 须要提供一个类型参数,该类型参数是 Receiver 接收到的数据的类型
class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
override def onStart(): Unit = {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
// 定义一个新的线程去执行 receive() 方法
override def run() {
receive()
}
}.start()
}
override def onStop(): Unit = {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}
/**
* Create a socket connection and receive data until receiver is stopped
*/
private def receive() {
var socket: Socket = null
var userInput: String = null
try {
// Connect to host:port
socket = new Socket(host, port)
// Until stopped or connection broken continue reading
// 获取 Socket 的输入对象
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
userInput = reader.readLine()
// 当 Receiver 没有中止而且 userInput 不为空
while (!isStopped && userInput != null) {
// 经过 store() 方法将获取到的 userInput 提交给 Spark 框架
store(userInput)
// 再获取下一条
userInput = reader.readLine()
}
reader.close()
socket.close()
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again")
} catch {
case e: java.net.ConnectException =>
// restart if could not connect to server
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
// restart if there is any other error
restart("Error receiving data", t)
}
}
}
能够经过 streamingContext.receiverStream(<instance of custom receiver>)
来使用自定义的数据采集源。
// Assuming ssc is the StreamingContext
val customReceiverStream = ssc.receiverStream(new CustomReceiver(host, port))
val words = lines.flatMap(_.split(" "))
...
模拟 Spark 内置的 Socket 连接,所有源码以下:
package com.atguigu.streaming
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.receiver.Receiver
// Receiver 须要提供一个类型参数,该类型参数是 Receiver 接收到的数据的类型
class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
override def onStart(): Unit = {
// Start the thread that receives data over a connection
new Thread("Socket Receiver") {
// 定义一个新的线程去执行 receive() 方法
override def run() {
receive()
}
}.start()
}
override def onStop(): Unit = {
// There is nothing much to do as the thread calling receive()
// is designed to stop by itself if isStopped() returns false
}
/**
* Create a socket connection and receive data until receiver is stopped
*/
private def receive() {
var socket: Socket = null
var userInput: String = null
try {
// Connect to host:port
socket = new Socket(host, port)
// Until stopped or connection broken continue reading
// 获取 Socket 的输入对象
val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))
userInput = reader.readLine()
// 当 Receiver 没有中止而且 userInput 不为空
while (!isStopped && userInput != null) {
// 经过 store() 方法将获取到的 userInput 提交给 Spark 框架
store(userInput)
// 再获取下一条
userInput = reader.readLine()
}
reader.close()
socket.close()
// Restart in an attempt to connect again when server is active again
restart("Trying to connect again")
} catch {
case e: java.net.ConnectException =>
// restart if could not connect to server
restart("Error connecting to " + host + ":" + port, e)
case t: Throwable =>
// restart if there is any other error
restart("Error receiving data", t)
}
}
}
object CustomReceiverDemo {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.receiverStream(new CustomReceiver("hadoop102", 9999))
// Split each line into words
val words = lines.flatMap(_.split(" "))
//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
// ssc.stop()
}
}
先启动 Netcat,而后经过 Netcat 发送数据:
$ nc -l -p 9999 #监听9999端口
hello world #运行 jar 包后,发送测试数据
按照 Spark Core 中的方式进行打包,并将程序上传到Spark机器上。并运行:
/opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class com.atguigu.streaming.CustomReceiverDemo /opt/software/sparkjars/sparkstreaming_customerReceiver-1.0-SNAPSHOT-jar-with-dependencies.jar
输出结果截图:
RDD 队列(用在 Spark Streaming 与 RDD 的结合时,即混合程序)
测试过程当中,能够经过使用 streamingContext.queueStream(queueOfRDDs)
来建立 DStream,每个推送到这个队列中的 RDD,都会做为一个 DStream 处理。
package com.atguigu.streaming
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.collection.mutable
object QueueRdd {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("QueueRdd")
val ssc = new StreamingContext(conf, Seconds(1))
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
// 建立 RDD 队列
val rddQueue = new mutable.SynchronizedQueue[RDD[Int]]()
// Create the QueueInputDStream and use it do some processing
// 建立 QueueInputDStream
val inputStream = ssc.queueStream(rddQueue)
// 处理队列中的 RDD 数据
val mappedStream = inputStream.map(x => (x % 10, 1))
val reducedStream = mappedStream.reduceByKey(_ + _)
// 打印结果
reducedStream.print()
// 启动计算
ssc.start()
// Create and push some RDDs into
for (i <- 1 to 30) {
rddQueue += ssc.sparkContext.makeRDD(1 to 300, 10)
Thread.sleep(2000)
// 经过程序中止 StreamingContext 的运行
// ssc.stop()
}
}
}
运行jar 包
/opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --class com.atguigu.streaming.QueueRdd /opt/software/sparkjars/sparkstreaming_queueRdd-1.0-SNAPSHOT-jar-with-dependencies.jar
输出结果以下:
[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
--class com.atguigu.streaming.QueueRdd /opt/software/sparkjars/sparkstreaming_queueRdd-1.0-SNAPSHOT-jar-with-dependencies.jar
19/04/28 20:30:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1556454615000 ms
-------------------------------------------
(4,30)
(0,30)
(6,30)
(8,30)
(2,30)
(1,30)
(3,30)
(7,30)
(9,30)
(5,30)
-------------------------------------------
Time: 1556454616000 ms
-------------------------------------------
-------------------------------------------
Time: 1556454617000 ms
-------------------------------------------
(4,30)
(0,30)
(6,30)
(8,30)
(2,30)
(1,30)
(3,30)
(7,30)
(9,30)
(5,30)
-------------------------------------------
Time: 1556454618000 ms
-------------------------------------------
......
除核心数据源外,还能够用附加数据源接收器来从一些知名数据获取系统中接收的数据,这些接收器都做为 Spark Streaming 的组件进行独立打包了。它们仍然是 Spark 的一部分,不过你须要在构建文件中添加额外的包才能使用它们。现有的接收器包括 Twitter、Apache Kafka、Amazon Kinesis、Apache Flume,以及 ZeroMQ。能够经过添加与 Spark 版本匹配 的 Maven 工件 spark-streaming-[projectname]_2.10 来引入这些附加接收器。
Apache Kafka
在工程中须要引入 Maven 工件 spark- streaming-kafka_2.10 来使用它。包内提供的 KafkaUtils 对象能够在 StreamingContext 和 JavaStreamingContext 中以你的 Kafka 消息建立出 DStream。因为 KafkaUtils 能够订阅多个主题,所以它建立出的 DStream 由成对的主题和消息组成。要建立出一个流数据,须要使用 StreamingContext 实例、一个由逗号隔开的 ZooKeeper 主机列表字符串、消费者组的名字(惟一名字),以及一个从主题到针对这个主题的接收器线程数的映射表来调用 createStream() 方法。
import org.apache.spark.streaming.kafka._
...
// 建立一个从主题到接收器线程数的映射表
val topics = List(("pandas", 1), ("logs", 1)).toMap
val topicLines = KafkaUtils.createStream(ssc, zkQuorum, group, topics)
topicLines.map(_._2)
下面咱们进行一个实例,演示 SparkStreaming 如何从 Kafka 读取消息,以及如何经过链接池方法把消息处理完成后再写回 Kafka:
pom.xml 须要加入的依赖以下:
<!-- 用来提供对象链接池 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.4.2</version>
</dependency>
<!-- 用来链接 Kafka 的工具类 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
kafka Connection Pool 程序:
package com.atguigu.streaming
import java.util.Properties
import org.apache.commons.pool2.impl.DefaultPooledObject
import org.apache.commons.pool2.{BasePooledObjectFactory, PooledObject}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
// 自定义的样例类(是池化的对象)
case class KafkaProducerProxy(brokerList: String,
producerConfig: Properties = new Properties,
defaultTopic: Option[String] = None,
producer: Option[KafkaProducer[String, String]] = None) {
type Key = String
type Val = String
require(brokerList == null || !brokerList.isEmpty, "Must set broker list")
private val p = producer getOrElse {
val props: Properties = new Properties();
props.put("bootstrap.servers", brokerList);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
new KafkaProducer[String, String](props)
}
// 把我要发送的消息包装成了 ProducerRecord
private def toMessage(value: Val, key: Option[Key] = None, topic: Option[String] = None): ProducerRecord[Key, Val] = {
val t = topic.getOrElse(defaultTopic.getOrElse(throw new IllegalArgumentException("Must provide topic or default topic")))
require(!t.isEmpty, "Topic must not be empty")
key match {
case Some(k) => new ProducerRecord(t, k, value)
case _ => new ProducerRecord(t, value)
}
}
def send(key: Key, value: Val, topic: Option[String] = None) {
// 调用 KafkaProducer 对象的 send 方法来发送消息
p.send(toMessage(value, Option(key), topic))
}
def send(value: Val, topic: Option[String]) {
send(null, value, topic)
}
def send(value: Val, topic: String) {
send(null, value, Option(topic))
}
def send(value: Val) {
send(null, value, None)
}
def shutdown(): Unit = p.close()
}
abstract class KafkaProducerFactory(brokerList: String, config: Properties, topic: Option[String] = None) extends Serializable {
def newInstance(): KafkaProducerProxy
}
class BaseKafkaProducerFactory(brokerList: String,
config: Properties = new Properties,
defaultTopic: Option[String] = None)
extends KafkaProducerFactory(brokerList, config, defaultTopic) {
override def newInstance() = new KafkaProducerProxy(brokerList, config, defaultTopic)
}
// 继承一个基础的链接池,须要提供池化的对象类型
class PooledKafkaProducerAppFactory(val factory: KafkaProducerFactory)
extends BasePooledObjectFactory[KafkaProducerProxy] with Serializable {
// 用于链接池建立对象
override def create(): KafkaProducerProxy = factory.newInstance()
// 用于链接池包装对象
override def wrap(obj: KafkaProducerProxy): PooledObject[KafkaProducerProxy] = new DefaultPooledObject(obj)
// 用于链接池销毁对象
override def destroyObject(p: PooledObject[KafkaProducerProxy]): Unit = {
p.getObject.shutdown()
super.destroyObject(p)
}
}
KafkaStreaming main:
package com.atguigu.streaming
import org.apache.commons.pool2.impl.{GenericObjectPool, GenericObjectPoolConfig}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
// 单例对象(即保证了 kafka 链接池只有一个)
object createKafkaProducerPool {
// 用于返回真正的对象池 GenericObjectPool
def apply(brokerList: String, topic: String): GenericObjectPool[KafkaProducerProxy] = {
val producerFactory = new BaseKafkaProducerFactory(brokerList, defaultTopic = Option(topic))
val pooledProducerFactory = new PooledKafkaProducerAppFactory(producerFactory)
// 指定 kafka 对象池的大小
val poolConfig = {
val c = new GenericObjectPoolConfig
val maxNumProducers = 10
c.setMaxTotal(maxNumProducers)
c.setMaxIdle(maxNumProducers)
c
}
// 返回一个对象池
new GenericObjectPool[KafkaProducerProxy](pooledProducerFactory, poolConfig)
}
}
object KafkaStreaming {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// 建立 topic
val brobrokers = "192.168.25.102:9092,192.168.25.103:9092,192.168.25.104:9092" // kafka 集群的地址
val sourcetopic = "source"; // kafka 的队列名称
val targettopic = "target"; // kafka 的队列名称
// 建立消费者组
val group = "con-consumer-group"
// 消费者配置
val kafkaParam = Map(
"bootstrap.servers" -> brobrokers, // 用于初始化连接到集群的地址
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[StringDeserializer],
// 用于标识这个消费者属于哪一个消费团体
"group.id" -> group,
// 若是没有初始化偏移量或者当前的偏移量不存在任何服务器上,可使用这个配置属性
// 可使用这个配置,latest 自动重置偏移量为最新的偏移量
"auto.offset.reset" -> "latest",
// 若是是 true,则这个消费者的偏移量会在后台自动提交
"enable.auto.commit" -> (false: java.lang.Boolean)
);
// ssc.sparkContext.broadcast(pool)
// 建立 DStream,返回接收到的输入数据
val stream = KafkaUtils.createDirectStream[String, String](ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(sourcetopic), kafkaParam))
// 每个 stream 都是一个 ConsumerRecord
stream.map(s => ("id:" + s.key(), ">>>>:" + s.value())).foreachRDD(rdd => {
rdd.foreachPartition(partitionOfRecords => {
// Get a producer from the shared pool
val pool = createKafkaProducerPool(brobrokers, targettopic)
val p = pool.borrowObject()
partitionOfRecords.foreach { message => System.out.println(message._2); p.send(message._2, Option(targettopic)) }
// Returning the producer to the pool also shuts it down
pool.returnObject(p)
})
})
ssc.start()
ssc.awaitTermination()
}
}
程序部署:
一、启动 zookeeper 集群和 kafka 集群。
[atguigu@hadoop102 zookeeper-3.4.10]$ pwd
/opt/module/zookeeper-3.4.10
[atguigu@hadoop102 zookeeper-3.4.10]$ /bin/zkServer.sh start
[atguigu@hadoop103 zookeeper-3.4.10]$ /bin/zkServer.sh start
[atguigu@hadoop104 zookeeper-3.4.10]$ /bin/zkServer.sh start
[atguigu@hadoop102 kafka]$ pwd
/opt/module/kafka
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties
二、建立两个 topic,一个为 source,一个为 target
bin/kafka-topics.sh --create \
--zookeeper 192.168.25.102:2181,192.168.25.103:2181,192.168.25.104:2181 \
--replication-factor 2 \
--partitions 2 \
--topic source
bin/kafka-topics.sh --create \
--zookeeper 192.168.25.102:2181,192.168.25.103:2181,192.168.25.104:2181 \
--replication-factor 2 \
--partitions 2 \
--topic targe
三、启动 kafka console producer 写入 source topic
bin/kafka-console-producer.sh \
--broker-list 192.168.25.102:9092,192.168.25.103:9092,192.168.25.104:9092 \
--topic source
四、启动 kafka console consumer 监听 target topic
bin/kafka-console-consumer.sh \
--bootstrap-server 192.168.25.102:9092,192.168.25.103:9092,192.168.25.104:9092 \
--topic target
五、启动 kafka Streaming 程序
[atguigu@hadoop102 ~]$ /opt/module/spark-2.1.1-bin-hadoop2.7/bin/spark-submit \
--class com.atguigu.streaming.KafkaStreaming \
/opt/software/sparkjars/kafkastreaming-jar-with-dependencies.jar
六、程序运行截图
生产者
kafka 知识补充:
kafka 集群图解
Flume-ng
Spark 提供两个不一样的接收器来使用 Apache Flume(http://flume.apache.org)。
两个接收器简介以下。
• 推式接收器:该接收器以 Avro 数据池的方式工做,由 Flume 向其中推数据。
• 拉式接收器:该接收器能够从自定义的中间数据池中拉数据,而其余进程可使用 Flume 把数据推动该中间数据池。
两种方式都须要从新配置 Flume,并在某个节点配置的端口上运行接收器(不是已有的 Spark 或者 Flume 使用的端口)。要使用其中任何一种方法,都须要在工程中引入 Maven 工件 spark-streaming-flume_2.10。![]()
推式接收器的方法设置起来很容易,可是它不使用事务来接收数据。在这种方式中,接收器以 Avro 数据池的方式工做,咱们须要配置 Flume 来把数据发到 Avro 数据池。咱们提供的 FlumeUtils 对象会把接收器配置在一个特定的工做节点的主机名及端口号 上。这些设置必须和 Flume 配置相匹配。
![]()
虽然这种方式很简洁,但缺点是没有事务支持。这会增长运行接收器的工做节点发生错误 时丢失少许数据的概率。不只如此,若是运行接收器的工做节点发生故障,系统会尝试从 另外一个位置启动接收器,这时须要从新配置 Flume 才能将数据发给新的工做节点。这样配 置会比较麻烦。
较新的方式是拉式接收器(在Spark 1.1中引入),它设置了一个专用的Flume数据池供 Spark Streaming 读取,并让接收器主动从数据池中拉取数据。这种方式的优势在于弹性较 好,Spark Streaming 经过事务从数据池中读取并复制数据。在收到事务完成的通知前,这些数据还保留在数据池中。
咱们须要先把自定义数据池配置为 Flume 的第三方插件。安装插件的最新方法请参考 Flume 文档的相关部分(https://flume.apache.org/FlumeUserGuide.html#installing-third-party- plugins)。因为插件是用 Scala 写的,所以须要把插件自己以及 Scala 库都添加到 Flume 插件 中。Spark 1.1 中对应的 Maven 索引以下所示。
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume-sink_2.11</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.11</version>
</dependency>
当你把自定义 Flume 数据池添加到一个节点上以后,就须要配置 Flume 来把数据推送到这个数据池中。
a1.sinks = spark
a1.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.spark.hostname = receiver-hostname
a1.sinks.spark.port = port-used-for-sync-not-spark-port
a1.sinks.spark.channel = memoryChannel
等到数据已经在数据池中缓存起来,就能够调用 FlumeUtils 来读取数据了。
DStream 上的原语与 RDD 的相似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操做中还有一些比较特殊的原语,如:updateStateByKey()、transform() 以及各类 Window 相关的原语。
DStream 的转化操做能够分为无状态(stateless)和有状态(stateful)两种。
• 在无状态转化操做中,每一个批次的处理不依赖于以前批次的数据。常见的 RDD 转化操做,例如 map()、filter()、reduceByKey() 等,都是无状态转化操做。
• 相对地,有状态转化操做须要使用以前批次的数据或者是中间结果来计算当前批次的数据。有状态转化操做包括基于滑动窗口的转化操做
和追踪状态变化的转化操做
。
![]()
无状态转化操做就是把简单的 RDD 转化操做应用到每一个批次上,也就是转化 DStream 中的每个 RDD。部分无状态转化操做列在了下表中。注意,针对键值对的 DStream 转化操做(好比 reduceByKey()) 要添加 import StreamingContext._ 才能在 Scala 中使用。
![]()
须要记住的是,尽管这些函数看起来像做用在整个流上同样,但
事实上每一个 DStream 在内部是由许多 RDD(批次)组成
,且无状态转化操做是分别应用到每一个 RDD 上的。例如,reduceByKey() 会归约每一个时间区间中的数据,但不会归约不一样区间之间的数据。
举个例子,在以前的 wordcount 程序中,咱们只会统计1秒内接收到的数据的单词个数,而不会累加。
无状态转化操做也能在多个 DStream 间整合数据,不过也是在各个时间区间内。例如,键值对 DStream 拥有和 RDD 同样的与链接相关的转化操做,也就是 cogroup()、join()、leftOuterJoin() 等。咱们能够在 DStream 上使用这些操做,这样就对每一个批次分别执行了对应的 RDD 操做。
咱们还能够像在常规的 Spark 中同样使用 DStream 的 union() 操做将它和另外一个 DStream 的内容合并起来,也可使用 StreamingContext.union() 来合并多个流。
特殊的 Transformations。
追踪状态变化 UpdateStateByKey
updateStateByKey 原语是
用于记录历史记录
,有时,咱们须要在 DStream 中跨批次维护状态(例如流计算中累加 wordcount)。针对这种状况,updateStateByKey() 为咱们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)
对构成的 DStream,并传递一个指定如何根据新的事件更新每一个键对应状态的函数,它能够构建出一个新的 DStream,其内部数据为(键,状态)
对。
updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每一个时间区间对应的 (键,状态) 对组成的。
updateStateByKey() 操做使得咱们能够在用新信息进行更新时保持任意的状态。为使用这个功能,你须要作下面两步:
• 1)定义状态,状态能够是一个任意的数据类型。
• 2)定义状态更新函数,用此函数阐明如何使用以前的状态和来自输入流的新值对状态进行更新。
使用 updateStateByKey 须要对检查点目录进行配置,会使用检查点来保存状态。
WordCount 第二版:
代码以下:
package com.atguigu.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WorldCount {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(3))
ssc.checkpoint("hdfs://192.168.25.102:9000/spark/checkpoints") // 设置一个检查点的目录
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("hadoop102", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" ")) // DStream[RDD[String]]
// import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1)) // 将每个单词映射成一个元组 (word,1)
// 定义更新状态方法,参数 values 为当前批次单词频度,state 为以往批次单词频度(该参数是由框架提供的)
val updateFunc = (values: Seq[Int], state: Option[Int]) => { // 匿名函数
// 计算当前批次相同 key 的单词总数
val currentCount = values.foldLeft(0)(_ + _)
// 获取上一次保存的单词总数
val previousCount = state.getOrElse(0)
// 返回新的单词总数
Some(currentCount + previousCount)
}
// 使用 updateStateByKey 来更新状态,统计从运行开始以来单词总的次数
val stateDstream = pairs.updateStateByKey[Int](updateFunc)
stateDstream.print()
// 以 text 文件形式存储这个 DStream 的内容。第一个参数是存储路径,第二个参数是文件的后缀名。
stateDstream.saveAsTextFiles("hdfs://192.168.25.102:9000/stateful", "abc")
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
// ssc.stop()
}
}
更新状态方法 updateFunc 图解以下:
测试:
先启动 netcat,再启动统计程序,再经过 netcat 发送测试数据
[atguigu@hadoop102 ~]$ nc -l -p 9999
hello hello china world #发送第一个 RRD
hello hello china china #发送第二个 RRD
启动统计程序
bin/spark-submit \
--class com.atguigu.streaming.WorldCount \
/opt/software/sparkjars/statefulwordcount-jar-with-dependencies.jar
[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ pwd
/opt/module/spark-2.1.1-bin-hadoop2.7
[atguigu@hadoop102 spark-2.1.1-bin-hadoop2.7]$ bin/spark-submit \
> --class com.atguigu.streaming.WorldCount \
> /opt/software/sparkjars/statefulwordcount-jar-with-dependencies.jar
19/04/29 11:26:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
-------------------------------------------
Time: 1556508402000 ms
-------------------------------------------
19/04/29 11:26:44 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/04/29 11:26:44 WARN BlockManager: Block input-0-1556508404000 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1556508405000 ms
-------------------------------------------
(hello,2)
(world,1)
(china,1)
19/04/29 11:26:47 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/04/29 11:26:47 WARN BlockManager: Block input-0-1556508407400 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1556508408000 ms
-------------------------------------------
(hello,4)
(world,1)
(china,3)
-------------------------------------------
Time: 1556508411000 ms
-------------------------------------------
(hello,4)
(world,1)
(china,3)
在 HDFS 上查看检查点目录
Window Operations
Window Operations 有点相似于 Storm中 的 State,能够设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的容许状态。基于窗口的操做会在一个比 StreamingContext 的批次间隔更长的时间范围内,经过整合多个批次的结果,计算出整个窗口的结果。
![]()
全部基于窗口的操做都须要两个参数,分别为
窗口时长
以及滑动步长
,二者都必须是 StreamContext 的批次间隔的整数倍
。窗口时长控制每次计算最近的多少个批次的数据,其实就是最近的 windowDuration/batchInterval 个批次,以下图所示。若是有一个以 10 秒为批次间隔的源 DStream,要建立一个最近 30 秒的时间窗口(即最近 3 个批次),就应当把 windowDuration 设为 30 秒。而滑动步长的默认值与批次间隔相等,用来控制对新的 DStream 进行计算的间隔。若是源 DStream 批次间隔为 10 秒,而且咱们只但愿每两个批次计算一次窗口结果,就应该把滑动步长设置为 20 秒。假设,你想拓展前例从而每隔十秒对持续 30 秒的数据生成 wordcount。为作到这个,咱们须要在持续 30 秒数据的 (word,1) 对 DStream上应用 reduceByKey。![]()
使用操做 reduceByKeyAndWindow:
# reduce last 30 seconds of data, every 10 second
windowedWordCounts = pairs.reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y: x -y, 30, 20)
Window Operations 经常使用函数
![]()
reduceByWindow() 和 reduceByKeyAndWindow() 让咱们能够对每一个窗口更高效地进行归约操做
。它们接收一个归约函数,在整个窗口上执行,好比 +。除此之外,它们还有一种特殊形式,经过只考虑新进入窗口的数据和离开窗 口的数据,让 Spark 增量计算归约结果。这种特殊形式须要提供归约函数的一个逆函数
,好比 + 对应的逆函数为 -。对于较大的窗口,提供逆函数能够大大提升执行效率。以下图所示:
![]()
示例代码:
val ipDStream = accessLogsDStream.map(logEntry => (logEntry.getIpAddress(), 1))
val ipCountDStream = ipDStream.reduceByKeyAndWindow(
{(x, y) => x + y},
{(x, y) => x - y},
Seconds(30),
Seconds(10))
// 加上新进入窗口的批次中的元素 // 移除离开窗口的老批次中的元素 // 窗口时长
// 滑动步长
countByWindow() 和 countByValueAndWindow() 做为对数据进行计数操做的简写。countByWindow() 返回一个表示每一个窗口中元素个数的 DStream,而 countByValueAndWindow() 返回的 DStream 则包含窗口中每一个值的个数。
val ipDStream = accessLogsDStream.map{entry => entry.getIpAddress()}
val ipAddressRequestCount = ipDStream.countByValueAndWindow(Seconds(30), Seconds(10))
val requestCount = accessLogsDStream.countByWindow(Seconds(30), Seconds(10))
WordCount 第三版:3 秒一个批次,窗口 12 秒,滑步 6 秒。
package com.atguigu.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object WorldCount {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(3))
ssc.checkpoint("hdfs://192.168.25.102:9000/spark/checkpoints") // 设置一个检查点的目录
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("hadoop102", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
//import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Count each word in each batch
val pairs = words.map(word => (word, 1))
// 3 秒一个批次,窗口 12 秒,会有 12 / 3 = 4 个批次
// 滑动步长 6 秒,会有 6 / 3 = 2 个批次
val wordCounts = pairs.reduceByKeyAndWindow((a: Int, b: Int) => (a + b), Seconds(12), Seconds(6))
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
// ssc.stop()
}
}
测试:
先启动 netcat,再启动统计程序,再经过 netcat 发送测试数据
[atguigu@hadoop102 ~]$ nc -l -p 9999
hello hello china world #发送第一个 RRD
hello hello china china #发送第二个 RRD
启动统计程序
bin/spark-submit \
--class com.atguigu.streaming.WorldCount \
/opt/software/sparkjars/windowwordcount-jar-with-dependencies.jar
Transform Operation
Transform 原语容许 DStream 上执行任意的 RDD-to-RDD 函数。即便这些函数并无在 DStream 的 API 中暴露出来,经过该函数能够方便的扩展 Spark API。
该函数每一批次调度一次。
好比下面的例子,在进行单词统计的时候,想要过滤掉 spam 的信息。
其实也就是对 DStream 中的 RDD 应用转换。
val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // RDD containing spam information
val cleanedDStream = wordCounts.transform { rdd =>
rdd.join(spamInfoRDD).filter(...) // join data stream with spam information to do data cleaning
...
}
Join 操做
链接操做(leftOuterJoin, rightOuterJoin, fullOuterJoin 也能够),能够链接 stream-stream,windows-stream to windows-stream、stream-dataset
1)stream-stream joins
val stream1: DStream[String, String] = ...
val stream2: DStream[String, String] = ...
val joinedStream = stream1.join(stream2)
val windowedStream1 = stream1.window(Seconds(20))
val windowedStream2 = stream2.window(Minutes(1))
val joinedStream = windowedStream1.join(windowedStream2)
2)stream-dataset joins
val dataset: RDD[String, String] = ...
val windowedStream = stream.window(Seconds(20))...
val joinedStream = windowedStream.transform { rdd => rdd.join(dataset) }
输出操做指定了对流数据经转化操做获得的数据所要执行的操做(例如把结果推入外部数据库或输出到屏幕上)。与 RDD 中的惰性求值相似,若是一个 DStream 及其派生出的 DStream 都没有被执行输出操做,那么这些 DStream 就都不会被求值。若是 StreamingContext 中没有设定输出操做,那么整个 context 就都不会启动。
![]()
通用的输出操做 foreachRDD(),它用来对 DStream 中的 RDD 运行任意计算。这和transform() 有些相似,均可以让咱们访问任意 RDD。在 foreachRDD() 中,能够重用咱们在 Spark 中实现的全部行动操做。好比,常见的用例之一是把数据写到诸如 MySQL 的外部数据库中。
须要注意的:
• 1)链接不能写在 driver 层面。
• 2)若是写在 foreach 中则每一个 RDD 都建立,得不偿失。
• 3)增长 foreachPartition,在分区建立。
• 4)能够考虑使用链接池优化。
dstream.foreachRDD { rdd =>
// error val connection = createNewConnection() // executed at the driver 序列化错误
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record) // executed at the worker
)
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
累加器(Accumulators)和广播变量(Broadcast variables)不能从 Spark Streaming 的检查点中恢复
。若是你启用检查并也使用了累加器和广播变量,那么你必须建立累加器和广播变量的延迟单实例从而在驱动因失效重启后他们能够被从新实例化
。以下例述:
object WordBlacklist {
@volatile private var instance: Broadcast[Seq[String]] = null
def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
if (instance == null) {
synchronized {
if (instance == null) {
val wordBlacklist = Seq("a", "b", "c")
instance = sc.broadcast(wordBlacklist)
}
}
}
instance
}
}
object DroppedWordsCounter {
@volatile private var instance: LongAccumulator = null
def getInstance(sc: SparkContext): LongAccumulator = {
if (instance == null) {
synchronized {
if (instance == null) {
instance = sc.longAccumulator("WordsInBlacklistCounter")
}
}
}
instance
}
}
wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
// Get or register the blacklist Broadcast
val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
// Get or register the droppedWordsCounter Accumulator
val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
// Use blacklist to drop words and use droppedWordsCounter to count them
val counts = rdd.filter { case (word, count) =>
if (blacklist.value.contains(word)) {
droppedWordsCounter.add(count)
false
} else {
true
}
}.collect().mkString("[", ", ", "]")
val output = "Counts at time " + time + " " + counts
})
你能够很容易地在流数据上使用 DataFrames 和 SQL。你必须使用 SparkContext 来建立 StreamingContext 要用的 SQLContext。此外,这一过程能够在驱动失效后重启。咱们经过建立一个实例化的 SQLContext 单实例来实现这个工做。以下例所示。咱们对前例 wordcount 进行修改从而使用 DataFrames 和 SQL 来产生 wordcounts。每一个 RDD 被转换为 DataFrame,以临时表格配置并用 SQL 进行查询。
val words: DStream[String] = ...
words.foreachRDD { rdd =>
// Get the singleton instance of SparkSession
val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
import spark.implicits._
// Convert RDD[String] to DataFrame
val wordsDataFrame = rdd.toDF("word")
// Create a temporary view
wordsDataFrame.createOrReplaceTempView("words")
// Do word count on DataFrame using SQL and print it
val wordCountsDataFrame =
spark.sql("select word, count(*) as total from words group by word")
wordCountsDataFrame.show()
}
你也能够从不一样的线程在定义流数据的表上运行 SQL 查询(也就是说,异步运行 StreamingContext)。仅肯定你设置 StreamingContext 记住了足够数量的流数据以使得查询操做能够运行。不然,StreamingContext 不会意识到任何异步的 SQL 查询操做,那么其就会在查询完成以后删除旧的数据。例如,若是你要查询最后一批次,可是你的查询会运行 5 分钟,那么你须要调用 streamingContext.remember(Minutes(5))
(in Scala 或者其余语言的等价操做)。
和 RDDs 相似,DStream 一样容许开发者将流数据保存在内存中。也就是说,在 DStream 上使用 persist() 方法将会自动把 DStream 中的每一个 RDD 保存在内存中。当 DStream 中的数据要被屡次计算时,这个很是有用(如在一样数据上的屡次操做)。
对于像 reduceByWindow 和 reduceByKeyAndWindow 以及基于状态的 (updateStateByKey) 这种操做,保存在内存中是隐含默认的
。所以,即便开发者没有调用 persist(),由基于窗操做产生的 DStream 会自动保存在内存中
。
检查点机制是咱们在 Spark Streaming 中用来
保障容错性
的主要机制。与应用程序逻辑无关的错误(即系统错位,JVM 崩溃等)有迅速恢复的能力。
它可使 Spark Streaming 阶段性地把应用数据存储到诸如 HDFS 或 Amazon S3 这样的可靠存储系统中,以供恢复时使用。具体来讲,检查点机制主要为如下两个目的服务:
• 1)控制发生失败时须要重算的状态数。SparkStreaming 能够经过转化图的谱系图来重算状态,检查点机制则能够控制须要在转化图中回溯多远。
• 2)提供驱动器程序容错。若是流计算应用中的驱动器程序崩溃了,你能够重启驱动器程序并让驱动器程序从检查点恢复,这样 Spark Streaming 就能够读取以前运行的程序处理数据的进度,并从那里继续。为了实现这个,Spark Streaming 须要为容错存储系统 checkpoint 提供足够的信息从而使得其能够从失败中恢复过来。有两种类型的数据设置检查点:
Metadata checkpointing:将定义流计算的信息存入容错的系统如 HDFS。元数据包括:
配置 – 用于建立流应用的配置。
DStreams 操做 – 定义流应用的 DStreams 操做集合。
不完整批次 – 批次的工做已进行排队可是并未完成。Data checkpointing:将产生的 RDDs 存入可靠的存储空间。对于在多批次间合并数据的状态转换,这个颇有必要。在这样的转换中,RDDs 的产生基于以前批次的 RDDs,这样依赖链长度随着时间递增。为了不在恢复期这种无限的时间增加(和链长度成比例),状态转换中间的 RDDs 周期性写入可靠地存储空间(如 HDFS)从而切短依赖链。
总而言之,
元数据检查点在由驱动失效中恢复是首要须要的
。而数据或者 RDD 检查点甚至在使用了状态转换的基础函数中也是必要的。
出于这些缘由,检查点机制对于任何生产环境中的流计算应用都相当重要。你能够经过向 ssc.checkpoint() 方法传递一个路径参数 (HDFS、S3 或者本地路径都可) 来配置检查点机制,同时你的应用应该可以使用检查点的数据。
• 1)当程序首次启动,其将建立一个新的 StreamingContext,设置全部的流并调用 start()。
• 2)当程序在失效后重启,其将依据检查点目录的检查点数据从新建立一个 StreamingContext。经过使用 StraemingContext.getOrCreate 很容易得到这个性能。
ssc.checkpoint("hdfs://...")
# 建立和设置一个新的 StreamingContext
def functionToCreateContext():
sc = SparkContext(...) # new context
ssc = new StreamingContext(...)
lines = ssc.socketTextStream(...) # create DStreams
...
ssc.checkpoint(checkpointDirectory) # 设置检查点目录
return ssc
# 从检查点数据中获取 StreamingContext 或者从新建立一个
context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext)
# 在须要完成的 context 上作额外的配置
# 不管其有没有启动
context ...
# 启动 context
context.start()
contaxt.awaitTermination()
若是检查点目录(checkpointDirectory)存在,那么 context 将会由检查点数据从新建立。若是目录不存在(首次运行),那么函数 functionToCreateContext 将会被调用来建立一个新的 context 并设置 DStreams。
注意
:RDDs 的检查点引发存入可靠内存的开销。在 RDDs 须要检查点的批次里,处理的时间会所以而延长。因此,检查点的间隔须要很仔细地设置。在小尺寸批次(1秒钟)。每一批次检查点会显著减小操做吞吐量。反之,检查点设置的过于频繁致使“血统”和任务尺寸增加,这会有很很差的影响。对于须要 RDD 检查点设置的状态转换,默认间隔是批次间隔的乘数通常至少为 10 秒钟。能够经过 dstream.checkpoint(checkpointInterval)。一般,检查点设置间隔是 5-10 个 DStream 的滑动间隔。
驱动器程序的容错要求咱们以特殊的方式建立 StreamingContext。咱们须要把检查点目录提供给 StreamingContext。与直接调用 new StreamingContext 不一样,应该使用 StreamingContext.getOrCreate() 函数。
为了应对工做节点失败的问题,Spark Streaming 使用与 Spark 的容错机制相同的方法。全部从外部数据源中收到的数据都在多个工做节点上备份。全部从备份数据转化操做的过程当中建立出来的 RDD 都能容忍一个工做节点的失败,由于
根据 RDD 谱系图,系统能够把丢失的数据从幸存的输入数据备份中重算出来
。
运行接收器的工做节点的容错也是很重要的。若是这样的节点发生错误,Spark Streaming 会在集群中别的节点上重启失败的接收器。然而,这种状况会不会致使数据的丢失
取决于数据源的行为(数据源是否会重发数据)
以及接收器的实现(接收器是否会向数据源确认收到数据)
。举个例子,使用 Flume 做为数据源时,两种接收器的主要区别在于数据丢失时的保障。在 “接收器从数据池中拉取数据” 的模型中,Spark 只会在数据已经在集群中备份时才会从数据池中移除元素。而在 “向接收器推数据” 的模型中,若是接收器在数据备份以前失败,一些数据可能就会丢失。总的来讲,对于任意一个接收器,必须同时考虑上游数据源的容错性(是否支持事务)来确保零数据丢失。
总的来讲,接收器提供如下保证:
• 全部从可靠文件系统中读取的数据 (好比经过 StreamingContext.hadoopFiles 读取的) 都是可靠的,由于底层的文件系统是有备份的。Spark Streaming 会记住哪些数据存放到了检查点中,并在应用崩溃后从检查点处继续执行。
• 对于像 Kafka、推式 Flume、Twitter 这样的不可靠数据源,Spark 会把输入数据复制到其余节点上,可是若是接收器任务崩溃,Spark 仍是会丢失数据。在 Spark 1.1 以及更早的版本中,收到的数据只被备份到执行器进程的内存中,因此一旦驱动器程序崩溃(此时全部的执行器进程都会丢失链接),数据也会丢失。在 Spark 1.2 中,收到的数据被记录到诸如 HDFS 这样的可靠的文件系统中
,这样即便驱动器程序重启也不会致使数据丢失。
综上所述,确保全部数据都被处理的最佳方式是使用可靠的数据源(例如 HDFS、拉式 Flume 等)。若是你还要在批处理做业中处理这些数据,使用可靠数据源是最佳方式,由于这种方式确保了你的批处理做业
和流计算做业
能读取到相同的数据,于是能够获得相同的结果。
因为 Spark Streaming 工做节点的容错保障,Spark Streaming 能够为全部的转化操做提供 “精确一次” 执行的语义,即便一个工做节点在处理部分数据时发生失败,最终的转化结果(即转化操做获得的 RDD)仍然与数据只被处理一次获得的结果同样。
然而,当把转化操做获得的结果使用输出操做推入外部系统中时,写结果的任务可能因故障而执行屡次,一些数据可能也就被写了屡次。因为这引入了外部系统,所以咱们须要专门针对各系统的代码来处理这样的状况。咱们能够使用事务操做来写入外部系统
(即原子化地将一个 RDD 分区一次写入),或者设计幂等的更新操做
(即屡次运行同一个更新操做仍生成相同的结果)。好比 Spark Streaming 的 saveAs…File 操做会在一个文件写完时自动将其原子化地移动到最终位置上,以此确保每一个输出文件只存在一份。
最多见的问题是 Spark Streaming 可使用的最小批次间隔是多少。总的来讲,500 毫秒已经被证明为对许多应用而言是比较好的最小批次大小。寻找最小批次大小的最佳实践是从一个比较大的批次大小(10 秒左右)开始,不断使用更小的批次大小。若是 Streaming 用户界面中显示的处理时间保持不变,你就能够进一步减少批次大小。若是处理时间开始增长,你可能已经达到了应用的极限。
类似地,对于窗口操做,计算结果的间隔(也就是滑动步长)对于性能也有巨大的影响。当计算代价巨大并成为系统瓶颈时,就应该考虑提升滑动步长了。
减小批处理所消耗时间的常见方式还有提升并行度
。有如下三种方式能够提升并行度:
•增长接收器数目
。有时若是记录太多会致使单台机器来不及读入并分发的话,接收器会成为系统瓶颈。这时你就须要经过建立多个输入 DStream(这样会建立多个接收器) 来增长接收器数目,而后使用 union 来把数据合并为一个数据源。
•将收到的数据显式地从新分区
。若是接收器数目没法再增长,你能够经过使用 DStream.repartition 来显式从新分区输入流(或者合并多个流获得的数据流) 来从新分配收到的数据。
•提升聚合计算的并行度
。对于像 reduceByKey() 这样的操做,你能够在第二个参数中指定并行度,咱们在介绍 RDD 时提到过相似的手段。