[Spark] 06 - Structured Streaming

基本了解

响应更快,对过去的架构进行了全新的设计和处理。sql

核心思想:将实时数据流视为一张正在不断添加数据的表,参见Spark SQL's DataFrame。编程

 

1、微批处理(默认)

写日志操做 保证一致性。json

由于要写入日志操做,每次进行微批处理以前,都要先把当前批处理的数据的偏移量要先写到日志里面去。架构

如此,就带来了微小的延迟。app

数据到达 和 获得处理 并输出结果 之间的延时超过100毫秒。 异步

 

2、持续批处理

例如:"欺诈检测",在100ms以内判断盗刷行为,并给予制止。socket

由于 “异步” 写入日志,因此致使:至少处理一次,不能保证“仅被处理一次”。oop

Spark SQL 只能处理静态处理。测试

Structured Streaming 能够处理数据流。ui

 

 

 

Structured Streaming 编程

1、基本步骤

 

2、demo 示范

代码展现

统计每一个单词出现的频率。

from pyspark.sql import SparkSession
from pyspark.sql.functions import split from pyspark.sql.functions import explode if __name__ == "__main__":
  spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate()   spark.sparkContext.setLogLevel('WARN')
  
# 建立一个输入数据源,相似"套接子流",只是“相似”。   lines = spark.readStream.format("socket").option("host”, “localhost").option("port", 9999).load()
  # Explode获得一个DataFrame,一个单词变为一行;   # 再给DataFrame这列的title设置为 "word";   # 根据word这一列进行分组词频统计,获得“每一个单词到底出现了几回。
  words = lines.select( explode( split( lines.value, " " ) ).alias("word") )
  wordCounts
= words.groupBy("word").count()   # <--- 获得结果
  # 启动流计算并输出结果
  query = wordCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="8 seconds").start()
  query.awaitTermination()

程序要依赖于Hadoop HDFS。

$ cd /usr/local/hadoop
$ sbin/start-dfs.sh

 

新建”数据源“终端

$ nc -lk 9999

 

新建”流计算“终端

$ /usr/local/spark/bin/spark-submit StructuredNetworkWordCount.py

 

 

 

输入源

1、File 输入源

(1) 建立程序生成JSON格式的File源测试数据

例如,对Json格式文件进行内容统计。目录下面有1000 json files,格式以下:

 

(2) 建立程序对数据进行统计

import os
import shutil
from pprint import pprint

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, asc
from pyspark.sql.types import StructType, StructField from pyspark.sql.types import TimestampType, StringType

TEST_DATA_DIR_SPARK = 'file:///tmp/testdata/'

if __name__ == "__main__":

  # 定义模式
  schema = StructType([
    StructField("eventTime" TimestampType(), True), 
    StructField("action",   StringType(),    True),
    StructField("district", StringType(),    True) ])

  spark = SparkSession.builder.appName("StructuredEMallPurchaseCount").getOrCreate()
  spark.sparkContext.setLogLevel("WARN")

  lines = spark.readStream.format("json").schema(schema).option("maxFilesPerTrigger", 100).load(TEST_DATA_DIR_SPARK)

  # 定义窗口
  windowDuration = '1 minutes'
  windowedCounts = lines.filter("action = 'purchase'")  \
            .groupBy('district', window('eventTime', windowDuration))  \
            .count()  \
            .sort(asc('window''))
   
# 启动流计算   query = windowedCounts  \     .writeStream  \     .outputMode("complete")  \     .format("console")  \     .option('truncate', 'false')  \     .trigger(processingTime = "10 seconds")  \  # 每隔10秒,执行一次流计算     .start()   query.awaitTermination()

 

(3) 测试运行程序

a. 启动 HDFS

$ cd /usr/local/hadoop
$ sbin/start-dfs.sh

b. 运行数据统计程序

/usr/local/spark/bin/spark-submit spark_ss_filesource.py

c. 运行结果

 

 

2、Socket源和 Rate源

(由于只能r&d,不能生产时间,故,这里暂时略)

通常不用于生产模式,实验测试模式却是能够。

from pyspark.sql import SparkSession

if __name__ == "__main__":

  spark = SparkSession.builder.appName("TestRateStreamSource").getOrCreate()
  spark.sparkContext.setLogLevel('WARN')

紧接着是下面的程序:

# 每秒钟发送五行,属于rate源;

# query 表明了流计算启动模式;

 

运行程序

$ /usr/local/spark/bin/spark-submit spark_ss_rate.py

 

 

  

输出操做

1、启动流计算

writeStream()方法将会返回DataStreamWrite接口。

query = wordCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="8 seconds").start() 

 

输出 outputMode 模式

 

接收器 format 类型

系统内置的输出接收器包括:File, Kafka, Foreach, Console (debug), Memory (debug), etc。

 

生成parquet文件

能够考虑读取后转化为DataFrame;或者使用strings查看文件内容。

代码展现:StructuredNetworkWordCountFileSink.py

from pyspark.sql import SparkSession
from pyspark.sql.functions import split from pyspark.sql.functions import explode from pyspark.sql.functions import length

只要长度为5的dataframe,也就是单词长度都是5。

 

"数据源" 终端

# input string to simulate stream.
nc -lk 9999

  

"流计算" 终端

/usr/local/spark/bin/spark-submit StructuredNetworkWordCountFileSink.py

 

End.

相关文章
相关标签/搜索