Apache Pulsar 在能源互联网领域的落地实践

关于 Apache Pulsar
Apache Pulsar 是 Apache 软件基金会顶级项目,是下一代云原生分布式消息流平台,集消息、存储、轻量化函数式计算为一体,采用计算与存储分离架构设计,支持多租户、持久化存储、多机房跨区域数据复制,具备强一致性、高吞吐、低延时及高可扩展性等流数据存储特性。
GitHub 地址:http://github.com/apache/pulsar/git

案例导读:本案例介绍了清华大学能源互联网创新研究院将 Apache Pulsar 落地能源互联网方向的实践。Pulsar 的云原生架构、Schema、Functions 等特性知足了相关业务需求,也减轻了他们开发和运维负担。github

阅读本文须要大约 8 分钟。算法

团队及业务简介

能源互联网是电力与能源工业发展的方向。随着信息、通讯和互联网技术的飞速发展,可获取的数据量正以爆炸式方式迅猛增加,传统的数据处理方法已难以应对这些海量且增加极快的信息资产,大数据理论正是在这样的状态下应运而生。大数据处理技术能帮助咱们透过海量数据快速分辨其运行状态及发展趋势,在纷繁的世界中独具洞察力。数据库

清华大学能源互联网创新研究院能源大数据与开放生态研究中心聚集了国内外能源及电力大数据领域的多位专家,致力于推进大数据基础理论和实践应用的全面创新。能源大数据与开放生态研究中心将大数据技术应用于能源互联网、智能电网和智慧用能等工程场景,结合高性能优化、并行计算和人工智能等先进技术,研发适用于能源电力行业特色的大数据 / 云计算平台,和基于数据驱动的能源电力系统的高级应用,从而实现大数据产业的发展,造成以数据为核心的新型产业链,推进我国能源产业的转型与升级。apache

挑战

咱们团队的业务主要是与电力相关的物联网场景,旨在实现用户对传感器等设备数据的需求开发。咱们团队规模较小,但任务繁杂,但愿能更快更稳地实现客户的需求。后端

在整理业务需求后,咱们提出之后端即服务(BaaS)为主、基于消息的服务方案。在物联网领域内,基于这样的解决方案,咱们能够共用更多基础设施服务,同时能够快速应对不一样需求进行业务开发。考虑到特殊的业务需求,咱们的平台须要具有如下特性:安全

  • 多租户:平台要实现业务分离,服务不分离,又能够确保安全审核,知足客户对数据安全性的敏感需求,就必须支持多租户。此外,还能够在通信、数据、业务这三方面提供一些基础服务,好比自定义数据结构的 Schema Registry,自定义数据归属的 ACL 权限管理(增长删改的 API 接口),以及实现各类业务的自定义函数引擎。
  • Schema Registry:知足不一样需求和应用场景下设备多变的数据结构,提供容许自定义数据结构的 Schema Registry。
  • 通用 API:提供包含增长删改的 HTTP RESTful APIs 和相应的 WebSocket 接口,确保在通信上提供基础服务,并基于这一基础服务进行扩展。
  • ACL 权限管理:可自定义数据的 ACL 权限控制服务,保障数据安全。
  • 时序数据库:多数状况下,物联网场景都在和时序数据打交道,因此咱们选择了基于 PostgreSQL 的开源 TimeScaleDB,而且依托 TimeScaleDB 作了一系列时序数据的聚合查询接口。
  • 用户自定义 functions:实现各类业务的自定义函数引擎。

以前咱们使用基于 RabbitMQ 和 Celery 的方案来实现用户自定义 functions 的函数引擎。这一方案的最初使用效果良好,但随着业务的增加,问题愈来愈多。咱们的小团队不得不花更多时间来解决问题和优化总体方案。当 Celery 做为任务队列时,这些问题尤其严重。性能优化

咱们花费大量的时间和精力处理的问题主要有两个:数据结构

  • 须要仔细配置 Celery 的 worker 和 task,避免执行时间长的任务阻塞其余任务;
  • Worker 更新时须要中断服务,更新时间也相对较长。

