Spark Streaming 是核心Spark API的扩展,可实现实时数据流的可伸缩,高吞吐量,容错流处理。能够从许多数据源(例如Kafka,Flume,Kinesis或TCP sockets)中提取数据,而且可使用复杂的算法处理数据,这些算法用高级函数表示,如map、reduce、join和window。最后,能够将处理后的数据推送到文件系统,数据库和实时仪表板。实际上,能够在数据流上应用Spark的机器学习和图形处理算法。html
在内部,它的工做方式以下。 Spark Streaming接收实时输入数据流,并将数据分红批次,而后由Spark引擎进行处理,以生成批次的最终结果流。java
Spark Streaming提供了一种高级抽象,称为离散流或DStream,它表示连续的数据流。DStreams能够从Kafka、Flume和Kinesis等源的输入数据流建立,也能够经过在其余DStreams上应用高级操做建立。在内部,DStream表示为RDDs序列。node
1. 了解Spark算法
Apache Spark 是一个用于大规模数据处理的统一分析引擎数据库
特性:apache
快api
将工做负载运行速度提升100倍服务器
Apache Spark使用最新的DAG调度程序,查询优化器和物理执行引擎,为批处理数据和流数据提供了高性能。app
易用机器学习
可使用Java,Scala,Python,R和SQL快速编写应用程序。
通用
结合SQL、流和复杂的分析
Spark为包括SQL和DataFrames,用于机器学习的MLlib,GraphX和Spark Streaming在内的一堆库提供支持。您能够在同一应用程序中无缝组合这些库。
处处运行
Spark可在Hadoop,Apache Mesos,Kubernetes,独立或云中运行。它能够访问各类数据源。
能够在EC2,Hadoop YARN,Mesos或Kubernetes上使用其独立集群模式运行Spark。访问HDFS,Alluxio,Apache Cassandra,Apache HBase,Apache Hive和数百种其余数据源中的数据。
2. 入门案例
统计单词出现的次数,这个例子在Hadoop中用MapReduce也写过。
JavaStreamingContext是java版的StreamingContext。它是Spark Streaming功能的主要入口点。它提供了从输入源建立JavaDStream和JavaPairDStream的方法。可使用context.sparkContext访问内部的org.apache.spark.api.java.JavaSparkContext。在建立和转换DStream以后,能够分别使用context.start()和context.stop()启动和中止流计算。
1 public static void main(String[] args) throws InterruptedException {
2 // Create a local StreamingContext with two working thread and batch interval of 1 second
3 SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
4 JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
5
6 // Create a DStream that will connect to hostname:port, like localhost:9999
7 JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
8
9 // Split each line into words
10 JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
11
12 // Count each word in each batch
13 JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
14 JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
15
16 // Print the first ten elements of each RDD generated in this DStream to the console
17 wordCounts.print();
18
19 // Start the computation
20 jssc.start();
21 // Wait for the computation to terminate
22 jssc.awaitTermination();
23 }
3. 基本概念
3.1. Maven依赖
1 <groupId>org.apache.spark</groupId>
2 <artifactId>spark-streaming_2.12</artifactId>
3 <version>2.4.5</version>
4 <scope>provided</scope>
5 </dependency>
为了从其它数据源获取数据,须要添加相应的依赖项spark-streaming-xyz_2.12。例如:
1 <dependency>
2 <groupId>org.apache.spark</groupId>
3 <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
4 <version>2.4.5</version>
5 </dependency>
3.2. 初始化StreamingContext
为了初始化一个Spark Streaming程序,必须建立一个StreamingContext对象,该对象是全部Spark Streaming功能的主要入口点。
咱们能够从SparkConf对象中建立一个JavaStreamingContext对象
1 import org.apache.spark.SparkConf;
2 import org.apache.spark.streaming.Duration;
3 import org.apache.spark.streaming.api.java.JavaStreamingContext;
4
5 SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
6 JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));
appName 参数是显示在集群UI上的你的应用的名字
master 参数是一个Spark、 Mesos 或 YARN 集群URL,或者也能够是一个特定的字符串“local[*]”表示以本地模式运行。实际上,当在集群上运行时,确定不但愿对在程序中对master进行硬编码,而但愿经过spark-submit启动应用程序并在其中接收它。然而,对于本地测试,你能够传“local[*]”来运行Spark Streaming。
还能够从一个已存在的JavaSparkContext中建立一个JavaStreamingContext对象
1 import org.apache.spark.streaming.api.java.*;
2
3 JavaSparkContext sc = ... //existing JavaSparkContext
4 JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));
在定义完context以后,必须作如下事情:
须要记住的点:
3.3. DStreams(离散流)
Discretized Stream 或 DStream 是Spark Streaming提供的基本抽象。它表示一个连续的数据流,能够是从源接收的输入数据流,也能够是经过转换输入流生成的已处理数据流。在内部,DStream由一系列连续的RDD表示,这是Spark对不变的分布式数据集的抽象。DStream中的每一个RDD都包含来自特定间隔的数据,以下图所示。
在DStream上执行的任何操做都转换为对基础RDD的操做。例如,最简单的将一行句子转换为单词的例子中,flatMap操做应用于行DStream中的每一个RDD,以生成单词DStream的RDD。以下图所示:
3.4. Input DStreams 和 Receivers
Input DStream是表示从源接收的输入数据流。在上图中,lines是输入DStream,由于它表示从netcat服务器接收的数据流。每个输入DStream都关联着一个Receiver对象,该对象从源接收数据并将其存储在Spark的内存中以进行处理。
Spark Streaming提供了两类内置的streaming源:
若是要在流应用程序中并行接收多个数据流,则能够建立多个输入DStream。这将建立多个Receiver(接收器),这些接收器将同时接收多个数据流。重要的是要记住,必须为Spark Streaming应用程序分配足够的内核(或线程,若是在本地运行),以处理接收到的数据以及运行接收器。
须要记住的点:
Basic Sources
为了从文件中读取数据,能够经过StreamingContext.fileStream[KeyClass, ValueClass, InputFormatClass]来建立一个DStream
例如:streamingContext.textFileStream(dataDirectory);
Spark Streaming将监视目录dataDirectory并处理在该目录中建立的全部文件
3.5. Transformations on DStreams
对DStreams作转换,与RDD类似,转换容许修改输入DStream中的数据。DStream支持普通Spark RDD上可用的许多转换。一些常见的方法以下:
map(func) | 经过将源DStream的每一个元素传递给函数func来处理并返回新的DStream | ||
flatMap(func) | 与map相似,可是每一个输入项能够映射到0个或多个输出项 | ||
filter(func) | 过滤 | ||
repartition(numPartitions) | 经过建立更多或更少的分区来更改此DStream中的并行度 | ||
union(otherStream) | 将源DStream和另外一个DStream中的元素合并在一块儿,返回一个新的DStream。至关于SQL中的union | ||
count() | 返回元素的个数 | ||
reduce(func) | 经过使用函数func(接受两个参数并返回一个)来聚合源DStream的每一个RDD中的元素,从而返回一个单元素RDD的新DStream。 | ||
countByValue() |
|
||
reduceByKey(func, [numTasks]) | 在一个(K,V)形式的DStream上调用时,返回一个新的(K,V)DStream,其中使用给定的reduce函数汇总每一个键的值 | ||
join(otherStream, [numTasks]) | 在(K,V)和(K,W)两个DStream上调用时,返回一个新的(K,(V,W))DStream | ||
cogroup(otherStream, [numTasks]) | 在(K,V)和(K,W)DStream上调用时,返回一个新的(K,Seq [V],Seq [W])元组的DStream | ||
transform(func) | 经过对源DStream的每一个RDD应用RDD-to-RDD函数来返回新的DStream。这可用于在DStream上执行任意RDD操做。 |
||
updateStateByKey(func) | 返回一个新的“state” DStream |
其实,此次操做跟Java Stream很像
Window Operations(窗口操做)
Spark Streaming还提供了窗口计算,能够在数据的滑动窗口上应用转换。下图说明了此滑动窗口:
如图所示,每当窗口在源DStream上滑动时,就会对落入窗口内的源RDD进行操做,以生成窗口DStream的RDD。
任何窗口函数所必须的两个参数:
举个例子,咱们来扩展前面的示例,假设咱们想要每10秒在数据的最后30秒生成一次单词次数统计。为此,必须在数据的最后30秒内对(word,1)对的DStream对应用reduceByKey操做。
1 import org.apache.spark.streaming.Durations;
2 import org.apache.spark.streaming.api.java.JavaDStream;
3 import org.apache.spark.streaming.api.java.JavaPairDStream;
4 import scala.Tuple2;
5
6
7 JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());
8 JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));
9
10 // Reduce last 30 seconds of data, every 10 seconds
11 JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));
一些常见的窗口操做以下。全部这些操做均采用上述两个参数:windowLength和slideInterval
window(windowLength, slideInterval) | 返回基于源DStream的窗口批处理计算的新DStream |
countByWindow(windowLength, slideInterval) | 返回流中元素的滑动窗口数 |
reduceByWindow(func, windowLength, slideInterval) | 对窗口内的数据进行聚合操做 |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | 在(K,V)DStream上调用时,返回新的(K,V)DStream,其中使用给定的reduce函数func在滑动窗口中的批处理上汇总每一个键的值 |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) |
3.6. Output Operations on DStreams
输出操做容许将DStream的数据输出到外部系统,例如数据库或文件系统。
流式应用程序必须24/7全天候运行,所以必须可以抵抗与应用程序逻辑无关的故障(例如,系统故障,JVM崩溃等)。为此,Spark Streaming须要将足够的信息检查点指向容错存储系统,以即可以从故障中恢复。检查点有两种类型的数据。
完整代码:
1 package com.example.demo;
2
3 import org.apache.spark.SparkConf;
4 import org.apache.spark.streaming.Durations;
5 import org.apache.spark.streaming.api.java.JavaDStream;
6 import org.apache.spark.streaming.api.java.JavaPairDStream;
7 import org.apache.spark.streaming.api.java.JavaStreamingContext;
8 import scala.Tuple2;
9
10 import java.util.Arrays;
11 import java.util.regex.Pattern;
12
13 /**
14 * @author ChengJianSheng
15 */
16 public class JavaWordCount {
17
18 private static final Pattern SPACE = Pattern.compile(" ");
19
20 public static void main(String[] args) {
21 if (args.length < 1) {
22 System.err.println("Usage: JavaWordCount <file>");
23 System.exit(1);
24 }
25
26 SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("JavaWordCount");
27 JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
28
29 JavaDStream<String> lines = jssc.textFileStream(args[0]);
30 JavaDStream<String> words = lines.flatMap(line -> Arrays.asList(SPACE.split(line)).iterator());
31 JavaPairDStream<String, Integer> ones = words.mapToPair(word -> new Tuple2<>(word, 1));
32 JavaPairDStream<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);
33 counts.print();
34
35 /*
36 JavaStreamingContext jsc = new JavaStreamingContext(conf, Durations.seconds(1));
37 JavaDStream<String> textFileStream = jsc.textFileStream("/data");
38 textFileStream.flatMap(line->Arrays.asList(line.split(" ")).iterator())
39 .mapToPair(word->new Tuple2<>(word, 1))
40 .reduceByKey((a,b)->a+b)
41 .print();
42 jsc.start();
43 */
44 }
45 }
4. Docs
https://spark.apache.org/docs/latest/streaming-programming-guide.html