Structured Streaming编程向导

简介

  Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.html

  Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees.java

  However, since Spark 2.3, we have introduced a new low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Without changing the Dataset/DataFrame operations in your queries, you will be able to choose the mode based on your application requirements.python

  In this guide, we are going to walk you through the programming model and the APIs. We are going to explain the concepts mostly using the default micro-batch processing model, and then later discuss Continuous Processing model. First, let’s start with a simple example of a Structured Streaming query - a streaming word count.git

  Structured Streaming是一个可缩放、容错的流逝处理引擎,基于Spark SQL引擎构建。当你在处理流计算时,能够像处理静态数据批计算同样。Spark SQL引擎负责不断地连续运行它,并随着流数据持续到达而更新最终结果。你能够在Scala、Java、Python或者R中使用Dataset/DataFrame API来表示流集合(aggregations)、事件时间窗口(event-time windows)、流到批链接(stream-to-batch joins)等。计算在同一个优化的Spark SQL引擎上被执行。最终,该系统经过检查点(checkpoint)和预先写日志(Write Ahead Logs)来确保端到端一次性执行的容错保证(ensures end-to-end exactly-once guarantees)。简而言之,Structured Streaming提供了快速、可伸缩、容错、端到端一次性流处理,而用户无需对流进行推理。github

  在内部,默认状况下,Structured Streaming(结构化流)查询使用微批处理引擎(a micro-batch procession engine),该微批处理引擎将数据流处理为一系列小批做业,从而实现低至100毫秒的端到端延迟,而且具备一次性执行容错保证(and exactly-once fault-tolerance guarantees)。sql

  然而,从Spark2.3,咱们引入了一种低延迟处理模式,称为连续处理,它能够实现端到端延迟低至1毫秒,并提供至少一次性能保证。在查询中不须要修改Dataset/DataFrame操做的状况下,你将可以基于你的系统需求选择这种模式。shell

  在该向导中,咱们将向你介绍编程模式和API。咱们会解释大多使用默认的微批处理的概念,而后讨论连续处理模型。首先,让咱们以一个使用Structured Streaming查询(一个流式的单词计数)的简单例子开始。express

快捷例子(Quick Example)

  Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in Scala/Java/Python/R. And if you download Spark, you can directly run the example. In any case, let’s walk through the example step-by-step and understand how it works. First, we have to import the necessary classes and create a local SparkSession, the starting point of all functionalities related to Spark.apache

  让咱们说你想维持一个运行的单词个数,从数据服务器监听TCP套接字接收的文本数据。让咱们看看在Structured Streaming中如何表达。你能够在Scala/Java/Python/R中查看所有代码。同时若是你下载download Spark,你能够直接运行这个例子(run the example.)。不管如何,咱们按部就班地了解这个例子,了解它如何工做的。首先,咱们导入必要的类并建立本地SparkSession对象,这是全部与Spark相关的功能的切点。编程

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();

Next, let’s create a streaming DataFrame that represents text data received from a server listening on localhost:9999, and transform the DataFrame to calculate word counts.

下一步,让咱们建立一个流式的DataFrame,它用来表示从服务器localhost:9999监听接收到的文本数据,并转换DataFrame来计算单词计数。

// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Split the lines into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();

This lines DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named “value”, and each line in the streaming text data becomes a row in the table. Note, that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have converted the DataFrame to a Dataset of String using .as(Encoders.STRING()), so that we can apply the flatMap operation to split each line into multiple words. The resultant words Dataset contains all the words. Finally, we have defined the wordCounts DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.

We have now set up the query on the streaming data. All that is left is to actually start receiving data and computing the counts. To do this, we set it up to print the complete set of counts (specified by outputMode("complete")) to the console every time they are updated. And then start the streaming computation using start().

