Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant processing of live data streams. Data can be ingested from sources such as Kafka, Flume, Twitter, ZeroMQ, or Kinesis, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join, and window. Finally, the processed data can be pushed out to file systems, databases, and live dashboards.
1. Project Setup
关于Java:选用1.7或者1.8.为了通用性,本章内容使用1.7进行编写。node
关于Scala:工程不须要增长scala nature,即不需Add Scala Nature。若增长在java代码中调用scala library会有异常。python
关于Spark版本:使用1.6.3进行编写。git
maven 依赖github
```xml
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.6.3</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.3</version>
  </dependency>
</dependencies>
```
Example:
```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class NetworkWordCount {

    public static void main(String[] args) {
        networkWC();
    }

    public static void networkWC() {
        // Create a local StreamingContext with two working threads and a batch
        // interval of 5 seconds
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Create a DStream that will connect to hostname:port, like localhost:9999
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String x) {
                return Arrays.asList(x.split(" "));
            }
        });

        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();

        jssc.start();              // Start the computation
        jssc.awaitTermination();   // Wait for the computation to terminate
    }
}
```
To initialize a Spark Streaming program, a JavaStreamingContext object has to be created; it is the main entry point for all streaming functionality. A JavaStreamingContext object can be created from a SparkConf object. Note that it internally creates a JavaSparkContext, which you can access as jssc.sparkContext.
```java
// Create a local StreamingContext with two working threads and a batch
// interval of 5 seconds
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
```
After a context is defined, you have to do the following:

1. Define the input sources by creating input DStreams.
2. Define the streaming computations by applying transformations and output operations to the DStreams.
3. Start receiving data and processing it using streamingContext.start().
4. Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
5. The processing can be manually stopped using streamingContext.stop().
Points to remember:

- Once a context has been started, no new streaming computations can be set up or added to it.
- Once a context has been stopped, it cannot be restarted.
- Only one StreamingContext can be active in a JVM at the same time.
- stop() on the StreamingContext also stops the SparkContext. To stop only the StreamingContext, call stop(false).
- A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped before the next one is created.
A DStream is the basic abstraction provided by Spark Streaming. It represents a continuous stream of data: either the input stream received from a source, or the processed stream generated by applying transformation operators to an input stream. Internally, a DStream is represented by a continuous series of RDDs, and each RDD in a DStream contains data from a certain time interval.
Any operation applied on a DStream translates to operations on the underlying RDDs. In the earlier example, the flatMap operation is applied on each RDD of the lines DStream to generate the RDDs of the words DStream.
These underlying RDD transformations are computed by the Spark engine. The DStream operations hide most of these details and provide the developer with a higher-level API for convenience. These operations are discussed in detail in the following sections.
Input DStreams are DStreams representing the stream of input data received from a source. In the quick example, lines is the input DStream, representing the stream of data received from the netcat server. Every input DStream is associated with a Receiver object, which receives the data from the source and stores it in memory for processing.
Input DStreams represent the stream of raw data received from a source. Spark Streaming provides two categories of streaming sources:

- Basic sources: sources directly available in the StreamingContext API, such as file systems and socket connections.
- Advanced sources: sources like Kafka, Flume, and Kinesis, which are available through extra utility classes and require additional dependencies (a sketch follows this list).
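As a minimal sketch of an advanced source, assuming the spark-streaming-kafka_2.10 artifact (matching the Spark version) has been added to the Maven dependencies; the ZooKeeper address, consumer group, and topic name below are placeholders:

```java
import java.util.Collections;
import java.util.Map;

import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// Receiver-based Kafka input stream: reads topic "logs" with a single receiver thread.
// "zkhost:2181" and "wordcount-group" are placeholder values.
Map<String, Integer> topics = Collections.singletonMap("logs", 1);
JavaPairReceiverInputDStream<String, String> kafkaStream =
        KafkaUtils.createStream(jssc, "zkhost:2181", "wordcount-group", topics);
```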
Note that if you want to receive multiple streams of data in parallel in your streaming application, you can create multiple input DStreams (discussed further in the Performance Tuning section). This creates multiple receivers that simultaneously receive multiple data streams. However, a receiver runs as a long-running task on a Spark worker/executor, so it occupies one of the cores allocated to the Spark Streaming application. It is therefore important to allocate enough cores (or threads, if running locally) to the Spark Streaming application to process the received data as well as to run the receiver(s).
Points to remember:

- When running a Spark Streaming program locally, do not use "local" or "local[1]" as the master URL. Either of these means only one thread will be used, and it will be used to run the receiver, leaving no thread for processing the received data. Always use "local[n]" with n greater than the number of receivers.
- Similarly, when running on a cluster, the number of cores allocated to the Spark Streaming application must be greater than the number of receivers; otherwise the system will receive data but not be able to process it.
We have already seen ssc.socketTextStream(...) in the quick example, which creates a DStream from text data received over a TCP socket. Besides sockets, the StreamingContext API also provides methods for creating DStreams from files and Akka actors as input sources.
Sockets: listen on a socket at ip:port.
File Streams: for reading data from files on any file system compatible with the HDFS API, a DStream can be created as:
```java
streamingContext.fileStream<KeyClass, ValueClass, InputFormatClass>(dataDirectory);
```
Spark Streaming will monitor the directory dataDirectory and process any files created in that directory (files written in nested directories are not supported). Note the following three points:

- The files must all have the same data format.
- The files must be created in dataDirectory by atomically moving or renaming them into the directory.
- Once moved, the files must not be changed; data appended to a file after it is moved will not be read.
For simple text files, there is an easier method, streamingContext.textFileStream(dataDirectory). File streams do not require running a receiver, so no cores need to be allocated for receiving.
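As a small sketch of the text-file variant (the HDFS directory below is a placeholder; any path on an HDFS-API-compatible file system works):

```java
// Each new file atomically moved into this directory becomes part of the stream.
// "hdfs://namenode:8020/user/streaming/input" is a placeholder path.
JavaDStream<String> fileLines =
        jssc.textFileStream("hdfs://namenode:8020/user/streaming/input");
fileLines.print();
```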
(To be added)

(To be added)
Similar to RDDs, transformations allow the data from the input DStream to be modified. DStreams support many of the transformation operators available on normal Spark RDDs. Some of the common ones are as follows:
Transformation | Meaning |
---|---|
map(func) | Return a new DStream by passing each element of the source DStream through a function func. |
flatMap(func) | Similar to map, but each input item can be mapped to 0 or more output items. |
filter(func) | Return a new DStream by selecting only the records of the source DStream on which func returns true. |
repartition(numPartitions) | Changes the level of parallelism in this DStream by creating more or fewer partitions. |
union(otherStream) | Return a new DStream that contains the union of the elements in the source DStream and otherDStream. |
count() | Return a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. |
reduce(func) | Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel. |
countByValue() | When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. |
reduceByKey(func, [numTasks]) | When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
join(otherStream, [numTasks]) | When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. |
cogroup(otherStream, [numTasks]) | When called on a DStream of (K, V) and (K, W) pairs, return a new DStream of (K, Seq[V], Seq[W]) tuples. |
transform(func) | Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. This can be used to do arbitrary RDD operations on the DStream. |
updateStateByKey(func) | Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key. |
The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information. To use this, you have to do two steps:

1. Define the state - the state can be an arbitrary data type.
2. Define the state update function - specify with a function how to update the state using the previous state and the new values from the input stream.
Let's illustrate this with an example. Say you want to maintain a running count of each word seen in a text data stream. Here, the running count is the state, and it is an integer.
```java
import com.google.common.base.Optional;

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
        @Override
        public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
            Integer newSum = ... // add the new values with the previous running count to get the new count
            return Optional.of(newSum);
        }
    };
```
This is applied on a DStream containing words (that is, the pairs DStream containing (word, 1) pairs from the earlier example).
```java
JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
```
The update function will be called for each word, with a list of 1's (from the (word, 1) pairs) as the new values and the previous count of that word as the state. For the complete code, see the example JavaStatefulNetworkWordCount.java. Note that using updateStateByKey requires the checkpoint directory to be configured; this is discussed in more detail in the checkpointing section.
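A minimal sketch of wiring this up (the checkpoint path below is a placeholder; any fault-tolerant directory such as an HDFS path can be used):

```java
// updateStateByKey requires periodic RDD checkpointing, so a checkpoint
// directory must be set before the stateful DStream is used.
// "/tmp/streaming-checkpoint" is a placeholder path.
jssc.checkpoint("/tmp/streaming-checkpoint");

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
runningCounts.print();
```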
(Omitted)
Spark Streaming also provides windowed computations, which allow you to apply transformation operators over a sliding window of data.
As the window slides over a source DStream, the source RDDs that fall within the window are combined and operated upon to produce the RDDs of the windowed DStream. For example, the operation might be applied over the last 3 time units of data, sliding by 2 time units. This shows that any window operation needs to specify two parameters:

- window length: the duration of the window.
- sliding interval: the interval at which the window operation is performed.
These two parameters must be multiples of the batch interval of the source DStream.
Let's illustrate the window operations with an example. Say you want to extend the earlier example by generating word counts over the last 30 seconds of data, every 10 seconds. To do this, we have to apply the reduceByKey operation on the pairs DStream over the last 30 seconds of data. This is done using the reduceByKeyAndWindow operation.
```java
// Reduce function adding two integers, defined separately for clarity
Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
    }
};

// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts =
    pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(10));
```
Some of the common window operations are as follows. All of these take the two parameters mentioned above: windowLength and slideInterval.
Transformation | Meaning |
---|---|
window(windowLength, slideInterval) | Return a new DStream which is computed based on windowed batches of the source DStream. |
countByWindow(windowLength, slideInterval) | Return a sliding window count of elements in the stream. |
reduceByWindow(func, windowLength, slideInterval) | Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative so that it can be computed correctly in parallel. |
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Note: By default, this uses Spark's default number of parallel tasks (2 for local mode, and in cluster mode the number is determined by the config property spark.default.parallelism ) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) | A more efficient version of the above reduceByKeyAndWindow(), where the reduce value of each window is calculated incrementally from the previous window: new data entering the window is reduced in with func, and old data leaving the window is "inverse reduced" with invFunc. Note that checkpointing must be enabled to use this operation (see the sketch after this table). |
countByValueAndWindow(windowLength, slideInterval, [numTasks]) | When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in reduceByKeyAndWindow , the number of reduce tasks is configurable through an optional argument. |
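A sketch of the incremental variant, assuming the reduceFunc from the example above and that a checkpoint directory has already been set (this variant requires checkpointing):

```java
// "Inverse reduce": subtracts the counts of the batch that slides out of the window.
Function2<Integer, Integer, Integer> invReduceFunc = new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer windowTotal, Integer leavingCount) {
        return windowTotal - leavingCount;
    }
};

// Word counts over the last 30 seconds, updated every 10 seconds, computed incrementally.
JavaPairDStream<String, Integer> incrementalWindowedCounts =
    pairs.reduceByKeyAndWindow(reduceFunc, invReduceFunc, Durations.seconds(30), Durations.seconds(10));
```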
Finally, it's worth highlighting how easily you can perform different kinds of joins in Spark Streaming.
Stream-stream joins
Streams can be very easily joined with other streams.
```java
JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);
```
Here, in each batch interval, the RDD generated by stream1 will be joined with the RDD generated by stream2. You can also do leftOuterJoin, rightOuterJoin, and fullOuterJoin. Furthermore, it is often very useful to do joins over windows of the streams. That is pretty easy as well.
```java
JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);
```
Stream-dataset joins
Here is another example, joining a windowed stream with a dataset.
```java
JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(
    new Function<JavaRDD<Tuple2<String, String>>, JavaRDD<Tuple2<String, String>>>() {
        @Override
        public JavaRDD<Tuple2<String, String>> call(JavaRDD<Tuple2<String, String>> rdd) {
            return rdd.join(dataset);
        }
    }
);
```
In fact, you can also dynamically change the dataset you want to join against. The function provided to transform is evaluated every batch interval, so it will use whatever dataset the dataset reference points to at that time.
The complete list of DStream transformations is available in the API documentation. For the Scala API, see DStream and PairDStreamFunctions. For the Java API, see JavaDStream and JavaPairDStream. For the Python API, see DStream.
Output operations allow a DStream's data to be pushed out to external systems such as databases or file systems. Since output operations actually allow the transformed data to be consumed by external systems, they trigger the actual execution of all the DStream transformations (similar to actions for RDDs). Currently, the following output operations are defined:
Output Operation | Meaning |
---|---|
print() | Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. Python API This is called pprint() in the Python API. |
saveAsTextFiles(prefix, [suffix]) | Save this DStream's contents as text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". |
saveAsObjectFiles(prefix, [suffix]) | Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". Python API This is not available in the Python API. |
saveAsHadoopFiles(prefix, [suffix]) | Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". Python API This is not available in the Python API. |
foreachRDD(func) | The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs. |
Design patterns for using foreachRDD
dstream.foreachRDD is a powerful primitive that allows data to be sent out to external systems. However, it is important to understand how to use this primitive correctly and efficiently; the following points show how to avoid common mistakes. Writing data to an external system usually requires creating a connection object (e.g., a TCP connection to a remote server) and using it to send the data. For this purpose, a developer may inadvertently create the connection object at the Spark driver and then try to use it in a Spark worker to save the records in the RDDs, as in:
```scala
dstream.foreachRDD(rdd => {
  val connection = createNewConnection()  // executed at the driver
  rdd.foreach(record => {
    connection.send(record)  // executed at the worker
  })
})
```
This is incorrect, as it requires the connection object to be serialized and sent from the driver to the worker. Such connection objects cannot be transferred across machines. The mistake may manifest as a serialization error (the connection object is not serializable), an initialization error (the connection object needs to be initialized at the worker), and so on. The correct solution is to create the connection object at the worker.
However, this can lead to another common mistake: creating a new connection for every record. For example:
```scala
dstream.foreachRDD(rdd => {
  rdd.foreach(record => {
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  })
})
```
Typically, creating a connection object has time and resource overheads. Creating and destroying a connection object for each record therefore incurs unnecessarily high overheads and can significantly reduce the overall throughput of the system. A better solution is to use rdd.foreachPartition: create a single connection object per RDD partition and use that connection to send all the records in the partition.
```scala
dstream.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  })
})
```
This amortizes the connection creation overhead over all the records of a partition.
Finally, this can be further optimized by reusing connection objects across multiple RDDs/batches. The developer can maintain a static pool of connection objects that is reused as the RDDs of multiple batches are pushed to the external system, further reducing the overhead.
```scala
dstream.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  })
})
```
Note that the connections in the pool should be lazily created on demand and timed out if not used for a while. This achieves the most efficient way of sending data to external systems.
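ConnectionPool above is not a Spark class; it is a helper you provide yourself. A minimal sketch of such a helper in Java (the nested Connection type is a placeholder; a production pool would also handle idle timeouts and broken connections):

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical helper sketch: a static, lazily populated pool of connections.
public class ConnectionPool {

    /** Placeholder client type; replace with a real database/TCP client. */
    public static class Connection {
        public void send(String record) {
            // Placeholder: write the record to the external system here.
        }
    }

    private static final ConcurrentLinkedQueue<Connection> POOL =
            new ConcurrentLinkedQueue<Connection>();

    /** Reuse an idle connection if available, otherwise create one lazily. */
    public static Connection getConnection() {
        Connection connection = POOL.poll();
        return (connection != null) ? connection : new Connection();
    }

    /** Return the connection so later partitions/batches can reuse it. */
    public static void returnConnection(Connection connection) {
        POOL.offer(connection);
    }
}
```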
Other points to note:

- DStreams are executed lazily by the output operations, just like RDDs are lazily executed by RDD actions. Specifically, the RDD actions inside the DStream output operations force the processing of the received data. Hence, if your application has no output operation, or has dstream.foreachRDD() without any RDD action inside it, nothing will get executed; the system will simply receive the data and discard it.
- By default, output operations are executed one at a time, in the order they are defined in the application.
You can easily use DataFrames and SQL operations on streaming data. You have to create a SQLContext using the SparkContext that the StreamingContext is using. Furthermore, this has to be done in such a way that it can be restarted on driver failures, which is achieved by using a lazily instantiated singleton instance of SQLContext. The following example demonstrates this. It modifies the earlier word count example: each RDD is converted to a DataFrame, registered as a temporary table, and then queried using SQL.
```java
/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
    private String word;

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }
}

...

/** DataFrame operations inside your streaming program */

JavaDStream<String> words = ...

words.foreachRDD(
    new Function2<JavaRDD<String>, Time, Void>() {
        @Override
        public Void call(JavaRDD<String> rdd, Time time) {

            // Get the singleton instance of SQLContext
            SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

            // Convert RDD[String] to RDD[case class] to DataFrame
            JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
                public JavaRow call(String word) {
                    JavaRow record = new JavaRow();
                    record.setWord(word);
                    return record;
                }
            });
            DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);

            // Register as table
            wordsDataFrame.registerTempTable("words");

            // Do word count on table using SQL and print it
            DataFrame wordCountsDataFrame =
                sqlContext.sql("select word, count(*) as total from words group by word");
            wordCountsDataFrame.show();
            return null;
        }
    }
);
```
See the full source code.
You can also run SQL queries on tables defined on streaming data from a different thread (that is, asynchronous to the running StreamingContext). Just make sure that you set the StreamingContext to remember a sufficient amount of streaming data such that the query can run. Otherwise the StreamingContext, which is unaware of any asynchronous SQL queries, will delete old streaming data before the query can complete. For example, if you want to query the last batch, but your query can take 5 minutes to run, then call streamingContext.remember(Minutes(5)) (in Scala, or the equivalent in other languages).
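In the Java API used in this chapter, the equivalent call would look roughly like this (a one-line sketch, assuming jssc is the JavaStreamingContext from the earlier examples):

```java
// Keep the last 5 minutes of generated RDDs around so that
// asynchronous SQL queries issued from another thread can still see them.
jssc.remember(Durations.minutes(5));
```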
Similar to RDDs, DStreams also allow developers to persist the stream's data in memory. Using the persist() method on a DStream will automatically persist every RDD of that DStream in memory. This is useful if the data in the DStream will be computed multiple times. For window-based operations like reduceByWindow and reduceByKeyAndWindow, and state-based operations like updateStateByKey, persistence is implicit, so the developer does not need to call persist(). For input streams that receive data over the network (such as Kafka, Flume, etc.), the default persistence level replicates the data to two different nodes for fault tolerance.
Note that, unlike RDDs, the default persistence level of DStreams keeps the data serialized in memory. This is further discussed in the Performance Tuning section. More information on the different persistence levels can be found in the RDD persistence section of the Spark Programming Guide.
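As a small sketch of explicit persistence (the storage level shown is just one reasonable choice, matching the serialized-in-memory default described above):

```java
import org.apache.spark.storage.StorageLevel;

// Cache the words DStream because it feeds more than one downstream computation.
words.persist(StorageLevel.MEMORY_ONLY_SER());
```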
A streaming application must operate 24/7 and hence must be resilient to failures unrelated to the application logic (e.g., system failures, JVM crashes, etc.). For this to be possible, Spark Streaming needs to checkpoint enough information to a fault-tolerant storage system so that it can recover from failures.
Metadata checkpointing: saving the information defining the streaming computation to fault-tolerant storage such as HDFS. This is used to recover from failure of the node running the driver of the streaming application. Metadata includes:

- Configuration: the configuration that was used to create the streaming application.
- DStream operations: the set of DStream operations that define the streaming application.
- Incomplete batches: batches whose jobs are queued but have not completed yet.
Data checkpointing: saving the generated RDDs to reliable storage. This is necessary in stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, which causes the length of the dependency chain to keep growing with time. To avoid such unbounded growth during recovery, intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage to cut off the dependency chain. In summary, metadata checkpointing is primarily needed for recovery from driver failures, whereas data (RDD) checkpointing is necessary even for basic functioning if stateful transformations are used.
Checkpointing must be enabled for applications in either of the following two cases:

- Usage of stateful transformations: if updateStateByKey or reduceByKeyAndWindow (with an inverse function) is used in the application, the checkpoint directory must be provided to allow periodic RDD checkpointing.
- Recovering from failures of the driver running the application: metadata checkpoints are used to recover with progress information.
Note that simple streaming applications without the aforementioned stateful transformations can be run without enabling checkpointing. In that case, recovery from driver failures will only be partial (data that was received but not yet processed may be lost). This is often acceptable, and many Spark Streaming applications are run this way.
Checkpointing is enabled by setting a directory in a fault-tolerant, reliable file system (HDFS, S3, etc.) to which the checkpoint information will be saved. This is done with streamingContext.checkpoint(checkpointDirectory). It allows you to use the aforementioned stateful transformations. Additionally, if you want the application to recover from driver failures, you should rewrite your streaming application as follows.
```java
// Create a factory object that can create and set up a new JavaStreamingContext
JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
    @Override
    public JavaStreamingContext create() {
        JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
        JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
        ...
        jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
        return jssc;
    }
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);

// Do additional setup on context that needs to be done,
// irrespective of whether it is being started or restarted
context. ...

// Start the context
context.start();
context.awaitTermination();
```
If checkpointDirectory exists, the context will be recreated from the checkpoint data. If the directory does not exist (i.e., the application is running for the first time), the factory contextFactory will be called to create a new context and set up the DStreams. See the RecoverableNetworkWordCount example.
In addition to using getOrCreate, the developer has to ensure that the driver process is automatically restarted on failure. This can only be done by the deployment infrastructure used to run the application, and is discussed further in the Deployment section.
Note that checkpointing of RDDs incurs the cost of saving to reliable storage. This may increase the processing time of those batches whose RDDs get checkpointed, so the checkpoint interval needs to be set carefully. At small batch sizes (say, 1 second of data), checkpointing every batch may significantly reduce operation throughput. Conversely, checkpointing too infrequently causes the lineage and task sizes to grow, which has detrimental effects. For stateful transformations that require RDD checkpointing, the default interval is a multiple of the batch interval that is at least 10 seconds. It can be set using dstream.checkpoint(checkpointInterval). Typically, a checkpoint interval of 5-10 sliding intervals of a DStream is a good setting to try.
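For example, a short sketch of setting the checkpoint interval explicitly on the stateful DStream from the earlier example (50 seconds is just an illustrative value, i.e., 10 times a 5-second batch interval):

```java
// Checkpoint the stateful DStream every 50 seconds instead of the default interval.
runningCounts.checkpoint(Durations.seconds(50));
```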