虽然SparkStreaming已经中止更新,Spark的重点也放到了 Structured Streaming ,但因为Spark版本太低或者其余技术选型问题,可能仍是会选择SparkStreaming。
SparkStreaming对于时间窗口,事件时间虽然支撑较少,但仍是能够知足部分的实时计算场景的,SparkStreaming资料较多,这里也作一个简单介绍。html
Spark Streaming在当时是为了与当时的Apache Storm竞争,也让Spark能够用于流式数据的处理。根据其官方文档介绍,Spark Streaming有高吞吐量和容错能力强等特色。Spark Streaming支持的数据输入源不少,例如:Kafka、Flume、Twitter、ZeroMQ和简单的TCP套接字等等。数据输入后能够用Spark的高度抽象原语如:map、reduce、join、window等进行运算。而结果也能保存在不少地方,如HDFS,数据库等。另外Spark Streaming也能和MLlib(机器学习)以及Graphx完美融合。
固然Storm目前已经渐渐淡出,Flink开始大放异彩。java
Spark Streaming 是 Spark Core API 的扩展,它支持弹性的,高吞吐的,容错的实时数据流的处理。数据能够经过多种数据源获取,例如 Kafka,Flume,Kinesis 以及 TCP sockets,也能够经过例如 map
,reduce
,join
,window
等的高级函数组成的复杂算法处理。最终,处理后的数据能够输出到文件系统,数据库以及实时仪表盘中。事实上,你还能够在 data streams(数据流)上使用 [机器学习] 以及 [图计算] 算法。
在内部,它工做原理以下,Spark Streaming 接收实时输入数据流并将数据切分红多个 batch(批)数据,而后由 Spark 引擎处理它们以生成最终的 stream of results in batches(分批流结果)。redis
Spark Streaming 提供了一个名为 discretized stream 或 DStream 的高级抽象,它表明一个连续的数据流。DStream 能够从数据源的输入数据流建立,例如 Kafka,Flume 以及 Kinesis,或者在其余 DStream 上进行高层次的操做以建立。在内部,一个 DStream 是经过一系列的 [RDDs] 来表示。算法
本指南告诉你如何使用 DStream 来编写一个 Spark Streaming 程序。你可使用 Scala,Java 或者 Python(Spark 1.2 版本后引进)来编写 Spark Streaming 程序。数据库
在idea中新建maven项目apache
引入依赖bootstrap
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.4.4</version> </dependency>
Project Structure —— Global Libraries —— 把scala 添加到 add module设计模式
新建Scala Classapi
import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} object Demo { //屏蔽日志 Logger.getLogger("org.apache")setLevel(Level.WARN) def main(args: Array[String]): Unit = { //local会有问题 最少两个线程 一个拿数据 一个计算 //val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local") val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[2]") //时间间隔 val ssc = new StreamingContext(conf,Seconds(1)) //接收数据 处理 //socket demo val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) val words: DStream[String] = value.flatMap(_.split(" ")) val wordsTuple: DStream[(String, Int)] = words.map((_, 1)) val wordcount: DStream[(String, Int)] = wordsTuple.reduceByKey(_ + _) //触发action wordcount.print() ssc.start() //保持流的运行 等待程序被终止 ssc.awaitTermination() } }
测试数组
下载一个win10 用的netcat
https://eternallybored.org/misc/netcat/
解压 在目录下启动cmd
输入
nc -L -p 9999
开始输入单词 在idea中验证接收
为了初始化一个 Spark Streaming 程序,一个 StreamingContext 对象必需要被建立出来,它是全部的 Spark Streaming 功能的主入口点。
import org.apache.spark._ import org.apache.spark.streaming._ val conf = new SparkConf().setAppName(appName).setMaster(master) val ssc = new StreamingContext(conf, Seconds(1))
appName
参数是展现在集群 UI 界面上的应用程序的名称
master
是local 或者spark集群的url(mesos yarn)
本地测试能够用local[*] 注意要多于两个线程
Second(1)定义的是batch interval 批处理间隔 就是间隔多久去拿一次数据
在定义一个 context 以后,您必须执行如下操做。
streamingContext.start()
来处理数据。streamingContext.awaitTermination()
等待处理被终止(手动或者因为任何错误)。streamingContext.stop()
来手动的中止处理。须要记住的几点:
stop()
的可选参数,名叫 stopSparkContext
为 false。Discretized Stream or DStream 是 Spark Streaming 提供的基本抽象。它表明了一个连续的数据流。多是数据源接收的流,也多是转换后的流。
DStream就是多个和时间相关的一系列连续RDD的集合,好比本例就是间隔一秒的一堆RDD的集合
DStream也是有依赖关系的
flatMap 操做也是直接做用在DStream上的,就和做用于RDD同样 这样很好理解
咱们先来看数据源接收的流 这种叫作Input DStreams 他会经过Receivers接收器去不一样的数据源接收数据。
Spark Streaming内置了两种数据源:
注意本地运行时,不要用local或者local[1],一个线程不够。放到集群上时分配给SparkStreaming的核数必须大于接收器的数量,留一个核去处理数据。
咱们也能够自定义数据源,那咱们就须要本身开发一个接收器。
在咱们接收到Dstreams以后能够进行转换操做,常见转换以下:
Transformation(转换) | Meaning(含义) |
---|---|
map(func) | 利用函数 func 处理原 DStream 的每一个元素,返回一个新的 DStream。 |
flatMap(func) | 与 map 类似,可是每一个输入项可用被映射为 0 个或者多个输出项。。 |
filter(func) | 返回一个新的 DStream,它仅仅包含原 DStream 中函数 func 返回值为 true 的项。 |
repartition(numPartitions) | 经过建立更多或者更少的 partition 以改变这个 DStream 的并行级别(level of parallelism)。 |
union(otherStream) | 返回一个新的 DStream,它包含源 DStream 和 otherDStream 的全部元素。 |
count() | 经过 count 源 DStream 中每一个 RDD 的元素数量,返回一个包含单元素(single-element)RDDs 的新 DStream。 |
reduce(func) | 利用函数 func 汇集源 DStream 中每一个 RDD 的元素,返回一个包含单元素(single-element)RDDs 的新 DStream。函数应该是相关联的,以使计算能够并行化。 |
countByValue() | 在元素类型为 K 的 DStream上,返回一个(K,long)pair 的新的 DStream,每一个 key 的值是在原 DStream 的每一个 RDD 中的次数。 |
reduceByKey(func, [numTasks]) | 当在一个由 (K,V) pairs 组成的 DStream 上调用这个算子时,返回一个新的,由 (K,V) pairs 组成的 DStream,每个 key 的值均由给定的 reduce 函数聚合起来。注意:在默认状况下,这个算子利用了 Spark 默认的并发任务数去分组。你能够用 numTasks 参数设置不一样的任务数。 |
join(otherStream, [numTasks]) | 当应用于两个 DStream(一个包含(K,V)对,一个包含 (K,W) 对),返回一个包含 (K, (V, W)) 对的新 DStream。 |
cogroup(otherStream, [numTasks]) | 当应用于两个 DStream(一个包含(K,V)对,一个包含 (K,W) 对),返回一个包含 (K, Seq[V], Seq[W]) 的 tuples(元组)。 |
transform(func) | 经过对源 DStream 的每一个 RDD 应用 RDD-to-RDD 函数,建立一个新的 DStream。这个能够在 DStream 中的任何 RDD 操做中使用。 |
updateStateByKey(func) | 返回一个新的 "状态" 的 DStream,其中每一个 key 的状态经过在 key 的先前状态应用给定的函数和 key 的新 valyes 来更新。这能够用于维护每一个 key 的任意状态数据。 |
这里咱们特别介绍一下updateStateByKey
咱们若是须要对历史数据进行统计,可能须要去kafka里拿一下以前留存的数据,也能够用updateStateByKey这个方法。
//保存状态 聚合相同的单词 val wordcount = wordsTuple.updateStateByKey[Int]( //updateFunction _ (newValues: Seq[Int], runningCount: Option[Int])=> { val newCount = Some(newValues.sum + runningCount.getOrElse(0)) newCount } )
好比刚才的单词计数,咱们只能统计每一次发过来的消息,可是若是但愿统计屡次消息就须要用到这个,咱们要指定一个checkpoint,就是从哪开始算。
//增长成员变量 val checkpointDir = "./ckp" //在方法中加入checkpoint ssc.checkpoint(checkpointDir) val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) value.checkpoint(Seconds(4))//官方建议批次时间的1-5倍
这时候咱们创建StreamingContext的方法就要改变了 咱们把刚才的建立过程提取成方法。
def creatingFunc():StreamingContext = { val conf = new SparkConf().setAppName(s"${this.getClass.getSimpleName}").setMaster("local[*]") val ssc = new StreamingContext(conf, Seconds(1)) ssc.checkpoint(checkpointDir) val value: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999) value.checkpoint(Seconds(4))//官方建议批次时间的1-5倍 val words: DStream[String] = value.flatMap(_.split(" ")) val wordsTuple: DStream[(String, Int)] = words.map((_, 1)) //保存状态 聚合相同的单词 val wordcount = wordsTuple.updateStateByKey[Int]( //updateFunction _ (newValues: Seq[Int], runningCount: Option[Int])=> { val newCount = Some(newValues.sum + runningCount.getOrElse(0)) newCount } ) //触发action wordcount.print() ssc }
在mian函数中修改成:
def main(args: Array[String]): Unit = { val ssc = StreamingContext.getOrCreate(checkpointDir,creatingFunc _) ssc.start() //保持流的运行 等待程序被终止 ssc.awaitTermination() }
这样就是,若是有checkpoint,程序会在checkpoint中把程序加载回来(程序被保存为二进制),没有checkpoint的话才会建立。
将目录下的checkpoint删除,就能够将状态删除。
生产中updateStateByKey因为会将数据备份要慎重使用,能够考虑用hbase,redis等作替代。或者借助kafka作聚合处理。
//若是不用updatestateByKey 能够考虑redis wordsTuple.foreachRDD(rdd => { rdd.foreachPartition(i => { //redis } ) })
Spark Streaming 也支持 _windowed computations(窗口计算),它容许你在数据的一个滑动窗口上应用 transformation(转换)。
如上图显示,窗口在源 DStream 上 _slides(滑动),任何一个窗口操做都须要指定两个参数:
好比计算过去30秒的词频:
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
一些经常使用的窗口操做以下所示,这些操做都须要用到上文提到的两个参数 - windowLength(窗口长度) 和 slideInterval(滑动的时间间隔)。
Transformation(转换) | Meaning(含义) |
---|---|
window(windowLength, slideInterval) | 返回一个新的 DStream,它是基于 source DStream 的窗口 batch 进行计算的。 |
countByWindow(windowLength, slideInterval) | 返回 stream(流)中滑动窗口元素的数 |
reduceByWindow(func, windowLength, slideInterval) | 返回一个新的单元素 stream(流),它经过在一个滑动间隔的 stream 中使用 func 来聚合以建立。该函数应该是 associative(关联的)且 commutative(可交换的),以便它能够并行计算 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 在一个 (K, V) pairs 的 DStream 上调用时,返回一个新的 (K, V) pairs 的 Stream,其中的每一个 key 的 values 是在滑动窗口上的 batch 使用给定的函数 func 来聚合产生的。Note(注意): 默认状况下,该操做使用 Spark 的默认并行任务数量(local model 是 2,在 cluster mode 中的数量经过 spark.default.parallelism 来肯定)来作 grouping。您能够经过一个可选的 numTasks 参数来设置一个不一样的 tasks(任务)数量。 |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | 上述 reduceByKeyAndWindow() 的更有效的一个版本,其中使用前一窗口的 reduce 值逐渐计算每一个窗口的 reduce值。这是经过减小进入滑动窗口的新数据,以及 “inverse reducing(逆减)” 离开窗口的旧数据来完成的。一个例子是当窗口滑动时”添加” 和 “减” keys 的数量。然而,它仅适用于 “invertible reduce functions(可逆减小函数)”,即具备相应 “inverse reduce(反向减小)” 函数的 reduce 函数(做为参数 invFunc </ i>)。像在 reduceByKeyAndWindow 中的那样,reduce 任务的数量能够经过可选参数进行配置。请注意,针对该操做的使用必须启用 checkpointing. |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | 在一个 (K, V) pairs 的 DStream 上调用时,返回一个新的 (K, Long) pairs 的 DStream,其中每一个 key 的 value 是它在一个滑动窗口以内的频次。像 code>reduceByKeyAndWindow 中的那样,reduce 任务的数量能够经过可选参数进行配置。 |
在 Spark Streaming 中能够执行不一样类型的 join
val stream1: DStream[String, String] = ... val stream2: DStream[String, String] = ... val joinedStream = stream1.join(stream2) //也能够用窗口 val windowedStream1 = stream1.window(Seconds(20)) val windowedStream2 = stream2.window(Minutes(1)) val joinedStream = windowedStream1.join(windowedStream2)
输出操做容许将 DStream 的数据推送到外部系统,如数据库或文件系统。
会触发全部变换的执行,相似RDD的action操做。有以下操做:
Output Operation | Meaning |
---|---|
print() | 在运行流应用程序的 driver 节点上的DStream中打印每批数据的前十个元素。这对于开发和调试颇有用。 |
Python API 这在 Python API 中称为 pprint()。 | |
saveAsTextFiles(prefix, [suffix]) | 将此 DStream 的内容另存为文本文件。每一个批处理间隔的文件名是根据 前缀 和 后缀_:"prefix-TIME_IN_MS[.suffix]"_ 生成的。 |
saveAsObjectFiles(prefix, [suffix]) | 将此 DStream 的内容另存为序列化 Java 对象的 SequenceFiles 。每一个批处理间隔的文件名是根据 前缀 和 后缀_:"prefix-TIME_IN_MS[.suffix]"_ 生成的。 |
Python API 这在Python API中是不可用的。 | |
saveAsHadoopFiles(prefix, [suffix]) | 将此 DStream 的内容另存为 Hadoop 文件。每一个批处理间隔的文件名是根据 前缀 和 后缀_:"prefix-TIME_IN_MS[.suffix]"_ 生成的。 |
Python API 这在Python API中是不可用的。 | |
foreachRDD(func) | 对从流中生成的每一个 RDD 应用函数 func 的最通用的输出运算符。此功能应将每一个 RDD 中的数据推送到外部系统,例如将 RDD 保存到文件,或将其经过网络写入数据库。请注意,函数 func 在运行流应用程序的 driver 进程中执行,一般会在其中具备 RDD 动做,这将强制流式传输 RDD 的计算。 |
dstream.foreachRDD容许将数据发送到外部系统。
但咱们不要每次都建立一个链接,解决方案以下:
减小开销,分区分摊开销
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => val connection = createNewConnection() partitionOfRecords.foreach(record => connection.send(record)) connection.close() } }
更好的作法是用静态资源池:
dstream.foreachRDD { rdd => rdd.foreachPartition { partitionOfRecords => // ConnectionPool is a static, lazily initialized pool of connections val connection = ConnectionPool.getConnection() partitionOfRecords.foreach(record => connection.send(record)) ConnectionPool.returnConnection(connection) // return to the pool for future reuse } }
Apache Kafka是一个高性能的消息系统,由Scala 写成。是由Apache 软件基金会开发的一个开源消息系统项目。
Kafka 最初是由LinkedIn 开发,并于2011 年初开源。2012 年10 月从Apache Incubator 毕业。该项目的目标是为处理实时数据提供一个统1、高通量、低等待(低延时)的平台。
更多kafka相关请查看Kafka入门宝典(详细截图版)
Spark Streaming 2.4.4兼容 kafka 0.10.0 或者更高的版本
Spark Streaming在2.3.0版本以前是提供了对kafka 0.8 和 0.10的支持的 ,不过在2.3.0之后对0.8的支持取消了。
Note: Kafka 0.8 support is deprecated as of Spark 2.3.0.
spark-streaming-kafka-0-8 | spark-streaming-kafka-0-10 | |
---|---|---|
Broker Version | 0.8.2.1 or higher | 0.10.0 or higher |
API Maturity | Deprecated | Stable |
Language Support | Scala, Java, Python | Scala, Java |
Receiver DStream | Yes | No |
Direct DStream | Yes | Yes |
SSL / TLS Support | No | Yes |
Offset Commit API | No | Yes |
Dynamic Topic Subscription | No | Yes |
这里简单介绍一下对kafka0.8的一种支持方式:基于Receiver
依赖:
groupId = org.apache.spark artifactId = spark-streaming-kafka-0-8_2.12 version = 2.4.4
import org.apache.spark.streaming.kafka._ val kafkaStream = KafkaUtils.createStream(streamingContext, [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
这种状况 程序停掉数据会丢失,为了避免丢失本身又写了一份,这种是不少余的。
因为采用了kafka高阶api,偏移量offset不可控。
Kafka 0.10.0版本之后,采用了更好的一种Direct方式,这种咱们须要本身维护偏移量offset。
直连方式 并行度会更高 生产环境用的最多,0.8版本须要在zk或者redis等地方本身维护偏移量。咱们使用0.10以上版本支持本身设置偏移量,咱们只须要本身将偏移量写回kafka就能够。
依赖
groupId = org.apache.spark artifactId = spark-streaming-kafka-0-10_2.12 version = 2.4.4
kafka 0.10之后 能够将offset写回kafka 咱们不须要本身维护offset了,具体代码以下:
val conf = new SparkConf().setAppName("KafkaStreaming").setMaster("local[*]") val ssc = new StreamingContext(conf,Seconds(2)) val kafkaParams = Map[String, Object]( "bootstrap.servers" -> "localhost:9092,anotherhost:9092", "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "use_a_separate_group_id_for_each_stream", //latest none earliest "auto.offset.reset" -> "earliest", //自动提交偏移量 false "enable.auto.commit" -> (false: java.lang.Boolean) ) //val topics = Array("topicA", "topicB") val topics = Array("test_topic") val stream = KafkaUtils.createDirectStream[String, String]( ssc, // 与kafka broker不在一个节点上 用不一样策略 //在一个节点用 PreferBrokers策略 不多见 LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, kafkaParams) ) stream.foreachRDD(rdd => { //普通的RDD不能强转HasOffsetRanges 但kafkaRDD有 with这个特性 能够强转 val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges //处理数据 计算逻辑 rdd.foreachPartition { iter => //一次处理一个分区的数据 获取这个分区的偏移量 //计算完之后修改偏移量 要开启事务 相似数据库 connection -> conn.setAutoCommit(false) 各类操做 conn.commit(); conn.rollback() //获取偏移量 若是要本身记录的话这个 //val o: OffsetRange = offsetRanges(TaskContext.get.partitionId) //println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") //处理数据 iter.foreach(println) } //kafka 0.10新特性 处理完数据后 将偏移量写回kafka // some time later, after outputs have completed //kafka有一个特殊的topic 保存偏移量 stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) })
更多Flink,Kafka,Spark等相关技术博文,科技资讯,欢迎关注实时流式计算 公众号后台回复 “电子书” 下载300页Flink实战电子书