Streamr助你掌控本身的数据(2)——三种整合数据至Streamr的典型场景

博客说明

全部刊发内容都可转载可是须要注明出处。
html

三种整合数据至Streamr的典型场景

本系列文档主要介绍怎么经过Streamr管理本身的DATA,整个系列包括三篇教程文档,分别是:教你5分钟上传数据至Streamr三种整合数据至Streamr的典型场景教你在Streamr市场上发布数据。全部文档均参考Streamr blog。前两篇主要偏向技术文档,因此须要有必定的技术背景。第三篇不包含任何技术知识,大部分人均可以按照教程来完成相应的操做。node

简介

第一篇文档主要介绍如何经过调用API接口上传数据至Streamr,本篇教程文档主要介绍了三种整合数据至Streamr网络的典型示例。包括: 1. 直接从数据源推送数据至Streamr网络; 2. 数据源和Streamr之间创建桥接(数据源向桥接点实时推送数据); 3. 数据源和Streamr之间创建桥接(桥接点向数据源发送轮询请求)。git

相关概念说明

在Streamr网络中,每一个数据点属于一个Stream。每一个数据点(或者成为事件、消息)包含了标有时间戳的一系列信息,好比,传感器采集的数据、即时通信的消息等。根据使用场景的不一样,一个Stream能够包括一个源的数据或者是多个数据源的数据。Streamr网络为用户上传数据提供API接口,用户调用该API接口最简单的方式是使用Streamr客户端库。当前的Streamr客户端库是JavaScript编写的,其余语言的客户端库还在开发中。若是您不方便使用当前的Streamr客户端库,您也可使用HTTP库来调用API接口,相关方法请参见上一篇说明文档。程序员

场景一:直接从数据源推送数据至Streamr网络

  • 复杂度:低
  • 延迟:低
  • 可用性:中等
    在这种模式下,数据源一产生新的数据,将被当即直接推送至Streamr网络。这种模式是官方推荐的,可是受环境影响,这种模式有可能会不可用。这种模式须要用户可以控制产生数据的系统而且可以决定该数据发送的地方。举例来讲,若是您是IoT设备生产商,您能够直接将Streamr客户端直接植入这些IoT设备中,而且容许设备使用者上传数据至Streamr网络。大部分的工业设备具备良好的可配置特性,能够随时将产生的数据接入Streamr网络。然而,诸如汽车、手机、智能手环等消费级设备常常强制将用户数据发送至设备制造商的云端,而后向用户提供访问该数据的API接口。在这种场景下,用户可使用桥接模式将数据接入到Streamr网络(具体参见下一小节)。
    下面以一个实际应用为例说明如何直接从数据源推送数据至Streamr网络。这里主要介绍如何将IoT传感器Ruuvi产生的测量数据实时上传至Streamr。该设备经过低功耗蓝牙将数据传输至一台装有Streamr客户端库的网关计算机。其中,网关计算机为每一个设备单独创建一个Stream,而且将设备产生的测量数据实时上传至所建立的Stream。其示例代码以下:
const StreamrClient = require('streamr-client')
const ruuvi = require('node-ruuvitag')
const API_KEY = 'MY-API-KEY'
const client = new StreamrClient({
    apiKey: API_KEY
})
const streams = {}
console.log('Listening for RuuviTags...')
ruuvi.on('found', tag => {
    // Create a Stream for this tag if it doesn't exist yet
    if (!streams[tag.id]) {
        streams[tag.id] = await client.getOrCreateStream({
            name: 'Ruuvi ' + tag.id
        })
    }
    tag.on('updated', async (data) => {
        const stream = streams[tag.id]
        try {
            // Produce the data point to the stream
            await stream.produce(data)
        } catch (err) {
            console.error(err)
        }
    })
})

场景二:数据源和Streamr之间创建桥接(数据源向桥接点实时推送数据)

  • 复杂度:中等
  • 延迟:低
  • 可用性:中等
    这种模式适用于您没法直接控制数据源的场景,用户能够经过一系列API接口实时获取数据。具体地,在这种模式下,数据源实时将新产生的数据推送给用户,用户收到通知后当即将该数据上传至Streamr。

示例1

示例1主要介绍如何使用Bitfinex客户端库和Streamr客户端库,将Bitfinex上DATA/USD交易数据和Streamr创建链接。github

