Spark Struntured Streaming是Spark 2.1.0版本后新增长的流计算引擎,本博将经过几篇博文详细介绍这个框架。这篇是介绍Spark Structured Streaming的基本开发方法。以Spark 自带的example进行测试和介绍,其为"StructuredNetworkWordcount.scala"文件。html
因为咱们是在单机上进行测试,因此须要修单机运行模型,修改后的程序以下:sql
package org.apache.spark.examples.sql.streaming apache
import org.apache.spark.sql.SparkSession 数组
/** app * Counts words in UTF8 encoded, '\n' delimited text received from the network. 框架 * socket * Usage: StructuredNetworkWordCount <hostname> <port> ide * <hostname> and <port> describe the TCP server that Structured Streaming 测试 * would connect to receive data. ui * * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example * `$ bin/run-example sql.streaming.StructuredNetworkWordCount * localhost 9999` */ object StructuredNetworkWordCount { def main(args: Array[String]) { if (args.length < 2) { System.err.println("Usage: StructuredNetworkWordCount <hostname> <port>") System.exit(1) }
val host = args(0) val port = args(1).toInt
val spark = SparkSession .builder .appName("StructuredNetworkWordCount") .master("local[*]") .getOrCreate()
import spark.implicits._
// Create DataFrame representing the stream of input lines from connection to host:port val lines = spark.readStream .format("socket") .option("host", host) .option("port", port) .load()
// Split the lines into words val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count val wordCounts = words.groupBy("value").count()
// Start running the query that prints the running counts to the console val query = wordCounts.writeStream .outputMode("complete") .format("console") .start()
query.awaitTermination() } } |
对于上述所示的程序,进行以下的解读和分析:
在建立SparkSessiion对象以后,须要设置数据源的类型,及数据源的配置。而后就会数据源中源源不断的接收数据,接收到的数据以DataFrame对象存在,该类型与Spark SQL中定义类型同样,内部由Dataset数组组成。
以下程序所示,设置输入源的类型为socket,并配置socket源的IP地址和端口号。接收到的数据流存储到lines对象中,其类型为DataFarme。
// Create DataFrame representing the stream of input lines from connection to host:port val lines = spark.readStream .format("socket") .option("host", host) .option("port", port) .load() |
以下程序所示,首先将接受到的数据流lines转换为String类型的序列;接着每一批数据都以空格分隔为独立的单词;最后再对每一个单词进行分组并统计次数。
// Split the lines into words val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count val wordCounts = words.groupBy("value").count() |
经过DataFrame对象的writeStream方法获取DataStreamWrite对象,DataStreamWrite类定义了一些数据输出的方式。Quick example程序将数据输出到控制终端。注意只有在调用start()方法后,才开始执行Streaming进程,start()方法会返回一个StreamingQuery对象,用户可使用该对象来管理Streaming进程。如上述程序调用awaitTermination()方法阻塞接收全部数据。
当直接提交编译后的架包时,即没有启动"nc –lk 9999"时,会出现图 11所示的错误。解决该异常,只需在提交(spark-submit)程序以前,先启动"nc"命令便可解决,且不能使用"nc –lk localhost 9999".
图 11
当经过mvn打包程序后,在命令行经过spark-submit提交架包,可以正常执行,而经过IDEA执行时会出现图 12所示的错误。
图 12
出现这个异常,是因为项目中依赖的Scala版本与Spark编译的版本不一致,从而致使出现这种错误。图 13和图 14所示,Spark 2.10是由Scala 2.10版本编译而成的,而项目依赖的Scala版本是2.11.8,从而致使出现了错误。
图 13
图 14
解决该问题,只需在项目的pom.xml文件中指定与spark编译的版本一致,便可解决该问题。如图 15所示的执行结果。
图 15