做者:Genmao Yu
原文连接:https://databricks.com/blog/2020/07/29/a-look-at-the-new-structured-streaming-ui-in-apache-spark-3-0.htmlhtml
编译:邵嘉阳,计算机科学与技术大三在读,Apache Spark 中文社区志愿者java
在Apache Spark 2.0中,咱们迎来了Structured Streaming——构建分布式流处理应用的最佳平台。统一的API(SQL,Dataset和DataFrame)以及Spark内置的大量函数为开发者实现复杂的需求提供了便利,好比流的聚合,流-流链接和窗口支持。开发者们广泛喜欢经过Spark Streaming中的DStream的方式来管理他们的流,那么相似的功能何时能在Structured Streaming中获得实现呢?这不,在Apache Spark 3.0中,全新的Structured Streaming可视化UI和开发者们见面了。web
新的Structured Streaming UI会提供一些有用的信息和统计数据,以此来监视全部流做业,便于在开发调试过程当中排除故障。同时,开发者还可以得到实时的监测数据,这能使生产流程更直观。在这个新的UI中,咱们会看到两组统计数据:1)流查询做业的聚合信息;2)流查询的具体统计信息,包括输入速率(Input Rate)、处理速率(Process Rate)、输入行数(Input Rows)、批处理持续时间(Batch Duration)和操做持续时间(Operation Duration)等。apache
流查询做业的聚合信息
开发者提交的流SQL查询会被列在Structured Streaming一栏中,包括正在运行的流查询(active)和已完成的流查询(completed)。结果表则会显示流查询的一些基本信息,包括查询名称、状态、ID、运行ID、提交时间、查询持续时间、最后一批的ID以及一些聚合信息,如平均输入速率和平均处理速率。流查询有三种状态:运行(RUNNING)、结束(FINISHED)、失败(FAILED)。全部结束(FINISHED)和失败(FAILED)的查询都在已完成的流式查询表中列出。Error列显示有关失败查询的详细信息。bootstrap
咱们能够经过单击Run ID连接查看流查询的详细信息。服务器
详细的统计信息
Statistics页面显示了包括输入速率、处理速率、延迟和详细的操做持续时间在内的一系列指标。经过图表,开发者能全面了解已提交的流查询的状态,而且轻松地调试查询处理中的异常状况。微信
它包含如下指标:app
Input Rate:数据到达的聚合速率(跨全部源)。dom
Process Rate:Spark处理数据的聚合速率(跨全部源)。分布式
Batch Duration:每一批的处理时间。
Operation Duration:执行各类操做所花费的时间(以毫秒为单位)。
被追踪的操做罗列以下:addBatch:从源读取微批的输入数据、对其进行处理并将批的输出写入接收器所花费的时间。这应该会占用微批处理的大部分时间。
getBatch:准备逻辑查询以从源读取当前微批的输入所花费的时间。
getOffset:查询源是否有新的输入数据所花费的时间。
walCommit:将偏移量写入元数据日志。
queryPlanning:生成执行计划。
须要注意的是,因为数据源的类型不一样,一个查询可能不会包含以上列出的全部操做。
使用UI解决流的性能故障
在这一部分中,咱们会看到新的UI是怎样实时、直观地显示查询执行过程当中的异常状况的。咱们会在每一个例子中预先假设一些条件,样例查询看起来是这样的:
import java.util.UUID
val bootstrapServers = ...
val topics = ...
val checkpointLocation = "/tmp/temporary-" + UUID.randomUUID.toString
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation", checkpointLocation)
.start()
因为处理能力不足而增长延迟
在第一种状况下,咱们但愿尽快处理Apache Kafka数据。在每一批中,流做业将处理Kafka中全部可用的数据。若是处理能力不足以处理批数据,那么延迟将迅速增长。最直观的现象是Input Rows和Batch Duration会呈线性上升。Process Rate提示流做业每秒最多只能处理大约8000条记录,可是当前的输入速率是每秒大约20000条记录。产生问题的缘由一目了然,那么咱们能够为流做业提供更多的执行资源,或者添加足够的分区来处理与生产者匹配所需的全部消费者。
稳定但高延迟
第二种状况下,延迟并无持续增长,而是保持稳定,以下截图所示:
咱们发如今相同的Input Rate下,Process Rate能够保持稳定。这意味着做业的处理能力足以处理输入数据。然而,每批的延迟仍然高达20秒。这里,高延迟的主要缘由是每一个批中有太多数据,那么咱们能够经过增长这个做业的并行度来减小延迟。在为Spark任务添加了10个Kafka分区和10个内核以后,咱们发现延迟大约为5秒——比20秒要好得多。
使用操做持续时间图进行故障排除
操做持续时间图(Operation Duration Chart)显示了执行各类操做所花费的时间(以毫秒为单位)。这对于了解每一个批处理的时间分布和故障排除很是有用。让咱们以Apache Spark社区中的性能改进“Spark-30915:在查找最新批处理ID时避免读取元数据日志文件“为例。
在某次查询中咱们发现,当压缩后的元数据日志很大时,下一批要花费比其余批更多的时间来处理。
在进行代码审查以后,咱们发现这是由对压缩日志文件的没必要要读取形成的并进行了修复。新的操做持续时间图确认了咱们想法:
将来的开发方向
如上所示,新的Structured Streaming UI将经过提供更有用的流查询信息帮助开发者更好地监视他们的流做业。做为早期发布版本,新的UI仍在开发中,并将在将来的发布中获得改进。有几个将来能够实现的功能,包括但不限于:
更多的流查询执行细节:延迟数据,水印,状态数据指标等等。
在Spark历史服务器中支持Structured Streaming UI。
对于不寻常的状况有更明显的提示:发生延迟等。
近期活动:
8月24日开始 Spark 实战训练营正式开课,戳文末 阅读原文 便可报名
免费报名连接:https://developer.aliyun.com/learning/trainingcamp/spark/2
阿里巴巴开源大数据技术团队成立Apache Spark中国技术社区,按期推送精彩案例,技术专家直播,问答区近万人Spark技术同窗在线提问答疑,只为营造纯粹的Spark氛围,欢迎钉钉扫码加入!
对开源大数据和感兴趣的同窗能够加小编微信(下图二维码,备注“进群”)进入技术交流微信群。
Apache Spark技术交流社区公众号,微信扫一扫关注
本文分享自微信公众号 - Apache Spark技术交流社区(E-MapReduce_Spark)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。