此外,在特殊场景中,若是单个消息比较大且消息处理时间长时,Celery 和 RabbitMQ 的内存负担都比较大。架构

随着客户数量和项目数量的增长,这些问题变得日益突出,咱们决定找一个新产品替代原有方案。

为何选择 Apache Pulsar?

如上所述,咱们但愿消息中间件能够提供如下特性:

  • 多租户
  • 可靠性和高可用
  • 支持多协议,尤为能够很方便地转换协议:在物联网领域,咱们须要应对不一样的通讯协议,把不一样通讯协议的数据所有导入到消息中间件中。
  • 支持多语言:咱们团队主要使用 Go 语言,但咱们会和不少使用其余语言的团队合做,因此消息中间件最好能够支持其余语言。
  • 做为轻量级计算引擎实现简单的消息处理。

在调研不一样的消息中间件时,咱们很快发现了 Pulsar。经过 Pulsar 的文档和发布日志,咱们了解到 Pulsar 有不少优秀的特性,因此决定对 Pulsar 进行测试和评估。通过深刻研究、学习,咱们发现 Pulsar 的云原生架构、Schema、Functions 等很是适合咱们的业务需求。

  • 云原生:Pulsar 支持云原生,拥有诸多优秀的特性,如计算与存储分离,能够很好地利用云的弹性伸缩能力,保证扩容和容错。此外,Pulsar 对 Kubernetes 的良好支持也在必定程度上帮助咱们将一部分业务轻松迁移到了 Kubernetes 上。
  • Pulsar Functions:Pulsar Functions 是一个优秀的轻量级计算引擎,能够很好地取代 Celery 方案。咱们能够更多地尝试使用 Pulsar Functions 来处理业务,这是咱们选择 Pulsar 的主要缘由。
  • 分层存储:这一特性可以节约存储成本。咱们的使用场景会产生不少传感器的原始数据,须要做为冷数据存储。借助分层存储,咱们能够直接将这些冷数据存储在价格更低的存储服务中,也无需开发额外的服务来存储数据。
  • MQTT/MoP:Pulsar 对各类协议的兼容展现了社区的开放性。在 MoP 发布前,咱们开发了 MQTT 协议的转发工具,把 MQTT 协议上的数据转发到 Pulsar 中。
  • Pulsar Schema:咱们的平台经过 JSON 来描述数据 schema,经过对接 Pulsar Schema 和咱们本身的 Schema Registry,能够实现消息序列化的工做。目前 Pulsar 在 Go Schema 的功能仍处于起步阶段,咱们也会尝试作一些实践与贡献。
  • 多语言:咱们很看重多语言支持,尤为是 Go 语言。Pulsar 有 Go 语言相应的客户端、Go function runtime、基于 Go 语言实现的 Pulsarctl 等。咱们也但愿 Pulsar 将来能够支持更多语言,由于咱们不能预见客户的需求,支持多语言可以帮助咱们更轻松地解决问题。
  • Pulsar Manager & Dashboard:Pulsar 在各个层级都启动了接口来获取 Metrics。Pulsar 的其余工具(如 Prometheus、Grafana、Pulsar Manager)可以帮助咱们减轻运维、优化、排错的投入。
  • 开源:Pulsar 社区开放、活跃、友好。有 StreamNative 这样的公司作支撑,用户能够放心地选择 Pulsar,把业务迁移到 Pulsar 上。

深刻了解 Pulsar 后,咱们决定对 Pulsar 进行测试,并尝试迁移一个生产环境的应用。

迁移试验:楼宇智慧用电

楼宇智慧用电是咱们在用电分析和预测领域作的一次尝试,咱们但愿采集到办公室中每个用电点的用电信息。在研究院新办公楼装修初期,咱们进行了技术评估,将使用 zigbee 协议的智能插座列入了装修方案。整个部署包含三层楼,约 700 个智能插座和 50 个 zigbee 网关。插座部署在办公场所的全部用电点,包含工位插座、墙壁插座以及中央空调风机插座。全部数据经过智能插座厂商提供的局域网广播方案,将广播数据转发到 Pulsar 中实现数据点的采集和预处理。目前用电量数据每 10 秒钟上送一次,其余与用户相关的操做(包括开关插座、插拔用电设备)则实时上送。针对这些数据,咱们作了一些数据可视化的尝试,并把数据贡献给研究院的其余团队进行分析,或用做开发算法的参考信息和原始数据。