const BFX = require('bitfinex-api-node')
const StreamrClient = require('streamr-client')
const STREAM_ID = 'MY-STREAM-ID'
const API_KEY = 'MY-API-KEY'
const bws = new BFX('', '', {
    version: 2,
    transform: true
}).ws
const client = new StreamrClient({
    apiKey: API_KEY
})
bws.on('open', () => {
    bws.subscribeTicker('DATUSD')
})
bws.on('ticker', (pair, ticker) => {
    console.log('Ticker:', ticker)
    client.produceToStream(STREAM_ID, ticker)
        .then(() => console.log('Sent to Streamr!'))
        .catch((err) => console.error(err))
})

示例2

示例1主要从编程角度介绍如何创建相应的链接,若是您非程序员,那可使用本示例中的可视化编辑器Streamr Editor来创建相应的链接。可是,使用该可视化编辑器也须要用户了解一些API接口和协议等技术知识。下面展现一个经过MQTT API监测电车实时位置的过程,该过程已得到赫尔辛基公共交通的受权。编程

# MQTT 模型参数
URL: mqtt://mqtt.hsl.fi
Topic: /hfp/v1/journey/ongoing/tram/#

MQTT 模型数据JSON格式的字符串,经过JSON解析器模型解析为一个对象。从该对象中提取出'VP'字段,该字段包含的也是一个对象。从提取出的对象中获取'desi','lat','long'和'veh'字段的值,而后通过数据处理以后经过SendToStream模型将该数据上传至Streamr。

场景三:数据源和Streamr之间创建桥接(桥接点向数据源发送轮询请求)

  • 复杂度:中等
  • 延迟:中等
  • 可用性:好
    大多数云服务提供商都会为用户提供访问数据的API接口,这种场景正是使用该API接口完成用户数据上传至Streamr。在这种场景下,当数据源产生新的数据时,用户不会收到任何通知,这就意味着用户须要重复向该API接口发送数据请求,这种方式成为轮询。显而易见,这种方式并非一个获取实时数据的最佳方式,缘由以下:1. 在用户请求该数据以前,该数据可能已经变化屡次,意味着用户会丢失一些数据信息;2. 会给API服务器端带来没必要要的负载,由于用户请求的时机是不固定的,不必定恰巧在数据值发送变化的时候;3. 增长了时延(轮询周期的一半时间)。

示例1

本示例将介绍如何使用OpenWeatherMap每隔15分钟查询瑞士楚格的天气状况。对于天气等这种数据,使用轮询方式获取是可行的,由于天气情况不会变化的太快。json

const fetch = require('node-fetch')
const StreamrClient = require('streamr-client')
const OPENWEATHERMAP_API_KEY = 'MY-OPENWEATHERMAP-KEY'
const STREAMR_API_KEY = 'MY-STREAMR-KEY'
const POLL_INTERVAL = 15 * 60 * 1000 // 5 minutes
const location = 'Zug,Switzerland'
const client = new StreamrClient({
    apiKey: STREAMR_API_KEY
})
// Query data from OWM and produce the result to Streamr
function pollAndProduce(stream) {
    fetch(`https://api.openweathermap.org/data/2.5/weather?q=${location}&APPID=${OPENWEATHERMAP_API_KEY}&units=metric`)
        .then((response) => response.json())
        .then((json) => {
            console.log(json)
            // Produce the data point to Streamr
            return stream.produce(json)
        }).catch((err) => {
            console.error(err)
        })
}
// Create a Stream for this location, if it doesn't exist
client.getOrCreateStream({
    name: `Weather in ${location}`,
    description: 'From openweathermap.org, updated every 15 minutes'
}).then((stream) => {
    console.log(`Target Stream id: ${stream.id}`)
    // Poll and produce now
    pollAndProduce(stream)
    // Keep repeating every 15 minutes
    setInterval(() => pollAndProduce(stream), POLL_INTERVAL)
}).catch((err) => {
    console.log(err)
})

示例2

示例2将介绍如何利用可视化编辑器完成上述功能。
canvas

图中,Clock模型每隔15分钟发出一次警报而且触发HTTP Request模型向OpenWeatherMap发出获取天气数据的请求。而后,SendToStream模型将获取到的数据上传至Streamr。api

参考文献

  1. https://medium.com/streamrblog/three-patterns-for-integrating-your-data-to-streamr-2-of-3-38027dc91f9e
相关文章
相关标签/搜索