DataFrame类型的lines变量表明一个包含流式文本数据的无界表。该表包含列名为“value”的字符串,并在流文本数据中的一行成为了表中的一行记录。请注意,到目前还未接收到任何数据,由于咱们才刚创建的转换还未开始。接下来,咱们使用.as(Encoders.STRING())把lines类型从DataFrame转变为Dataset,为了使用flatMap操做来分割每一行记录中的多个单词。这个合成的words数据集包含了全部单词。最后,咱们定义了一个类型为DataFrame的wordCounts变量,用来在Dataset中按照惟一键值进行分组和计数。请注意这是一个流式的DataFrame,它表示了流式运行的单词计数。

让咱们如今已经设置了查询这个流式数据。剩下的事情是真正开始接收数据、计数。为此,咱们设置了在每次数据更新时打印counts所有集合(指定的输出模式(“complete”))到控制台。而后,调用start()来开始数据流计算。

// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();

After this code is executed, the streaming computation will have started in the background. The query object is a handle to that active streaming query, and we have decided to wait for the termination of the query using awaitTermination() to prevent the process from exiting while the query is active。

To actually execute this example code, you can either compile the code in your own Spark application, or simply run the example once you have downloaded Spark. We are showing the latter. You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using

在代码执行后,流运算将会在后台开始。这query对象是活动流查询句柄,而且咱们决定使用awaitTermination()终止查询,以防止进程在查询处于活动状态时退出。

要实际执行这段代码,你能够在你的spark应用程序中编译,或者在下载spark代码以后简单的运行该示例。咱们在展现后者。首先你须要运行Netcat(在大多数UNIX类系统中发现的小型实用程序)做为数据服务器。

$ nc -lk 9999

Then, in a different terminal, you can start the example by using

而后在另一个终端上,你能够运行这个示例。

$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999

Then, any lines typed in the terminal running the netcat server will be counted and printed on screen every second. It will look something like the following.

而后,运行在netcat服务器的终端中输入的每一行被计数并每秒打印到屏幕上,它看起来像下边这样:

实际运行中,netcat终端运行状况以下:

在spark job提交的终端上显示以下:

编程模型(Programing Model)

The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. This leads to a new stream processing model that is very similar to a batch processing model. You will express your streaming computation as standard batch-like query as on a static table, and Spark runs it as an incremental query on the unbounded input table. Let’s understand this model in more detail.

Structured Streaming的核心思想是把在线数据流视为连续追加的表。这就致使了一个新的流处理模型,它很是相似批处理模式。你能够把你的流计算像标准的批处理查询表示为一个静态表,而spark将它视为无界表上的一个增量查询来运行。让咱们更详细的了解这种模型。

基本概念(Basic Concepts)

Consider the input data stream as the “Input Table”. Every data item that is arriving on the stream is like a new row being appended to the Input Table.

把输入数据流当作“输入表”。在流上到达的每条数据记录被当作“输入表”新的一行记录追加进来。

 A query on the input will generate the “Result Table”. Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.

对于输入的查询将生成“结果表(Result Table)”。每次触发间隔(好比说,每1秒),新的记录将会追加到输入表(Input Table),最终被更新到结果表(Result Table)。每当结果表被修改时,咱们但愿将更改后的结果行写入外部接收器。

 

The “Output” is defined as what gets written out to the external storage. The output can be defined in a different mode:

  • Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.

  • Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.

  • Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.

Note that each mode is applicable on certain types of queries. This is discussed in detail later.

"Output"是用来定义写入外部存储器的内容。输出能够被定义为不一样模型:

“Complete 模型”----整个更新后的结果表将会被写入到外部存储器。取决于存储链接器来决定如何处理整个表的写入。

“Append 模型”   ----只有最后一个触发器中附加的新行将被写入外部存储。这仅仅适用于预期结果表中现有行不发生更改的查询。

“Update 模型”    ----只有最后一个触发器中在结果表中更新的行被写入外部存储(从spark2.1.1才可使用)。请注意,这与Complete模式不一样,由于该模式只输出自上次触发器以来已经改变的行。若是查询不包含聚合,那么等同于Append模式。

注意,每种模型都适用于某些类型的查询。这将在后面详细讨论。