基于智能插座设备厂商提供的 MQTT 方案,咱们尝试将 MQTT 协议的数据都转发到 Pulsar 中。在转发过程当中,咱们遇到的主要问题是 MQTT topic 和 Pulsar topic 的映射。咱们的解决方案是直接把全部的 MQTT 数据转发到同一个 Pulsar topic 中,同时把部分元数据包装在转发的消息中,再经过 Pulsar Functions 作消息路由,把消息转发到不一样的业务 topic 中。下图展现了如何将传感器产生的数据传送至平台并最终入库。

在从 MQTT 转发数据到 Pulsar 的过程当中,咱们默认把全部设备的数据都转发到同一个 topic 中,并经过 verificate function 进行验证(包括解密和内容检查),保障数据的合法性。合法的数据会被转发到一个中间 topic 等待消息路由分发,消息分发的 function 会从数据中解析出设备类型和消息类型,再转发到对应业务 topic 中,等待被对应业务 topic 绑定的 ETL function 作处理。在使用 ETL function 处理时,咱们也会根据设备类型提取不一样的数据,对网关设备提取网关状态、设备信息,对插座提取用电数据和插座的状态信息。这些信息会匹配咱们平台的 Schema Registry 数据结构,咱们再把生成的数据作 Schema Mapping(经过 Functions 实现),最后统一转发这些结构化的数据到 sink topic 中,由 sink function 写入到数据库。

楼宇智慧用电的迁移测试有力验证了 Pulsar 符合咱们的需求。在迁移过程当中,咱们查阅了 Pulsar 文档,从社区得到了大力支持和帮助,迁移过程高效、顺利。借助 Functions 的开放与便利,咱们很快完成了流程图中全部 function 的开发和调试,上线了整个业务系统。

在业务迁移过程当中,Pulsar 运行状态良好,团队一致认为 Pulsar 能够帮助咱们减轻开发和运维负担,因此咱们选择 Pulsar 做为研究中心惟一的消息中间件服务,咱们的小团队也开始跟随 Pulsar 一块儿进行一系列云原生迁移和优化工做。

决定方案后,咱们将 Apache Pulsar 进一步应用到电网智能传感和智能变电所的场景,这些场景都与物联网、能源和电力相关。下文将详细介绍咱们如何使用 Pulsar 和 Pulsar Functions,以及如何经过 Pulsar Functions 简化传感器数据流的相关处理。

Pulsar x 电网智能传感

电网智能传感场景主要基于清华大学能源互联网创新研究院与电网公司合做的输电线路智能多参数传感器集成研究项目。该项目的传感器来自不一样的厂家,分布在输电线路的各个位置,传感器类型所以也不尽相同,包括杆塔、杆塔上、输电线路侧等十多种。整个系统目前接入总长度约六百千米,包含六百多个杆塔的输电线路传感器。这一场景主要负责对各类传感器的数据进行在线监测和告警,同时,咱们也单独针对电压传感器作了暂态电压分析。

这个应用场景有两个难点:一是来自不一样厂商的传感器没有统一的通讯协议,有的使用电力相关的 IEC104 规约,有的使用 protobuf 或其余厂商自定义协议;二是项目数据量比较大,有些传感器可能会单次产生 20 MB 甚至更大的消息,有些传感器则每秒上传一次数据。

