云中树莓派(1):环境准备html
云中树莓派(2):将传感器数据上传到AWS IoT 并利用Kibana进行展现java
云中树莓派(3):经过 AWS IoT 控制树莓派上的Lednode
云中树莓派(4):利用声音传感器控制Led灯python
云中树莓派(5):利用 AWS IoT Greengrass 进行 IoT 边缘计算git
IoT 的诸多场景中,边缘计算有不少需求。好比,不是每一个物联网设备都能链接到互联网,从而链接云上物联网服务。还好比有一些数据安全考虑,不容许将某些数据发到云上。所以,AWS 发布了 Greengrass 服务,用于支持物联网场景中的边缘计算。 github
架构:json
关于架构的部分说明:ubuntu
我在本地建立了一台 ubuntu 16.04 虚机,用于 Greengrass Core 部署。根据 Greengrass 文档作一些操做系统配置,具体参见Greengrass文档。在配置完成后,可运行检查工具来验证环境是否可用:安全
cd /home/pi/Downloads git clone https://github.com/aws-samples/aws-greengrass-samples.git cd aws-greengrass-samples cd greengrass-dependency-checker-GGCv1.5.0 sudo modprobe configs sudo ./check_ggc_dependencies | more
遇到两个小问题,提示未发现 java8 和 nodejs610。此时,须要建立三个软连接:服务器
ln -s /usr/bin/node /usr/bin/nodejs6.10 ln -s /usr/bin/node /usr/bin/nodejs ln -s /usr/bin/java /usr/bin/java8
目前全球只有5个region 提供了 Greengrass 服务。在选择使用哪一个region时候,必定要注意本地到这个region的网络状况。一开始,我想固然地认为国内到亚洲好比东京或者悉尼由于地理距离较近所以网络会较好,但实际上却发现到美国弗吉尼亚的网络比到东京的网络要好得多。
(1)建立 Greengrass Group
建立后,需下载两个压缩包:
一个 Greengrass Group 包含的资源以下图所示,具体有:
(2)在设备上启动 Greengrass Core
{ "coreThing": { "caPath": "root.ca.pem", "certPath": "022829d5c4.cert.pem", "keyPath": "022829d5c4.private.key", "thingArn": "arn:aws:iot:us-east-1:*******:thing/homepi_Core", "iotHost": "*********.iot.us-east-1.amazonaws.com", "ggHost": "greengrass.iot.us-east-1.amazonaws.com", "keepAlive": 2000 }, "runtime": { "cgroup": { "useSystemd": "yes" } }, "managedRespawn": false }
(3)问题排查
能够在 /greengrass/ggc/var/log/system 中查看 Greengrass Core 的日志文件。若是有错误,则定向排查。
运行在 GGC 中的 Lambda 函数须要把 Greengrass SDK 打包进去。它的SDK 中提供了 HelloWorld 示例函数代码。函数代码以下,很简单,它每隔5秒钟向 hello/world MQTT 主题发送『Hello World』消息。
import greengrasssdk import platform from threading import Timer import time # Creating a greengrass core sdk client client = greengrasssdk.client('iot-data') # Retrieving platform information to send from Greengrass Core my_platform = platform.platform() def greengrass_hello_world_run(): if not my_platform: client.publish(topic='hello/world', payload='Hello world! Sent from Greengrass Core.') else: client.publish(topic='hello/world', payload='Hello world! Sent from Greengrass Core running on platform: {}'.format(my_platform)) # Asynchronously schedule this function to be run again in 5 seconds Timer(5, greengrass_hello_world_run).start() # Start executing the function above greengrass_hello_world_run() def function_handler(event, context): return
参考GG文档,完成所需步骤后,完成该函数的建立。发布它的的一个版本,并建立别名 GG_HelloWorld。
在 Greengrass 服务中添加上面建立的函数:
订阅表用于定义 Greengrass 组内 (AWS Greengrass 核心设备、AWS IoT 设备和 Lambda 函数之间) 如何交换消息。订阅表中的每一个条目指定源、目标和发送/接收消息时使用的 MQTT 主题。仅当订阅表中存在指定源 (消息发件人)、目标 (消息收件人) 和 MQTT 主题的条目时才能交换消息。订阅表条目指定从源到目标的单向消息传递。若是您须要双向消息传递,请建立两个订阅表条目,每一个条目针对一个方向。
为了测试该函数是否按设计发出了消息,建立一个从该函数到 IoT Service 的订阅,这样从 IoT 服务上就能够收到它发出的消息了。
云上的全部操做都必须经过『部署』应用到 Greengrass Core 上。所以,对 Greengroup 作了任何变化后,都必须经过部署操做将其应用到Core 上。
点击 Actions -> Deploy,开始部署。能够从 Core 的 runtime.log 文件中看到其大体过程:
[2018-08-13T15:56:55.622+08:00][INFO]-Received deploymentId 613dc2ec-8877-41b2-a217-8f939a7782fc of type NewDeployment for group f7a0bc8f-4527-481d-9301-1545b86fcf68 [2018-08-13T15:56:55.622+08:00][INFO]-Updating status to InProgress of deployment 613dc2ec-8877-41b2-a217-8f939a7782fc for group f7a0bc8f-4527-481d-9301-1545b86fcf68 [2018-08-13T15:56:56.611+08:00][INFO]-Executing the group Downloading step
而后查询其状态:
在界面上的Test 功能中,能够收到 Lambda 函数发出的消息:
在 Lambda 函数被部署到 Core 上以后,在 Core 上起了一个新的进程:
ggc_user 21106 0.3 0.7 189616 15148 ? Ssl 16:33 0:12 python2.7 -u /runtime/python2.7/lambda_runtime.py --handler=greengrassHelloWorld.function_handler
该进程利用了 cgroup 来限制资源:
root@greengrass:/home/ubuntu# cat /proc/21106/cgroup 11:freezer:/system.slice/83db5d89-d650-4f65-542a-f6669153d4e6 10:blkio:/user.slice/system.slice/83db5d89-d650-4f65-542a-f6669153d4e6 9:memory:/user.slice/system.slice/83db5d89-d650-4f65-542a-f6669153d4e6 8:net_cls,net_prio:/system.slice/83db5d89-d650-4f65-542a-f6669153d4e6 7:pids:/user.slice/user-1000.slice/system.slice/83db5d89-d650-4f65-542a-f6669153d4e6 6:hugetlb:/system.slice/83db5d89-d650-4f65-542a-f6669153d4e6 5:perf_event:/system.slice/83db5d89-d650-4f65-542a-f6669153d4e6 4:devices:/user.slice/system.slice/83db5d89-d650-4f65-542a-f6669153d4e6 3:cpuset:/system.slice/83db5d89-d650-4f65-542a-f6669153d4e6 2:cpu,cpuacct:/user.slice/system.slice/83db5d89-d650-4f65-542a-f6669153d4e6 1:name=systemd:/user.slice/user-1000.slice/session-63.scope/system.slice/83db5d89-d650-4f65-542a-f6669153d4e6
目前只支持指定函数的内存限制:
示意图:
(1)在 IoT Greengrass 服务中建立两个设备,分别是 HelloWorld_Publisher (模拟上图中的设备 #1)和 HelloWorld_Subscriber(模拟上图中的设备 #2)。将获取到各自的证书文件。
建立设备:
建立结果:
(2)配置订阅,从 Publisher 到 Subscriber:
(3)经过 部署,把应用同步到 Greengrass Core 上。
以树莓派为平台,在上面运行两个程序,来模拟上面的两个物联网设备。
(1)首先须要在树莓派上安装 AWS IoT Device SDK
git clone https://github.com/aws/aws-iot-device-sdk-python.git cd aws-iot-device-sdk-python python setup.py install
(2)SDK 中有个示例文件 /aws-iot-device-sdk-python/samples/greengrass/basicDiscovery.py 可用于本测试
(3)运行脚本模拟 publlisher:
python basicDiscovery.py -e *****.iot.us-east-1.amazonaws.com -r pubcerts/root-ca.pem -c pubcerts/3ed88f606a.cert.pem -k pubcerts/3ed88f606a.private.key -n HelloWorld_Publisher -m publish -t hello/world/pubsub -M "Hellow, I am Publisher"
它会不停地向 hello/world/pubsub 发送消息:
2018-08-14 16:44:14,143 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [puback] event 2018-08-14 16:44:14,145 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [puback] event 2018-08-14 16:44:15,144 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish... Published topic hello/world/pubsub: {"message": "Hellow, I am Publisher", "sequence"
(4)运行另外一个脚本模拟 subscriber:
python basicDiscovery.py -e *******.iot.us-east-1.amazonaws.com -r subcerts/root-ca.pem -k subcerts/7d8fefa9d3.private.key -c subcerts/7d8fefa9d3.cert.pem -n HelloWorld_Subscriber -t hello/world/pubsub -m subscribe
它会不断收到消息:
2018-08-14 16:44:15,194 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [message] event 2018-08-14 16:44:15,196 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [message] event 2018-08-14 16:44:15,197 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback... Received message on topic hello/world/pubsub: {"message": "Hellow, I am Publisher", "sequence": 6}
(1)订阅者一开始,会向 IoT Service Endpoint 发送一个 Discovery 消息:
Sending discover request: GET /greengrass/discover/thing/HelloWorld_Subscriber HTTP/1.1
Host: a1upjpa864lewg.iot.us-east-1.amazonaws.com:8443
说明:这里说明边缘的物联网设备仍是须要链接到云上的IoT端点,这说明它们仍然须要互联网访问能力。
(2)它收到返回消息
Receiving discover response body... Discovered GGC: arn:aws:iot:us-east-1:*******:thing/homepi_Core from Group: 669d91fc-0690-48ab-a36d-90816b2332b4 Now we persist the connectivity/identity information...
(3)它链接到 Greengrass Core
Trying to connect to core at 192.168.1.12:8883
(4)它订阅到指定 topic
Adding a new subscription record: hello/world/pubsub qos: 0
(5)它开始接收消息
2018-08-14 16:44:09,381 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Produced [message] event 2018-08-14 16:44:09,382 - AWSIoTPythonSDK.core.protocol.internal.workers - DEBUG - Dispatching [message] event 2018-08-14 16:44:09,384 - AWSIoTPythonSDK.core.protocol.internal.clients - DEBUG - Invoking custom event callback... Received message on topic hello/world/pubsub: {"message": "Hellow, I am Publisher", "sequence": 0}
可见这过程里面,处于边缘的物联网设备仍是须要链接到云上IoT 服务一次,去获取Core 的信息。Core 的 Connectivity 信息能够收入输入,也能够由Core 自动推送到云上。
示意图:
(1)在IoT 服务中,在 Greengrass 组内,建立两个设备,GG_Switch 和 GG_TrafficLight。
(2)建立订阅
(3)部署
从 https://github.com/aws-samples/aws-greengrass-samples/tree/master/traffic-light-example-python 下载 lightController.py 和 trafficLight.py 文件。前者模拟一个Led 灯的控制器,后者模拟Led 灯。
(1)运行Controller
python lightController.py -e ****.iot.us-east-1.amazonaws.com -r switchcerts/root-ca.pem -c switchcerts/8bb0278c01.cert.pem -k switchcerts/8bb0278c01.private.key -n GG_TrafficLight --clientId GG_Switch
它会定时向设备影子发出更新请求:
{"state":{"desired":{"property":"Y"}}} 2018-08-14 17:00:28,915 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish... ~~~~~~~~~~Shadow Update Accepted~~~~~~~~~~~~~ Update request with token: 1827378e-9b0b-4b03-a7df-2c1af119510f accepted! property: Y
(2)运行 Light
python trafficLight.py -e ****.iot.us-east-1.amazonaws.com -r lightcerts/root-ca.pem -c lightcerts/eae63a2ee2.cert.pem -k lightcerts/eae63a2ee2.private.key -n GG_TrafficLight --clientId GG_TrafficLight
它会收到 Delta 请求,变动Led 的状态:
Light changed to: Y {"state":{"reported":{"property":"Y"}}} 2018-08-14 17:02:29,111 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync subscribe... 2018-08-14 17:02:29,120 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync subscribe... 2018-08-14 17:02:31,132 - AWSIoTPythonSDK.core.shadow.deviceShadow - INFO - Subscribed to update accepted/rejected topics for deviceShadow: GG_TrafficLight 2018-08-14 17:02:31,133 - AWSIoTPythonSDK.core.protocol.mqtt_core - INFO - Performing sync publish... ~~~~~~~~~~ Shadow Update Accepted ~~~~~~~~~~~~~ Update request with token: ec5f0cdb-0558-44b7-a685-02d2df8a31cb accepted! property: Y
示意图:
(1)建立 IAM Role Greengrass_DynamoDB_Role,将其赋予给 Greengrass,用于访问 DynamoDB。
(2)建立 IAM Role Lambda_DynamoDB_Role,它会被赋予给 Lambda 函数,用于访问 DynamoDB。
(3)从 https://github.com/aws-samples/aws-greengrass-samples/tree/master/traffic-light-example-python 下载 carAggregator.py,打包成 Lambda 函数包,建立 Lambda 函数。函数名为 GG_Car_Aggregator。看下它的代码:
import logging import boto3 from datetime import datetime from random import * from botocore.exceptions import ClientError dynamodb = boto3.resource('dynamodb', region_name='us-east-1') tableName = "CarStats" # Create the dynamo db table if needed try: table = dynamodb.create_table( TableName=tableName, KeySchema=[ { 'AttributeName': 'Time', 'KeyType': 'HASH' #Partition key } ], AttributeDefinitions=[ { 'AttributeName': 'Time', 'AttributeType': 'S' } ], ProvisionedThroughput={ 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 } ) # Wait until the table exists. table.meta.client.get_waiter('table_exists').wait(TableName=tableName) except ClientError as e: if e.response['Error']['Code'] == 'ResourceInUseException': print("Table already created") else: raise e # initialize the logger logger = logging.getLogger() logger.setLevel(logging.INFO) # This is a long lived lambda so we can keep state as below totalTraffic = 0 totalGreenlights = 0 minCars = -1 maxCars = -1 def function_handler(event, context): global totalTraffic global totalGreenlights global minCars global maxCars # grab the light status from the event # Shadow JSON schema: # { "state": { "desired": { "property":<R,G,Y> } } } logger.info(event) lightValue = event["current"]["state"]["reported"]["property"] logger.info("reported light state: " + lightValue) if lightValue == 'G': logger.info("Green light") # generate a random number of cars passing during this green light cars = randint(1, 20) # update stats totalTraffic += cars totalGreenlights+=1 if cars < minCars or minCars == -1: minCars = cars if cars > maxCars: maxCars = cars logger.info("Cars passed during green light: " + str(cars)) logger.info("Total Traffic: " + str(totalTraffic)) logger.info("Total Greenlights: " + str(totalGreenlights)) logger.info("Minimum Cars passing: " + str(minCars)) logger.info("Maximum Cars passing: " + str(maxCars)) # update car stats to dynamodb every 3 green lights if totalGreenlights % 3 == 0: global tableName table = dynamodb.Table(tableName) table.put_item( Item={ 'Time':str(datetime.utcnow()), 'TotalTraffic':totalTraffic, 'TotalGreenlights':totalGreenlights, 'MinCarsPassing':minCars, 'MaxCarsPassing':maxCars, } ) return
代码也很简单。它首先会尝试建立一个 Dynamo table。而后在每次收到 documents 后,检查 reported 状态。若是为 「G」,表示为绿灯,它会向Dynamo 表中写入一条数据。
(4)将该函数添加到 Greengrass 组中。
(5)配置订阅。本地影子服务会将设备的 documents 发给 Aggregator Lambda 函数。
保持 4.2 中的 Controller 和 Light 持续运行。几分钟后,Dynamo 中将会有数据产生:
感受AWS IoT Greengrass 服务还有一些不太完善,主要有如下几个缘由:
参考连接:
欢迎你们关注个人我的公众号: