1. Spark Streaming Overview
1.1 A Brief Look at Spark Streaming
Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant processing of live data streams.
Data can be ingested from many sources, such as Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets.
The data can then be processed using complex algorithms expressed through high-level functions such as map, reduce, join, and window.
Finally, the processed data can be pushed out to file systems and databases. In fact, Spark's machine learning and graph processing algorithms can also be applied to data streams. This is illustrated in the figure below:
Internally, it works as follows: Spark Streaming receives live input data streams and divides the data into batches, which the Spark engine then processes to generate the final stream of results, also in batches. As shown in the figure:
Spark Streaming also provides a high-level abstraction called a discretized stream, or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis,
or by applying high-level operations to other DStreams.
Internally, a DStream is represented as a sequence of RDDs.
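Because each batch of a DStream is backed by an RDD, ordinary RDD code can be run on every batch via foreachRDD. Below is a minimal sketch against the Spark 1.6 Java API; the class and method names here are illustrative, not part of the examples that follow:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.api.java.JavaDStream;

public class DStreamBatches {
    // Prints the record count of the RDD behind each batch of the stream.
    static void inspectBatches(JavaDStream<String> lines) {
        lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
            public void call(JavaRDD<String> rdd) {
                // Each invocation receives the RDD for one batch interval.
                System.out.println("Records in this batch: " + rdd.count());
            }
        });
    }
}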
First, look at the Maven dependency (spark-streaming_2.10):
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.1</version>
</dependency>
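Note that the connectors for sources such as Kafka and Flume mentioned above are not bundled in spark-streaming itself; they ship as separate artifacts. For example, the Kafka connector matching this Spark version (coordinates as published for Spark 1.6.x):

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka_2.10</artifactId>
    <version>1.6.1</version>
</dependency>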
1.2 Example: counting the words in text data received over a TCP socket from a data server
package com.berg.spark.test5.streaming;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class SparkStreamingDemo1 {

    public static void main(String[] args) {
        // Use at least two local threads: one for the receiver, one for processing
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        // Override Spark's minimum-memory check for local testing (value in bytes)
        conf.set("spark.testing.memory", "269522560000");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        System.out.println("jssc: " + jssc);

        // Create a DStream that will connect to hostname:port, e.g. master:9999
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);
        System.out.println("lines : " + lines);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            public Iterable<String> call(String x) {
                return Arrays.asList(x.split(" "));
            }
        });

        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();

        jssc.start();            // Start the computation
        jssc.awaitTermination(); // Wait for the computation to terminate
    }
}
To run the code, first start a listener in a Linux terminal: $ nc -lk 9999
Then run the code in Eclipse.
Type a line of text, with the words separated by spaces, for example:
hadoop@master:~$ nc -lk 9999
berg hello world berg hello
You should then see output like the following in the Eclipse console:
-------------------------------------------
Time: 1465386060000 ms
-------------------------------------------
(hello,2)
(berg,2)
(world,1)
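Since window operations were mentioned earlier, the word count above can also be computed over a sliding window. A minimal sketch using reduceByKeyAndWindow, which could replace the reduceByKey step in SparkStreamingDemo1 (the window and slide durations below are illustrative, and must be multiples of the 10-second batch interval):

// Count words over the last 30 seconds of data, recomputed every 10 seconds
JavaPairDStream<String, Integer> windowedCounts = pairs.reduceByKeyAndWindow(
        new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        },
        Durations.seconds(30),   // window length
        Durations.seconds(10));  // slide interval
windowedCounts.print();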
1.3 Using files in an HDFS directory as the input data stream
package com.berg.spark.test5.streaming;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.dstream.DStream;

import scala.Tuple2;

public class SparkStreamingDemo2 {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
        conf.set("spark.testing.memory", "269522560000");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
        System.out.println("jssc: " + jssc);

        // Create a DStream that reads files from a directory on HDFS as the data source
        JavaDStream<String> lines = jssc.textFileStream("hdfs://master:9000/txt/sparkstreaming/");
        System.out.println("lines : " + lines);

        // Split each line into words
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            public Iterable<String> call(String x) {
                System.out.println(Arrays.asList(x.split(" ")).get(0));
                return Arrays.asList(x.split(" "));
            }
        });

        // Count each word in each batch
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            public Tuple2<String, Integer> call(String s) {
                return new Tuple2<String, Integer>(s, 1);
            }
        });
        System.out.println(pairs);

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        });

        // Print the first ten elements of each RDD generated in this DStream to the console
        wordCounts.print();

        // Count the number of records in each batch
        JavaDStream<Long> count = wordCounts.count();
        count.print();

        // Save each batch of counts to HDFS as text files
        DStream<Tuple2<String, Integer>> dstream = wordCounts.dstream();
        dstream.saveAsTextFiles("hdfs://master:9000/bigdata/spark/xxxx", "sparkstreaming");
        // Equivalent one-liner:
        // wordCounts.dstream().saveAsTextFiles("hdfs://master:9000/bigdata/spark/xxxx", "sparkstreaming");

        jssc.start();
        jssc.awaitTermination(); // Wait for the computation to terminate
    }
}
The code above continuously monitors the HDFS directory hdfs://master:9000/txt/sparkstreaming/ for new files; whenever one appears, it counts the words it contains. The saveAsTextFiles call writes each batch's counts out under a name built from the given prefix and suffix.
After starting the program, manually add a file to that directory, and the console will print the word counts for the file's contents.
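As the API note below explains, files must appear in the monitored directory atomically, by being "moved" there from elsewhere on the same file system. One way to test this, assuming the directory from the example (the local file name and staging path here are illustrative):

$ hdfs dfs -put words.txt /txt/staging/words.txt
$ hdfs dfs -mv /txt/staging/words.txt /txt/sparkstreaming/words.txt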
Note the meaning of the parameter:
public JavaDStream<java.lang.String> textFileStream(java.lang.String directory)
Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files (using key as LongWritable, value as Text and input format as TextInputFormat). Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored.
Parameters:
directory - HDFS directory to monitor for new file
Returns:
(undocumented)
For further study, refer to the official documentation and the API docs.