借助 Pulsar,咱们选择在 producer 端不作任何数据处理,直接将数据转发到 Pulsar 中,再经过 Pulsar Functions 作进一步的数据预处理和其余业务操做。以电压传感器为例,电压传感器会产生三类数据,分别是心跳数据、稳态波形数据和暂态波形数据。其中心跳数据和稳态波形数据经过 protobuf 协议传输,暂态数据则经过 zip 压缩文件的形式传输。接收到 protobuf 的数据后,借助 Pulsar Functions 进行一系列的数据处理,包括经过解密 function 完成数据解密和 protobuf 的反序列化,再对数据进行路由,经过对应的 ETL function 作数据处理和解析,最后经过 Schema Mapping 将数据入库。咱们把这个流程的每一步都封装成独立的 Pulsar function,这样作出于三点考虑:

  • 咱们但愿监控到整个数据流过程当中每个环节的状态,采集每一个过程的 metrics,而且观测一些重点指标,好比是否存在 backlog 积压。状态监测方便咱们调整每一个环节 function 的并行数量。
  • 使整个数据流更加灵活,便于咱们在不一样流程中新增和删除 function。
  • 更大程度地保障了咱们能够重用本身维护的 function。

这个方案也遇到了一些小困难,好比因为 function 比较多,咱们须要花更多时间部署、维护每个过程的中间 topic。目前,咱们的解决方案是直接写对应的代码一次性完成部署和维护。虽然须要投入更多精力,但咱们认为这种 function 的开发和部署模式是值得的。上文提到电压传感器除了会产生 protobuf 的两种数据外,还会产生一种暂态数据。暂态数据通常在电网发生故障或异常时产生,相似电力系统的快照,记录故障发生前到发生时,再到发生后的波形状态。在电力系统中,暂态数据一般有标准的存储方案和特定的解析接口。相对于传感器产生的其余数据来讲,这类数据的特色是比较大,动辄几十兆。咱们应对暂态数据的方案是先解压缩这些数据,再分析数据文件。这里咱们借助了 Pulsar Functions 多语言支持的特性,流程图中的蓝色部分使用 Go function 实现,黄色部分使用 Python 实现,Python 有一个解析电网暂态数据的库,能够调用,就免去了咱们本身花时间实现一套 Go 版本解析接口的工做。

Pulsar x 智能变电所

智能变电所是咱们在变电系统中变电环节的一些尝试,这个项目基于咱们合做的智能输变电设备厂商,但愿基于开关柜等变电所设备实现变电所的数据接入。这个项目的主要目标是实现实时监测、故障诊断和异常监测这三大功能。

在智能变电所的场景中,一般由设备生产厂商提供设备的故障诊断算法或诊断应用,咱们须要将不一样性质的算法或应用集成到现有方案中。客户提供的算法可能直接在 Pulsar Functions 中调用,也多是已经编译好的可执行文件,甚至多是其余语言的实现,好比 R 语言。针对这一系列问题,咱们先把客户提供的实现封装在 Docker 容器中,在容器中实现一个最小的 Pulsar function runtime,再经过 Docker proxy function 和 Docker endpoint 沟通,在触发 function 时建立对应算法的容器实现计算,最后将结果回传到 Pulsar 对应的 topic 中。

另外,在这一场景中咱们也遇到了一些应用层面的需求,好比消息推送。咱们借助 Pulsar Functions 实现了一些业务功能,在 Functions 中能够很方便地调用不一样服务商的接口,实现消息推送,好比短信、邮件、应用程序的推送服务。此外,经过 Pulsar Functions,咱们得以把消息推送的业务需求从平台中解藕出来,把服务作成 function,便于后续在有一样需求的场景中直接使用。

使用 Pulsar 遇到的问题及解决方案

咱们在使用 Pulsar 的过程当中遇到了一些问题,下文会分享解决这些问题的一些经验,但愿能够对准备或者已经在使用 Pulsar 的同窗提供一些帮助。

