时序数据库在工业物联网中的应用

DolphinDB在工业物联网的应用

  1. 工业物联网的数据特色和痛点

工业物联网的数据采集有着频率高、设备多、维度高的特色,数据量很是大,对系统的吞吐量有很高的要求。同时工业物联网每每须要系统可以实时处理数据,对系统预警,监控,甚至反控。很多系统还须要提供图形化终端供操做工人实时监控设备的运行,这给整个系统带来了更大的压力。对于采集到的海量历史数据,一般还须要进行离线的建模和分析。所以,工业物联网的数据平台有着很是苛刻的要求,既要有很是高的吞吐量,又要有较低的延时;既要可以实时处理流数据,又要可以处理海量的历史数据;既要知足简单的点查询的要求,又要知足批量数据复杂分析的要求。前端

传统的事务型数据库,好比SQL Server、Oracle和MySQL,没法知足高吞吐量的数据写入和海量数据的分析。即便数据量较小,能知足数据写入的要求,也不能同时响应实时计算的请求。node

Hadoop生态提供了消息引擎、实时数据写入、流数据计算、离线数据仓库、离线数据计算等多个部件。这些大数据系统组合起来,能够解决工业物联网的数据平台问题。但这样的方案过于庞大和臃肿,实施和运维的成本很高。git

2.时序数据库的工业物联网解决方案github

以DolphinDB为例,DolphinDB database 做为一个高性能的分布式时序数据库,为工业物联网的数据存储和计算提供了一个强大的基础平台。数据库

  • DolphinDB的分布式数据库能够方便的支持水平扩展和垂直扩展,系统的吞吐量和支持的数据量能够近乎无限的扩展。
  • DolphinDB的流计算引擎支持实时流计算处理。内置的聚合引擎能够按指定的时间窗口大小和频率来计算各类聚合指标。聚合既能够是时间轴上(从高频到低频)的纵向聚合,也能够是多个维度的横向聚合。
  • DolphinDB的内存数据库能够支持数据的快速写入,查询和计算。例如聚合引擎的结果能够输出到一个内存表,接受前端BI(如Grafana)的的秒级轮询指令。
  • DolphinDB集数据库、分布式计算和编程语言于一体,能够在库内快速的完成复杂的分布式计算,例如回归和分类。这大大加快了海量历史数据的离线分析和建模。
  • DolphinDB也实现了与部分开源或商业化的BI工具的接口。方便用户可视化或监控设备数据。

3.案例综述编程

企业的生产车间内总共有1000个传感设备,每一个设备每10ms采集一次数据,为简化demo脚本,假设采集的数据仅有三个维度,均为温度。须要完成的任务包括:服务器

  • 将采集到的原始数据存入数据库。离线的数据建模须要用到这些历史数据。
  • 实时计算每一个设备过去一分钟的平均温度指标,计算的频率为每两秒钟要进行一次。
  • 由于设备的操做工须要在最快的时间内掌握温度变化,因此前端展现界面每秒查询一次实时运算的结果并刷新温度变化趋势图。

4.案例实施app

4.1 系统的功能模块设计运维

针对上述的案例,咱们首先要启用DolphinDB的分布式数据库,建立一个命名为iotDemoDB的分布式数据库用于保存采集的实时数据。数据库按日期和设备两个维度进行数据分区。日期采用值分区,设备采用范围分区。往后清理过时数据,只要简单的删除旧的日期分区就可完成。编程语言

启用流数据发布和订阅功能。订阅高频数据流作实时计算。createStreamingAggregator函数能建立一个指标聚合引擎,用于实时计算。咱们在案例里指定计算窗口大小是1分钟,每2秒钟运算一次过往1分钟的温度均值,而后将运算结果保存到低频数据表中,供前端轮询。

部署前端Grafana平台展现运算结果的趋势图,设置每1秒钟轮询一次DolphinDB Server,并刷新展现界面。

4.2 服务器部署

在本次demo里,为了使用分布式数据库,咱们须要使用一个单机多节点集群,能够参考单机多节点集群部署指南。这里咱们配置了1个controller+1个agent+4个datanode的集群,下面列出主要的配置文件内容供参考:

cluster.nodes:

localSite,mode
localhost:8701:agent1,agent
localhost:8081:node1,datanode
localhost:8083:node2,datanode
localhost:8082:node3,datanode
localhost:8084:node4,datanode

