Structured Streaming入门

基于spark 2.2.0html

Structured Streaming 编程指南

容错语义

提供 end-to-end exactly-once semantics (端到端的彻底一次性语义)是 Structured Streaming 设计背后的关键目标之一java

Datasets 和 DataFrames

自从 Spark 2.0 , DataFrame 和 Datasets 能够表示 static (静态), bounded data(有界数据),以及 streaming , unbounded data (无界数据)sql

Input Sources

在 Spark 2.0 中,有一些内置的 sources 。apache

  1. File source
    以文件流的形式读取目录中写入的文件。支持的文件格式为 text , csv , json , parquet 。文件必须以 atomically (原子方式)放置在给定的目录中
  2. Kafka source
    来自 Kafka 的 Poll 数据。它与 Kafka broker 的 0.10.0 或者更高的版本兼容。
  3. Socket source (for testing)
    从一个 socket 链接中读取 UTF8 文本数据。 监听服务器 socket位于 driver 。只能用于测试,由于它不提供 端到端的容错保证。

Window Operations on Event Time

每一个事件有一个event-time,假设运行Word count,计算10分钟内的window Word count,每5分钟更新一次,延迟超过10分钟才到达的数据再也不从新接收编程

Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
//watermarking:超过该值到达的数据再也不更新
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
        words.col("word"))
    .count();
复制代码

Window Operations on Event Time

在 non-streaming Dataset (非流数据集)上使用 withWatermark 是不可行的json

Streaming 去重

event中有unique identifier (惟一标识符),可用于删除重复记录服务器

Dataset<Row> streamingDf = spark.readStream. ...;  // columns: guid, eventTime, ...

// 按照惟一标识符对全部数据去重
streamingDf.dropDuplicates("guid");

// 以惟一标识符删除10s内的重复数据
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime");
复制代码

有状态操做

自从 Spark 2.2 ,可使用 mapGroupsWithState 操做和更强大的操做 flatMapGroupsWithState 来完成,这两个操做都容许在分组的数据集上应用用户定义的代码来更新用户定义的状态异步

Streaming Queries

得到窗口数据后,须要进行流计算,需指定如下一个或多个:socket

  1. output sink 的详细信息: Data format, location, etc.
  2. Output mode : 指定写入 output sink 的内容。
  3. Query name : 可选,指定用于标识的查询的惟一名称。
  4. Trigger interval (触发间隔): 可选,指定触发间隔。 若是未指定,则系统将在上一次处理完成后当即检查新数据的可用性。 若是因为先前的处理还没有完成而致使触发时间错误,则系统将尝试在下一个触发点触发,而不是在处理完成后当即触发。
  5. Checkpoint location : 对于能够保证端对端容错能力的某些 output sinks ,定系统将写入全部 checkpoint 信息的位置。 这应该是与 HDFS 兼容的容错文件系统中的目录。

Output Modes

不一样类型的 streaming queries 支持不一样的 output modeside

查询类型 查询细分 支持的输出模式 说明
Queries with aggregation (聚合) Aggregation on event-time with watermark Append, Update, Complete Append mode使用 watermark 来删除旧聚合状态。Update mode 使用 watermark 删除旧的聚合状态。Complete mode 不会删除旧的聚合状态,由于在定义上Complete mode会保存Result Table 中的全部数据。
Queries with aggregation (聚合) Other aggregations Complete, Update 因为没有定义 watermark,旧的聚合状态不会删除。不支持 Append mode ,由于aggregates聚合能够更新,违反了模式语义。
Queries with mapGroupsWithState Update
Queries with flatMapGroupsWithState Append operation mode Append Aggregations are allowed after flatMapGroupsWithState
Queries with flatMapGroupsWithState Update operation mode Update Aggregations not allowed after flatMapGroupsWithState.
Other queries Append, Update 不支持 Complete mode ,由于将全部未分组数据保存在 Result Table 中是不可行的 。

Output Sinks

//streamingQuery对象有stop(),awaitTermination(),explain()等多个方法
val streamingQuery  =writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .queryName("aggregatestable")    // this query name will be the table name
    .option("checkpointLocation", "path/to/checkpoint/dir")
    .option("path", "path/to/destination/dir")
    .start() //call start() to actually start the execution of the query
spark.sql("select * from aggregatestable").show(); 
复制代码

一样,不一样sink支持不一样的output modes

Sink 支持的输出模式 选项 Fault-tolerant 容错 说明
File Sink Append path: 必须指定输出目录的路径。 Yes 支持对 partitioned tables的写入。Partitioning by time may be useful.
Foreach Sink Append, Update, Compelete None 取决于 ForeachWriter 的实现。
Console Sink Append, Update, Complete numRows: 每一个触发器须要打印的行数(默认:20) truncate: 若是输出太长是否截断(默认: true) No
Memory Sink Append, Complete None 否。可是在 Complete Mode 模式从新启动查询将重建 full table。 Table name is the query name

Foreach

datasetOfString.writeStream().foreach(
  //writer必须是可序列化的,发送到executors执行
  new ForeachWriter<String>() {

    @Override
    // version 是每一个触发器增长的单调递增的 id
    public boolean open(long partitionId, long version) {
      // open connection or init
      //返回fasle,则process不会被调用
    }

    @Override
    public void process(String value) {
      // write string to connection
    }

    @Override
    //当 open 被调用时(不管是返回true/false), close 也将被调用
    public void close(Throwable errorOrNull) {
      // close the connection
    }
  });
复制代码

异步监视

添加StreamingQueryListener对象,有相关任务时将收到callback

SparkSession spark = ...

spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        System.out.println("Query started: " + queryStarted.id());
    }
    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        System.out.println("Query terminated: " + queryTerminated.id());
    }
    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        System.out.println("Query made progress: " + queryProgress.progress());
    }
});
复制代码

参考资料

  1. Structured Streaming Programming Guide
相关文章
相关标签/搜索