响应更快,对过去的架构进行了全新的设计和处理。sql
核心思想:将实时数据流视为一张正在不断添加数据的表,参见Spark SQL's DataFrame。编程
写日志操做 保证一致性。json
由于要写入日志操做,每次进行微批处理以前,都要先把当前批处理的数据的偏移量要先写到日志里面去。架构
如此,就带来了微小的延迟。app
数据到达 和 获得处理 并输出结果 之间的延时超过100毫秒。 异步
例如:"欺诈检测",在100ms以内判断盗刷行为,并给予制止。socket
由于 “异步” 写入日志,因此致使:至少处理一次,不能保证“仅被处理一次”。oop
Spark SQL 只能处理静态处理。测试
Structured Streaming 能够处理数据流。ui
统计每一个单词出现的频率。
from pyspark.sql import SparkSession from pyspark.sql.functions import split from pyspark.sql.functions import explode if __name__ == "__main__":
spark = SparkSession.builder.appName("StructuredNetworkWordCount").getOrCreate() spark.sparkContext.setLogLevel('WARN')
# 建立一个输入数据源,相似"套接子流",只是“相似”。 lines = spark.readStream.format("socket").option("host”, “localhost").option("port", 9999).load()
# Explode获得一个DataFrame,一个单词变为一行; # 再给DataFrame这列的title设置为 "word"; # 根据word这一列进行分组词频统计,获得“每一个单词到底出现了几回。
words = lines.select( explode( split( lines.value, " " ) ).alias("word") )
wordCounts = words.groupBy("word").count() # <--- 获得结果
# 启动流计算并输出结果 query = wordCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="8 seconds").start() query.awaitTermination()
程序要依赖于Hadoop HDFS。
$ cd /usr/local/hadoop
$ sbin/start-dfs.sh
$ nc -lk 9999
$ /usr/local/spark/bin/spark-submit StructuredNetworkWordCount.py
(1) 建立程序生成JSON格式的File源测试数据
例如,对Json格式文件进行内容统计。目录下面有1000 json files,格式以下:
(2) 建立程序对数据进行统计
import os import shutil from pprint import pprint from pyspark.sql import SparkSession from pyspark.sql.functions import window, asc from pyspark.sql.types import StructType, StructField from pyspark.sql.types import TimestampType, StringType TEST_DATA_DIR_SPARK = 'file:///tmp/testdata/' if __name__ == "__main__": # 定义模式 schema = StructType([ StructField("eventTime" TimestampType(), True), StructField("action", StringType(), True), StructField("district", StringType(), True) ]) spark = SparkSession.builder.appName("StructuredEMallPurchaseCount").getOrCreate() spark.sparkContext.setLogLevel("WARN") lines = spark.readStream.format("json").schema(schema).option("maxFilesPerTrigger", 100).load(TEST_DATA_DIR_SPARK) # 定义窗口 windowDuration = '1 minutes' windowedCounts = lines.filter("action = 'purchase'") \ .groupBy('district', window('eventTime', windowDuration)) \ .count() \ .sort(asc('window''))
# 启动流计算 query = windowedCounts \ .writeStream \ .outputMode("complete") \ .format("console") \ .option('truncate', 'false') \ .trigger(processingTime = "10 seconds") \ # 每隔10秒,执行一次流计算 .start() query.awaitTermination()
(3) 测试运行程序
a. 启动 HDFS
$ cd /usr/local/hadoop
$ sbin/start-dfs.sh
b. 运行数据统计程序
/usr/local/spark/bin/spark-submit spark_ss_filesource.py
c. 运行结果
(由于只能r&d,不能生产时间,故,这里暂时略)
通常不用于生产模式,实验测试模式却是能够。
from pyspark.sql import SparkSession if __name__ == "__main__": spark = SparkSession.builder.appName("TestRateStreamSource").getOrCreate() spark.sparkContext.setLogLevel('WARN')
紧接着是下面的程序:
# 每秒钟发送五行,属于rate源;
# query 表明了流计算启动模式;
运行程序
$ /usr/local/spark/bin/spark-submit spark_ss_rate.py
writeStream()方法将会返回DataStreamWrite接口。
query = wordCounts.writeStream.outputMode("complete").format("console").trigger(processingTime="8 seconds").start()
输出 outputMode 模式
接收器 format 类型
系统内置的输出接收器包括:File, Kafka, Foreach, Console (debug), Memory (debug), etc。
生成parquet文件
能够考虑读取后转化为DataFrame;或者使用strings查看文件内容。
代码展现:StructuredNetworkWordCountFileSink.py
from pyspark.sql import SparkSession from pyspark.sql.functions import split from pyspark.sql.functions import explode from pyspark.sql.functions import length
只要长度为5的dataframe,也就是单词长度都是5。
"数据源" 终端
# input string to simulate stream.
nc -lk 9999
"流计算" 终端
/usr/local/spark/bin/spark-submit StructuredNetworkWordCountFileSink.py
End.