因为DolphinDB系统默认是不启用Streaming模块功能的,因此咱们须要经过在cluster.cfg里作显式配置来启用它,由于本次demo里使用的数据量不大,为了不demo复杂化,因此这里只启用了node1来作数据订阅。

cluster.cfg:

maxMemSize=2
workerNum=4
persistenceDir=dbcache
maxSubConnections=4
node1.subPort=8085
maxPubConnections=4

实际生产环境下,建议使用多物理机集群,能够参考多物理机集群部署指南

4.3 实现步骤

首先咱们定义一个sensorTemp流数据表用于接收实时采集的温度数据,咱们使用enableTablePersistence函数对sensorTemp表作持久化,内存中保留的最大数据量是100万行。

share streamTable(1000000:0,hardwareIdtstemp1temp2`temp3,[INT,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE]) as sensorTemp
enableTablePersistence(sensorTemp, true, false, 1000000)

经过订阅流数据表sensorTmp,把采集的数据准实时的批量保存到分布式数据库中。分布式表使用日期和设备编号两个分区维度。在物联网大数据场景下,常常要清除过期的数据,这样分区的模式能够简单的经过删除指定日期分区就能够快速的清理过时数据。subscribeTable函数最后两个参数控制数据保存的频率,只有订阅数据达到100万或时间间隔达到10秒才批量将数据写入分布式数据库。

db1 = database("",VALUE,2018.08.14..2018.12.20)
db2 = database("",RANGE,0..10*100)
db = database("dfs://iotDemoDB",COMPO,[db1,db2])
dfsTable = db.createPartitionedTable(sensorTemp,"sensorTemp",tshardwareId)
subscribeTable(, "sensorTemp", "save_to_db", -1, append!{dfsTable}, true, 1000000, 10)

在对流数据作分布式保存数据库的同时,系统使用createStreamAggregator函数建立一个指标聚合引擎, 用于实时计算。函数第一个参数指定了窗口大小为60秒,第二个参数指定每2秒钟作一次求均值运算,第三个参数是运算的元代码,能够由用户本身指定计算函数,任何系统支持的或用户自定义的聚合函数这里都能支持,经过指定分组字段hardwareId,函数会将流数据按设备分红1000个队列进行均值运算,每一个设备都会按各自的窗口计算获得对应的平均温度。最后经过subscribeTable订阅流数据,在有新数据进来时触发实时计算,并将运算结果保存到一个新的数据流表sensorTempAvg中。

createStreamAggregator 参数说明:窗口时间,运算间隔时间,聚合运算元代码,原始数据输入表,运算结果输出表,时序字段,分组字段,触发GC记录数阈值。

share streamTable(1000000:0, timehardwareIdtempavg1tempavg2`tempavg3, [TIMESTAMP,INT,DOUBLE,DOUBLE,DOUBLE]) as sensorTempAvg
metrics = createStreamAggregator(60000,2000,<[avg(temp1),avg(temp2),avg(temp3)]>,sensorTemp,sensorTempAvg,ts,hardwareId,2000)
subscribeTable(, "sensorTemp", "metric_engine", -1, append!{metrics},true)

在DolphinDB Server端在对高频数据流作保存、分析的时候,Grafana前端程序每秒钟会轮询实时运算的结果,并刷新平均温度的趋势图。DolphinDB提供了Grafana_DolphinDB的datasource插件,关于Grafana的安装以及DolphinDB的插件配置请参考Grafana配置教程

在完成grafana的基本配置以后,新增一个Graph Panel, 在Metrics tab里输入:

select gmtime(time) as time, tempavg1, tempavg2, tempavg3 from sensorTempAvg where hardwareId = 1

这段脚本是选出1号设备实时运算获得的平均温度表。

8cb3647ad8787ac3bb57c127166fda36.png

最后,启动数据模拟生成程序,生成模拟温度数据并写入流数据表。

数据规模: 1000 个设备,以每一个点3个维度、10ms的频率生成数据,以每一个维度8个Byte ( Double类型 ) 计算,数据流速是 24Mbps,持续100秒。

def writeData(){
hardwareNumber = 1000
for (i in 0:10000) {
data = table(take(1..hardwareNumber,hardwareNumber) as hardwareId ,take(now(),hardwareNumber) as ts,rand(20..41,hardwareNumber) as temp1,rand(30..71,hardwareNumber) as temp2,rand(70..151,hardwareNumber) as temp3)
sensorTemp.append!(data)
sleep(10)
}
}
submitJob("simulateData", "simulate sensor data", writeData)

点击这里下载完整的demo脚本。

相关文章
相关标签/搜索