AWS Step Functions 将多个 AWS 服务协调为无服务器工做流,以即可以快速构建和更新应用程序。使用 Step Functions,能够设计和运行将AWS Lambda 和 Amazon ECS 等服务整合到功能丰富的应用程序中的工做流。工做流由一系列步骤组成,一个步骤的输出充当下一个步骤的输入。 使用 Step Functions,应用程序开发更简单、更直观,由于它将工做流转换为易于理解、易于向其余人说明且易于更改的状态机示意图。能够监控执行的每一个步骤,这意味着能够快速识别并解决问题。Step Functions 能够自动触发和跟踪各个步骤,并在出现错误时重试,所以的应用程序可以按照预期顺序执行。数据库
demo架构图json
AWS step function 状态机的工做流程服务器
工做流程如图:session
1.调用 Input Lottery Winners 函数,传入 num_of_winners,进入第二步。架构
2.Random Select Winners 根据 Input Lottery Winners 的输出(body)调用 Random Select Winners,生成两个获奖号码,进入第三步。运维
3.Validate Winners 根据第二步输出查询 Winners 表判断是否重复,重复则传出 status:1,不然传出 status:0。dom
4.Is Winner In Past Draw 接受第三步 status 并判断,当 status 为1,则从新调用 Random Select Winners 进入第二步,当 status 为0,则调用 Notify Winners 和 Record Winner Queue,给 SNS topic 发送通知,把获奖者写入 Winner 表。ide
在整个工做流程中,当 Catch 接收到错误,都直接进入 Failed 步骤,输出异常并中断 step function。函数
IAM角色建立设计
执行 lambda 的角色须要如下策略:
AmazonDynamoDBFullAccess
AWSLambdaBasicExecutionRole
AmazonSNSFullAccess
AWSStepFunctionsFullAccess
在 AWS IAM 控制台中建立角色:
大概以下图
lambda 建立
Input Lottery Winners
为了实现Step Functions状态机流转下的任务,咱们此次实现会用到AWS Lambda做为咱们业务的实现环境
1.进入AWS控制台,选择服务而后输入Lambda进入AWS Lambda控制台
2.选择建立函数,而后选择从头开始创做来自定义咱们的实验程序
3.首先咱们须要建立状态机中的第一个状态任务Input Lottery Winners,输入函数名称Lottery-InputWinners来定义幸运儿的数量。运行语言选择Python 3.7。同时须要选择函数执行的权限, 这里咱们选择使用现用角色,选择咱们以前建立的IAM角色
4.点击建立函数
5.在函数代码栏目下输入以下代码块,修改代码中的 region_name 为当前使用的 region
6.建立函数时复制页面右上角的ARN,后面建立状态机须要
import json
class CustomError(Exception):
pass
def lambda_handler(event, context):
num_of_winners = event['input']
if 'exception' in event: raise CustomError("An error occurred!!") return { "body": { "num_of_winners": num_of_winners } }
接下来咱们还须要建立另外三个须要定义的状态机业务逻辑,建立过程和上面的Lottery-InputWinners一致,下面是具体的状态机AWS
Lambda代码块
Lottery-RandomSelectWinners
import json
import boto3
from random import randint
from boto3.dynamodb.conditions import Key, Attr
TOTAL_NUM = 10
def lambda_handler(event, context):
num_of_winners = event['num_of_winners'] # query in dynamodb dynamodb = boto3.resource('dynamodb', region_name='') table = dynamodb.Table('Lottery-Employee') # random select the winners, if has duplicate value, re-run the process while True: lottery_serials = [randint(1,TOTAL_NUM) for i in range(num_of_winners)] if len(lottery_serials) == len(set(lottery_serials)): break # retrieve the employee details from dynamodb results = [table.query(KeyConditionExpression=Key('lottery_serial').eq(serial), IndexName='lottery_serial-index') for serial in lottery_serials] # format results winner_details = [result['Items'][0] for result in results] return { "body": { "num_of_winners": num_of_winners, "winner_details": winner_details } }
Lottery-ValidateWinners
import json
import boto3
from boto3.dynamodb.conditions import Key, Attr
def lambda_handler(event, context):
num_of_winners = event['num_of_winners'] winner_details = event['winner_details'] # query in dynamodb dynamodb = boto3.resource('dynamodb', region_name='') table = dynamodb.Table('Lottery-Winners') # valiate whether the winner has already been selected in the past draw winners_employee_id = [winner['employee_id'] for winner in winner_details] results = [table.query(KeyConditionExpression=Key('employee_id').eq(employee_id)) for employee_id in winners_employee_id] output = [result['Items'] for result in results if result['Count'] > 0] # if winner is in the past draw, return 0 else return 1 has_winner_in_queue = 1 if len(output) > 0 else 0 # format the winner details in sns winner_names = [winner['employee_name'] for winner in winner_details] name_s = "" for name in winner_names: name_s += name name_s += " " return { "body": { "num_of_winners": num_of_winners, "winner_details": winner_details }, "status": has_winner_in_queue, "sns": "Congrats! [{}] You have selected as the Lucky Champ!".format(name_s.strip()) }
Lottery-RecordWinners
import json
import boto3
from boto3.dynamodb.conditions import Key, Attr
def lambda_handler(event, context):
winner_details = event['winner_details'] # retrieve the winners' employee id employee_ids = [winner['employee_id'] for winner in winner_details] # save the records in dynamodb dynamodb = boto3.resource('dynamodb', region_name='') table = dynamodb.Table('Lottery-Winners') for employee_id in employee_ids: table.put_item(Item={ 'employee_id': employee_id }) return { "body": { "winners": winner_details }, "status_code": "SUCCESS" }
建立AWS SNS 通知服务
1.进入AWS控制台,在服务中搜索SNS
2.在SNS控制面板中,选择主题, 而后选择建立主题
3.在建立新主题弹框中,输入
主题名称: Lottery-Notification
显示名称: Lottery
1.建立主题后,会进入主题详细信息页面,这时候咱们须要建立订阅来对接咱们的消息服务,例如邮件服务(本次实验使用邮件服务来做为消息服务)
2.点击建立订阅, 在弹框中选择
协议: Email
终端节点: <填入本身的邮箱地址>
1.点击请求确认, 而后到上面填写的邮箱地址中确认收到信息,表示确认该邮箱能够接收来自AWS SNS该主题的通知消息
2.复制主题详细页面的主题ARN,以后替换Step Functions状态机下的<Notification:ARN>
建立Amazon Dynamodb 服务
本次实验须要建立两张Dynamodb表来记录员工信息和幸运儿信息。使用Dynamodb能更快地经过托管的方式记录数据同时免去数据库运维的压力。
1.进入AWS控制台,在服务中搜索Dynamodb
2.在左侧控制栏中选在表, 而后在主页面中选择建立表
3.在建立Dynamodb表中,填入以下信息
·表名称:Lottery-Winners ·主键:employee_id
4.表设置中确认勾选使用默认设置,点击建立
5.一样的设置步骤,点击建立表,在建立Dynamodb表中,填入以下信息
·表名称:Lottery-Employee主键:employee_id
1.表设置中确认勾选使用默认设置,点击建立
2.等待表建立完成后, 经过本附件中的request-items.json文件导入数据到Lottery-Employee
$ aws dynamodb batch-write-item --request-items file://request-items.json
1.选择表Lottery-Employee Tab页面中的索引, 点击建立索引
主键:lottery_serial, 字段类型选择数字
索引名称:lottery_serial-index
建立AWS Step Functions 状态机
1.进入AWS控制台,在服务中搜索Step Functions
2.进入Step Functions服务后,点击左侧的活动栏,并点击状态机
3.进入状态机主页面后,选择建立状态机
4.在定义状态机栏目下,选择默认使用代码段创做。同时在详细信息栏目输入状态机名称Lottery
5.在状态机定义栏目下,复制以下状态机定义文件,经过Amazon States Language来定义状态机的状态流转
{
"Comment": "A simple AWS Step Functions state machine that simulates the lottery session",
"StartAt": "Input Lottery Winners",
"States": {
"Input Lottery Winners": { "Type": "Task", "Resource": "<InputWinners:ARN>", "ResultPath": "$", "Catch": [ { "ErrorEquals": [ "CustomError" ], "Next": "Failed" }, { "ErrorEquals": [ "States.ALL" ], "Next": "Failed" } ], "Next": "Random Select Winners" }, "Random Select Winners": { "Type": "Task", "InputPath": "$.body", "Resource": "<RandomSelectWinners:ARN>", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Failed" } ], "Retry": [ { "ErrorEquals": [ "States.ALL"], "IntervalSeconds": 1, "MaxAttempts": 2 } ], "Next": "Validate Winners" }, "Validate Winners": { "Type": "Task", "InputPath": "$.body", "Resource": "<ValidateWinners:ARN>", "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Failed" } ], "Retry": [ { "ErrorEquals": [ "States.ALL"], "IntervalSeconds": 1, "MaxAttempts": 2 } ], "Next": "Is Winner In Past Draw" }, "Is Winner In Past Draw": { "Type" : "Choice", "Choices": [ { "Variable": "$.status", "NumericEquals": 0, "Next": "Send SNS and Record In Dynamodb" }, { "Variable": "$.status", "NumericEquals": 1, "Next": "Random Select Winners" } ] }, "Send SNS and Record In Dynamodb": { "Type": "Parallel", "End": true, "Catch": [ { "ErrorEquals": [ "States.ALL" ], "Next": "Failed" } ], "Retry": [ { "ErrorEquals": [ "States.ALL"], "IntervalSeconds": 1, "MaxAttempts": 2 } ], "Branches": [ { "StartAt": "Notify Winners", "States": { "Notify Winners": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "<Notification:ARN>", "Message.$": "$.sns" }, "End": true } } }, { "StartAt": "Record Winner Queue", "States": { "Record Winner Queue": { "Type": "Task", "InputPath": "$.body", "Resource": "<RecordWinners:ARN>", "TimeoutSeconds": 300, "End": true } } } ] }, "Failed": { "Type": "Fail" }
}
}
1.在状态机定义栏目的右侧,点击刷新按钮,能够看到状态机流转的流程图。使用以前的 lambda ARN(4个),SNS topic ARN(1个),对应替换状态机 json 文件中的<InputWinners:ARN>,<RandomSelectWinners:ARN>,<ValidateWinners:ARN>,<RecordWinners:ARN>,<Notification:ARN>,点击下一步。
2.在配置设置下,选择为我建立IAM角色, 输入自定义的IAM角色名称MyStepFunctionsExecutionRole,而且附加AmazonSNSFullAccess权限
3.点击建立状态机完成建立过程
2.进入以前建立的状态机Lottery
3.点击启动执行
4.在弹框中填入输入的json文本,这里的input表明在本次实验中须要抽取的获奖人数
{
"input": 2
}
1.点击启动执行
2.邮件会收取到幸运儿的信息
当函数出错时,也能直观得在可视化界面看到,而且在输出中查看错误日志