第一个是关于 Pulsar 默认消息大小的问题。在默认配置下,Pulsar 支持的最大消息是 5 MB,在上文提到的智慧电网案例中,单条消息有时会超过 20 MB。咱们根据文档修改了 broker 配置文件中的 MaxMessageSize 参数,但修改的配置并无生效,超过 5 MB 的消息依然不能正常传递到 Pulsar 中。因而咱们在 Pulsar 社区寻求帮助,获得了社区的迅速回应。这个问题的主要缘由是 Pulsar 2.4.0 中 MaxMessageSize 没有同步到 BookKeeper,因此即便 broker 能够接收更大的消息,broker 仍然不能把消息传递到负责存储的 BookKeeper 中。所以除了修改 MaxMessageSize 值外,还须要修改 broker 和 BookKeeper 中 nettyFrameSizeBytes 相关配置,这些配置保持一致,Pulsar 就能够处理更大的单条消息。

第二个问题是咱们在使用 Pulsar Functions 处理数据时,topic 中可能会出现 backlog 积压愈来愈多的状况。Backlog 包括没有发送给 Functions(consumer)的数据,也包括已发送但未被 Functions(consumer)ack 的数据。根据咱们的经验,在 Functions 场景下,消息积压多是由于 function 处理单条消息的速度慢,处理时间长,或者 function 崩溃。若是是由于 function 处理消息慢,一种解决方案是增长 function 的并行数量,再具体分析执行速度慢的缘由并进行优化;另外一种方案是把复杂的 function 分红多个简单的 function,也就是在智能电网场景中提到的把一个复杂的 function 拆成多个 function,经过 function 的链式模式把整个流程连接起来。这样咱们能够很方便地观测每个 function 的状态,也能够针对某个 function 作进一步的优化。若是因为 function 崩溃形成 backlog 积压,则须要保障 function 的稳定性,并借助 function 的 log topic 进行调试。

第三个问题是当 producer 数量增长时,很难统一管理和观测每一个 producer 的状态,即 producer 与 broker 之间的通讯状态和 producer 与数据源之间的通讯状态。针对这个问题,咱们目前的解决方案是给 producer 增长心跳消息到对应的心跳 topic 作总体监控,同时,监控 producer 和 broker 的状态链接。经过这些改动,咱们能够较好地聚合观测 producer 的运行状态。咱们注意到 GitHub 上也在讨论相似问题,期待和社区一块儿提出更优秀的解决方案。

期待

咱们期待 Pulsar 能改善或增长如下功能。

  • Pulsar Functions Mesh 实现了对 function 进行相似于 Kubernetes 的服务编排,咱们期待该功能的发布。上文提到咱们实现了链式 function 的解决方案,但这种方式在维护上遇到很大挑战,但愿 Functions mesh 能够解决这个问题。
  • 但愿 Pulsar functions 支持更多语言的 runtime。咱们用 function 作 Docker proxy function,这个方案虽然可行,但但愿有更优秀的解决方案。
  • IoT 场景很注重边缘计算,咱们但愿 Pulsar 能够在边缘计算上作一些尝试。咱们关注到 Pulsar 容许将 Functions 的消息推送到另外一个 Pulsar 集群中,容许 Functions 与外部 Pulsar 集群通信。经过这一改动,能够尝试将 Pulsar 部署到边缘设备上,并使用 Pulsar Functions 在这些设备上进行计算。部署 Pulsar 对内存的需求较大,在一些运算能力较弱的边缘设备上部署 Pulsar 比较困难,但愿 Pulsar 能在后续版本中优化或提供其余方案解决这一困扰。

结语

做为一个开源项目,Pulsar 正在快速发展,文档更新迅速,社区响应及时,社区规模不断壮大。咱们但愿深刻了解 Pulsar,参与 Pulsar 开发贡献,和社区分享咱们的实践经验,与 Pulsar 社区共同发展。

在使用 Pulsar 的过程当中,咱们遇到一些困惑,感谢 StreamNative 团队小伙伴们的大力支持,帮助咱们顺利将 Pulsar 应用到上述业务场景中。将来,咱们会积极尝试 Pulsar 的各类新功能,并将 Pulsar 应用于更多的能源互联网场景中。

做者简介

胡军,清华大学电机系副教授,清华大学能源互联网创新研究院能源大数据与开放生态研究中心执行主任,IEEE Member,CIGRE Member。

相关阅读

相关文章
相关标签/搜索