基于spark 2.2.0html
提供 end-to-end exactly-once semantics (端到端的彻底一次性语义)是 Structured Streaming 设计背后的关键目标之一java
自从 Spark 2.0 , DataFrame 和 Datasets 能够表示 static (静态), bounded data(有界数据),以及 streaming , unbounded data (无界数据)sql
在 Spark 2.0 中,有一些内置的 sources 。apache
每一个事件有一个event-time,假设运行Word count,计算10分钟内的window Word count,每5分钟更新一次,延迟超过10分钟才到达的数据再也不从新接收编程
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
//watermarking:超过该值到达的数据再也不更新
.withWatermark("timestamp", "10 minutes")
.groupBy(
functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
words.col("word"))
.count();
复制代码
在 non-streaming Dataset (非流数据集)上使用 withWatermark 是不可行的json
event中有unique identifier (惟一标识符),可用于删除重复记录服务器
Dataset<Row> streamingDf = spark.readStream. ...; // columns: guid, eventTime, ...
// 按照惟一标识符对全部数据去重
streamingDf.dropDuplicates("guid");
// 以惟一标识符删除10s内的重复数据
streamingDf
.withWatermark("eventTime", "10 seconds")
.dropDuplicates("guid", "eventTime");
复制代码
自从 Spark 2.2 ,可使用 mapGroupsWithState 操做和更强大的操做 flatMapGroupsWithState 来完成,这两个操做都容许在分组的数据集上应用用户定义的代码来更新用户定义的状态异步
得到窗口数据后,须要进行流计算,需指定如下一个或多个:socket
不一样类型的 streaming queries 支持不一样的 output modeside
查询类型 | 查询细分 | 支持的输出模式 | 说明 |
---|---|---|---|
Queries with aggregation (聚合) | Aggregation on event-time with watermark | Append, Update, Complete | Append mode使用 watermark 来删除旧聚合状态。Update mode 使用 watermark 删除旧的聚合状态。Complete mode 不会删除旧的聚合状态,由于在定义上Complete mode会保存Result Table 中的全部数据。 |
Queries with aggregation (聚合) | Other aggregations | Complete, Update | 因为没有定义 watermark,旧的聚合状态不会删除。不支持 Append mode ,由于aggregates聚合能够更新,违反了模式语义。 |
Queries with mapGroupsWithState | Update | ||
Queries with flatMapGroupsWithState | Append operation mode | Append | Aggregations are allowed after flatMapGroupsWithState |
Queries with flatMapGroupsWithState | Update operation mode | Update | Aggregations not allowed after flatMapGroupsWithState. |
Other queries | Append, Update | 不支持 Complete mode ,由于将全部未分组数据保存在 Result Table 中是不可行的 。 |
//streamingQuery对象有stop(),awaitTermination(),explain()等多个方法
val streamingQuery =writeStream
.format("parquet") // can be "orc", "json", "csv", etc.
.queryName("aggregatestable") // this query name will be the table name
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start() //call start() to actually start the execution of the query
spark.sql("select * from aggregatestable").show();
复制代码
一样,不一样sink支持不一样的output modes
Sink | 支持的输出模式 | 选项 | Fault-tolerant 容错 | 说明 |
---|---|---|---|---|
File Sink | Append | path: 必须指定输出目录的路径。 | Yes | 支持对 partitioned tables的写入。Partitioning by time may be useful. |
Foreach Sink | Append, Update, Compelete | None | 取决于 ForeachWriter 的实现。 | |
Console Sink | Append, Update, Complete | numRows: 每一个触发器须要打印的行数(默认:20) truncate: 若是输出太长是否截断(默认: true) | No | |
Memory Sink | Append, Complete | None | 否。可是在 Complete Mode 模式从新启动查询将重建 full table。 | Table name is the query name |
datasetOfString.writeStream().foreach(
//writer必须是可序列化的,发送到executors执行
new ForeachWriter<String>() {
@Override
// version 是每一个触发器增长的单调递增的 id
public boolean open(long partitionId, long version) {
// open connection or init
//返回fasle,则process不会被调用
}
@Override
public void process(String value) {
// write string to connection
}
@Override
//当 open 被调用时(不管是返回true/false), close 也将被调用
public void close(Throwable errorOrNull) {
// close the connection
}
});
复制代码
添加StreamingQueryListener对象,有相关任务时将收到callback
SparkSession spark = ...
spark.streams().addListener(new StreamingQueryListener() {
@Override
public void onQueryStarted(QueryStartedEvent queryStarted) {
System.out.println("Query started: " + queryStarted.id());
}
@Override
public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
System.out.println("Query terminated: " + queryTerminated.id());
}
@Override
public void onQueryProgress(QueryProgressEvent queryProgress) {
System.out.println("Query made progress: " + queryProgress.progress());
}
});
复制代码