全部刊发内容都可转载可是须要注明出处。
html
本系列文档主要介绍怎么经过Streamr管理本身的DATA,整个系列包括三篇教程文档,分别是:教你5分钟上传数据至Streamr、三种整合数据至Streamr的典型场景、教你在Streamr市场上发布数据。全部文档均参考Streamr blog。前两篇主要偏向技术文档,因此须要有必定的技术背景。第三篇不包含任何技术知识,大部分人均可以按照教程来完成相应的操做。node
第一篇文档主要介绍如何经过调用API接口上传数据至Streamr,本篇教程文档主要介绍了三种整合数据至Streamr网络的典型示例。包括: 1. 直接从数据源推送数据至Streamr网络; 2. 数据源和Streamr之间创建桥接(数据源向桥接点实时推送数据); 3. 数据源和Streamr之间创建桥接(桥接点向数据源发送轮询请求)。git
在Streamr网络中,每一个数据点属于一个Stream。每一个数据点(或者成为事件、消息)包含了标有时间戳的一系列信息,好比,传感器采集的数据、即时通信的消息等。根据使用场景的不一样,一个Stream能够包括一个源的数据或者是多个数据源的数据。Streamr网络为用户上传数据提供API接口,用户调用该API接口最简单的方式是使用Streamr客户端库。当前的Streamr客户端库是JavaScript编写的,其余语言的客户端库还在开发中。若是您不方便使用当前的Streamr客户端库,您也可使用HTTP库来调用API接口,相关方法请参见上一篇说明文档。程序员
const StreamrClient = require('streamr-client') const ruuvi = require('node-ruuvitag') const API_KEY = 'MY-API-KEY' const client = new StreamrClient({ apiKey: API_KEY }) const streams = {} console.log('Listening for RuuviTags...') ruuvi.on('found', tag => { // Create a Stream for this tag if it doesn't exist yet if (!streams[tag.id]) { streams[tag.id] = await client.getOrCreateStream({ name: 'Ruuvi ' + tag.id }) } tag.on('updated', async (data) => { const stream = streams[tag.id] try { // Produce the data point to the stream await stream.produce(data) } catch (err) { console.error(err) } }) })
示例1主要介绍如何使用Bitfinex客户端库和Streamr客户端库,将Bitfinex上DATA/USD交易数据和Streamr创建链接。github
const BFX = require('bitfinex-api-node') const StreamrClient = require('streamr-client') const STREAM_ID = 'MY-STREAM-ID' const API_KEY = 'MY-API-KEY' const bws = new BFX('', '', { version: 2, transform: true }).ws const client = new StreamrClient({ apiKey: API_KEY }) bws.on('open', () => { bws.subscribeTicker('DATUSD') }) bws.on('ticker', (pair, ticker) => { console.log('Ticker:', ticker) client.produceToStream(STREAM_ID, ticker) .then(() => console.log('Sent to Streamr!')) .catch((err) => console.error(err)) })
示例1主要从编程角度介绍如何创建相应的链接,若是您非程序员,那可使用本示例中的可视化编辑器Streamr Editor来创建相应的链接。可是,使用该可视化编辑器也须要用户了解一些API接口和协议等技术知识。下面展现一个经过MQTT API监测电车实时位置的过程,该过程已得到赫尔辛基公共交通的受权。编程
# MQTT 模型参数 URL: mqtt://mqtt.hsl.fi Topic: /hfp/v1/journey/ongoing/tram/# MQTT 模型数据JSON格式的字符串,经过JSON解析器模型解析为一个对象。从该对象中提取出'VP'字段,该字段包含的也是一个对象。从提取出的对象中获取'desi','lat','long'和'veh'字段的值,而后通过数据处理以后经过SendToStream模型将该数据上传至Streamr。
本示例将介绍如何使用OpenWeatherMap每隔15分钟查询瑞士楚格的天气状况。对于天气等这种数据,使用轮询方式获取是可行的,由于天气情况不会变化的太快。json
const fetch = require('node-fetch') const StreamrClient = require('streamr-client') const OPENWEATHERMAP_API_KEY = 'MY-OPENWEATHERMAP-KEY' const STREAMR_API_KEY = 'MY-STREAMR-KEY' const POLL_INTERVAL = 15 * 60 * 1000 // 5 minutes const location = 'Zug,Switzerland' const client = new StreamrClient({ apiKey: STREAMR_API_KEY }) // Query data from OWM and produce the result to Streamr function pollAndProduce(stream) { fetch(`https://api.openweathermap.org/data/2.5/weather?q=${location}&APPID=${OPENWEATHERMAP_API_KEY}&units=metric`) .then((response) => response.json()) .then((json) => { console.log(json) // Produce the data point to Streamr return stream.produce(json) }).catch((err) => { console.error(err) }) } // Create a Stream for this location, if it doesn't exist client.getOrCreateStream({ name: `Weather in ${location}`, description: 'From openweathermap.org, updated every 15 minutes' }).then((stream) => { console.log(`Target Stream id: ${stream.id}`) // Poll and produce now pollAndProduce(stream) // Keep repeating every 15 minutes setInterval(() => pollAndProduce(stream), POLL_INTERVAL) }).catch((err) => { console.log(err) })
示例2将介绍如何利用可视化编辑器完成上述功能。
canvas
图中,Clock模型每隔15分钟发出一次警报而且触发HTTP Request模型向OpenWeatherMap发出获取天气数据的请求。而后,SendToStream模型将获取到的数据上传至Streamr。api