To illustrate the use of this model, let’s understand the model in context of the Quick Example above. The first lines DataFrame is the input table, and the final wordCounts DataFrame is the result table. Note that the query on streaming lines DataFrame to generate wordCounts is exactly the same as it would be a static DataFrame. However, when this query is started, Spark will continuously check for new data from the socket connection. If there is new data, Spark will run an “incremental” query that combines the previous running counts with the new data to compute updated counts, as shown below.

为了说明模型的使用,让咱们在上边的快捷示例上下文中理解模型。第一行的lines DataFrame是输入表,最后一行的wordcounts DataFrame是结果表。请注意:流式查询lines DataFrame生成wordCounts与静态DataFrame彻底同样的。可是,当流查询开始后,Spark将会持续检查从socket链接中而来的新数据。若是有新数据从socket链接中进来,Spark将会执行一个“增量”查询,将先前运行的计数与这些新数据进行结合而后计算更新计数,以下所示:

Note that Structured Streaming does not materialize the entire table. It reads the latest available data from the streaming data source, processes it incrementally to update the result, and then discards the source data. It only keeps around the minimal intermediate state data as required to update the result (e.g. intermediate counts in the earlier example).

This model is significantly different from many other stream processing engines. Many streaming systems require the user to maintain running aggregations themselves, thus having to reason about fault-tolerance, and data consistency (at-least-once, or at-most-once, or exactly-once). In this model, Spark is responsible for updating the Result Table when there is new data, thus relieving the users from reasoning about it. As an example, let’s see how this model handles event-time based processing and late arriving data.

请注意,Structured Streaming不会实现整个表。它从流数据源读取最新可用的数据,增量地处理它以更新结果,而后丢弃源数据。它只保留最小中间状态数据以更新结果(例如,较早的示例的中间计数)。

该模型与其余流处理引擎有很大的不一样。不少流系统要求用户要本身去维护运行的聚合,所以必须关注容错性,数据一致性(至少一次,或至少屡次,或准确地一次)。在这种模型下,Spark的职责就是当有新的数据的状况下更新结果表,从而减小用户对其的推理。举个例子,当咱们看看这种模型是如何处理基于事件的处理和迟到达的数据的。

处理事件的时间和延时数据(Handling Event-time and Late Data)

Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time. For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them. This event-time is very naturally expressed in this model – each event from the devices is a row in the table, and event-time is a column value in the row. This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column – each time window is a group and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device events logs) as well as on a data stream, making the life of the user much easier.

Furthermore, this model naturally handles data that has arrived later than expected based on its event-time. Since Spark is updating the Result Table, it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate state data. Since Spark 2.1, we have support for watermarking which allows the user to specify the threshold of late data, and allows the engine to accordingly clean up old state. These are explained later in more detail in the Window Operations section.

事件时间是嵌入在数据自身的时间。对于不少应用程序,你均可能但愿基于事件时间的操做(运行)。好比,你但愿获取到IoT设备每分钟产生的事件次数,你可能想使用数据产生的时间(也就是,数据中的事件时间),而不是Spark接收到数据的时间。这个事件时间在这种模型中很天然地表达出来------从设备来的每一个事件都是表(流的无界表)中的一行,并且事件时间是行中的列值。这容许基于窗口的聚合(好比,每分钟事件次数)只是事件时间列上的一种特殊类型的分组和聚合------每一个时间窗口是一组,每一行可能属于多个窗口/分组。这样的基于事件时间窗口统计查询可使用在静态数据集(好比,从收集设备的事件日志)以及数据流上一致地定义,使得用户的生活更容易。

此外,这种模型也很天然地处理基于其事件时间而到达的比预期时间晚的数据。因为Spark是正在更新结果表,因此当存在延时数据时,它彻底控制更新旧的聚合,以及清理旧的聚合以限制中间状态数据的大小。自Spark2.1开始,咱们支持水印(watermarking),容许用户指定延时数据的阈值,并容许引擎相应地清理旧状态。稍后将在窗口操做部分对此进行更详细的说明。

相关文章
相关标签/搜索