近年来,大数据的计算引擎愈来愈受到关注,spark做为最受欢迎的大数据计算框架,也在不断的学习和完善中。在Spark2.x中,新开放了一个基于DataFrame的无下限的流式处理组件——Structured Streaming,它也是本系列的主角,废话很少说,进入正题吧!sql
在有过1.6的streaming和2.x的streaming开发体验以后,再来使用Structured Streaming会有一种彻底不一样的体验,尤为是在代码设计上。apache
在过去使用streaming时,咱们很容易的理解为一次处理是当前batch的全部数据,只要针对这波数据进行各类处理便可。若是要作一些相似pv uv的统计,那就得借助有状态的state的DStream,或者借助一些分布式缓存系统,如Redis、Alluxio都能实现。须要关注的就是尽可能快速的处理完当前的batch数据,以及7*24小时的运行便可。缓存
能够看到想要去作一些相似Group by的操做,Streaming是很是不便的。Structured Streaming则完美的解决了这个问题。app
在Structured Streaming中,把源源不断到来的数据经过固定的模式“追加”或者“更新”到了上面无下限的DataFrame中。剩余的工做则跟普通的DataFrame同样,能够去map、filter,也能够去groupby().count()。甚至还能够把流处理的dataframe跟其余的“静态”DataFrame进行join。另外,还提供了基于window时间的流式处理。总之,Structured Streaming提供了快速、可扩展、高可用、高可靠的流式处理。框架
在大数据开发中,Word Count就是基本的演示示例,因此这里也模仿官网的例子,作一下演示。socket
直接看一下完整的例子:分布式
package xingoo.sstreaming import org.apache.spark.sql.SparkSession object WordCount { def main(args: Array[String]): Unit = { val spark = SparkSession .builder .master("local") .appName("StructuredNetworkWordCount") .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._ // 建立DataFrame // Create DataFrame representing the stream of input lines from connection to localhost:9999 val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load() // Split the lines into words val words = lines.as[String].flatMap(_.split(" ")) // Generate running word count val wordCounts = words.groupBy("value").count() // Start running the query that prints the running counts to the console // 三种模式: // 1 complete 全部内容都输出 // 2 append 新增的行才输出 // 3 update 更新的行才输出 val query = wordCounts.writeStream .outputMode("complete") .format("console") .start() query.awaitTermination() } }
效果就是在控制台输入nc -lk 9999
,而后输入一大堆的字符,控制台就输出了对应的结果:
而后来详细看一下代码:学习
val spark = SparkSession .builder .master("local") .appName("StructuredNetworkWordCount") .getOrCreate() spark.sparkContext.setLogLevel("WARN") import spark.implicits._
上面就不用太多解释了吧,建立一个本地的sparkSession,设置日志的级别为WARN,要不控制台太乱。而后引入spark sql必要的方法(若是没有import spark.implicits._,基本类型是没法直接转化成DataFrame的)。大数据
val lines = spark.readStream .format("socket") .option("host", "localhost") .option("port", 9999) .load()
建立了一个Socket链接的DataStream,并经过load()方法获取当前批次的DataFrame。ui
val words = lines.as[String].flatMap(_.split(" ")) val wordCounts = words.groupBy("value").count()
先把DataFrame转成单列的DataSet,而后经过空格切分每一行,再根据value作groupby,并统计个数。
val query = wordCounts.writeStream .outputMode("complete") .format("console") .start()
调用DataFrame的writeStream方法,转换成输出流,设置模式为"complete",指定输出对象为控制台"console",而后调用start()方法启动计算。并返回queryStreaming,进行控制。
这里的outputmode和format都会后续详细介绍。
query.awaitTermination()
经过QueryStreaming的对象,调用awaitTermination阻塞主线程。程序就能够不断循环调用了。
观察一下Spark UI,能够发现程序稳定的在运行~
这就是一个最基本的wordcount的例子,想象一下,若是没有Structured Streaming,想要统计全局的wordcount,仍是很费劲的(即使使用streaming的state,其实也不是那么好用的)。