Kubeflow是一款流行的开源机器学习(ML)工具包,适用于但愿构建自定义机器学习管道的Kubernetes用户。Kubeflow Pipelines属于Kubeflow提供的附加组件,可帮助咱们构建并部署具有良好可移植性与可扩展性的端到端机器学习工做流。但在使用Kubeflow Pipelines时,数据科学家每每须要耗费很多精力以实现各种生产力工具,例如数据标记工做流与模型调优工具。html
另外,在使用Kubeflow Pipelines的过程当中,MLOps团队还须要管理Kubernetes集群中的CPU与GPU实例,考虑如何保持资源的高利用率,最终实现最佳投资回报。在数据科学团队当中,充分利用集群资源是一项极为艰巨的挑战,同时也极大增长了MLOps团队的运营压力。例如,咱们须要保证仅使用GPU实例处理深度学习训练及推理等性能要求较高的任务,并将CPU实例用于执行数据预处理以及Kubeflow管道控制平面等性能要求相对较低的任务。python
最近,AWS正式公布了适用于Kubeflow Pipelines的Amazon SageMaker Components。借此,Amazon SageMaker Components可以帮助咱们在Kubeflow Pipelines上实现强大的Amazon SageMaker功能,以全托管服务的形式支持数据标记、大规模超参数调优以及分布式训练做业、一键式安全可扩展模型部署,并经过Amazon Elastic Compute Cloud (Amazon EC2)竞价实例进行高成本效益训练等目标。git
下文将介绍如何经过Kubeflow Pipelines SDK,使用Amazon SageMaker Components构建出一个Kubeflow管道。github
咱们能够在Kubeflow Pipelines中使用SageMaker Components,在机器学习工做流中的各个阶段调用相应SageMaker做业,且无需担忧其在后台的具体运行方式。做为数据科学家或机器学习开发人员,咱们能够借此将精力集中在经过可移植、可扩展管道内构建及运行机器学习实验身上。所谓管道组件,是指可以在容器化应用程序中实现复用的代码,你们能够根据需求进行代码共享以提升生产效率。算法
若是但愿使用Amazon SageMaker中的超参考调优功能,则可使用超参数优化组件。在管道做业的运行过程当中,相应的超参数调优步骤将在Amazon SageMaker全托管基础设施之上进行。在下一节中,咱们将具体介绍其工做方式;本节先来介绍Amazon SageMaker Components的基本原理。docker
在一条典型的Kubeflow管道中,各个组件负责将您的逻辑封装在容器镜像当中。做为开发人员或数据科学家,咱们须要将训练、数据预处理、模型服务或者其余逻辑打包在Kubeflow Pipelines ContainerOp函数中,由该函数将代码构建至新容器内。或者也可将代码放置在自定义容器镜像中,然后将镜像推送至容器注册表(如Amazon Elastic Container Registry,简称Amazon ECR)。在管道运行时,负责运行Kubeflow的Kubernetes集群将指定一个工做节点对组件容器进行实例化,进而执行相关逻辑。管道组件可读取先前组件的输出,并生成管道中下一组件可使用的输出。下图所示,为此管道工做流的基本架构。
在Kubeflow Pipelines中使用Amazon SageMaker Components时,你们再也不须要将逻辑封装在自定义容器以内,而能够直接加载对应组件并使用Kubeflow Pipelines SDK描述管道。在管道运行时,其中的指令将被转换为Amazon SageMaker做业或部署。然后,工做负载将在Amazon SageMaker全托管基础设施之上运行,保证咱们充分享受Amazon SageMaker的各项典型优点,包括Managed Spot Training、端点自动规模伸缩等。下图为这类加强管道的基本架构。json
随着时间推移,AWS将继续推出更多功能,建议你们在官方Kubeflow Pipelines GitHub repo中收收藏Amazon SageMaker Components目录连接。 )ubuntu
为了说明如何在Kubeflow中使用Amazon SageMaker Components,让咱们按下图所示构建一条小型管道。
实际场景中的管道每每更为高级,并且涉及数据摄取、数据预处理、数据转换等其余步骤。但为简便起见,这里只使用最简单的管道设计。安全
你们能够在YouTube上观看本演练的视频版本:《在Kubernetes与Kubeflow上使用SageMaker实现机器学习规模扩展》(Scaling Machine Learning on Kubernetes and Kubeflow with SageMaker)。服务器
下面来看管道中各个步骤的具体做用:
组件1:超参数调优做业;
第一个组件,负责运行Amazon SageMaker超参数调优做业以优化如下超参数:
输入:N/A
输出:最佳超参数
组件2:选择最佳超参数
在上一步的超参数搜索过程当中,模型只须要训练10个轮次便可肯定最佳性能超参数。而在第二步中,为了获取最佳超参数,咱们须要进行80轮更新,以保证最佳超参数可以进一步提高模型准确率。
输入:最佳超参数
输出:通过80轮更新的最佳超参数
组件3:使用最佳超参数执行训练做业
第三个组件将使用轮次最高的最佳超参数运行Amazon SageMaker训练做业。
输入:通过80轮更新的最佳超参数
输出:训练做业名称
组件4:建立一套待部署模型
第四个组件将建立一套Amazon SageMaker模型工件。
输入:训练做业名称
输出:模型工件名称
组件5:部署推理端点
最终组件将使用Amazon SageMaker完成模型部署。
输入:模型工件名称
输出:N/A
要运行如下用例,须要作好以下准备:
r_components_for_kubeflow_pipelines.html)。咱们须要如下两个IAM角色:
你们能够经过笔记本电脑、台式机、EC2实例或者Amazon SageMaker notebook实例启动Amazon EKS集群。不管如何选择,咱们一般将该实例称为“网关实例”。这是由于Amazon EKS提供彻底托管的控制平面,咱们只须要使用网关实例便可与Kubernetes API以及工做节点进行交互。在本示例中,咱们使用一个c5.xlarge EC2实例做为网关实例。
GitHub上提供了本文中使用的代码、配置文件、Jupyter notebook以及Dockerfile。咱们将经过如下演示逐步解释其中涉及的各项核心概念。请不要在各步骤中复制代码,建议你们直接运行GitHub上提供的现成Jupyter notebook。
打开终端并经过SSH接入咱们用于建立Amazon EKS集群的Amazon EC2网关实例。在登陆后,克隆该示例Repo以访问示例Jupyter notebook。详见如下代码:
cd
git clone https://github.com/shashankpr...
cd kubeflow-pipelines-sagemaker-examples
要在网关实例上打开Jupyter notebook,请完成如下操做步骤:
l 在网关实例上启动Jupyter,并使用如下代码经过本地设备访问Jupyter:
jupyter-lab
若是是在EC2实例上运行Jupyterlab服务器,请设置一条指向该EC2实例的通道,借此经过本地笔记本或者台式机访问Jupyterlab客户端。详见如下代码:
ssh -N -L 0.0.0.0:8081:localhost:8081 -L 0.0.0.0:8888:localhost:8888 -i ~/.ssh/<key_pair>.pem ubuntu@<IP_ADDRESS>
若是使用Amazon Linux而非Ubuntu,则必须使用ec2-user这一用户名。更新EC2实例的IP地址,并使用对应的密钥对。
至此,应该已经能够在本地设备上经过http://localhost:8888访问Jupyterlab了。
l 在网关实例上运行如下代码以访问Kubeflow仪表板:
kubectl port-forward svc/istio-ingressgateway -n istio-system 8081:80
至此,应该已经能够经过http://localhost:8081访问Kubeflow仪表板了。
l 打开示例Jupyter notebook。
Amazon SageMaker支持两种训练做业模式(GitHub Repo中分别为两种模式提供对应的Jupyter notebook):
Ø 自带Docker容器镜像 —— 在这种模式下,咱们能够提供本身的Docker容器以执行训练。使用训练脚本构建容器,然后将其推送至Amazon ECR容器注册表。Amazon SageMaker会提取该容器镜像,进行实例化,然后开始训练。kfp-sagemaker-custom-container.ipynb Jupyter notebook采用的便是这种方法。
Ø 自带训练脚本(即脚本模式)—— 在这种模式下,无需使用Docker容器。只要将机器学习训练脚本导入TensorFlow、PyTorch、MXNet或XGBoost等流行框架中,然后将框架上传至Amazon S3便可。Amazon SageMaker会自动提取适当容器、下载训练脚本然后加以运行。若是不想使用Docker容器,那么这种模式明显更为适用。kfp-sagemaker-script-mode.ipynb Jupyter notebook采用的就是这种方法。
如下示例主要探讨第一种方法(自带Docker容器镜像)。本演练将带你们了解kfp-sagemaker-custom-container.ipynb Jupyter notebook中的各个重要步骤。在将其打开后,咱们便可轻松完成后续操做。
如下截屏所示,即kfp-sagemaker-custom-container.ipynb notebook:
要安装SDK并加载各管道组件,请完成如下操做步骤:
l 使用如下表明安装 Kubeflow Pipelines SDK:
pip install kfp --upgrade
l 使用如下代码在Python中导入Kubeflow Pipelines软件包:
import kfp
from kfp import components
from kfp.components import func_to_container_op
from kfp import dsl
l 使用如下代码在Python中加载Kubeflow Pipelines Components:
sagemaker_hpo_op = components.load_component_from_url('https://raw.githubusercontent...')
sagemaker_train_op = components.load_component_from_url('https://raw.githubusercontent...')
sagemaker_model_op = components.load_component_from_url('https://raw.githubusercontent...')
sagemaker_deploy_op = components.load_component_from_url('https://raw.githubusercontent...')
在生产工做流中,建议将组件固定至特定提交操做当中,而非直接使用Master。这样能够保证任何可能致使兼容性破坏的变动都不会对生产流程形成影响。
例如,要使用特定的提交版本,请在各加载组件中使用提交哈希替换掉Master部分:
sagemaker_train_op = components.load_component_from_url(‘https://raw.githubusercontent...’)
要准备并上传数据集,请输入如下代码:
import sagemaker
import boto3
sess = boto3.Session()
account = boto3.client('sts').get_caller_identity().get('Account')
sm = sess.client('sagemaker')
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session(boto_session=sess)
bucket_name = sagemaker_session.default_bucket()
job_folder = 'jobs'
dataset_folder = 'datasets'
local_dataset = 'cifar10'
!python generate_cifar10_tfrecords.py —data-dir {local_dataset}
datasets = sagemaker_session.upload_data(path='cifar10', key_prefix='datasets/cifar10-dataset')
在以上代码中,咱们首先导入Sagemaker与Boto3软件包,借此访问当前计算机的IAM角色与默认的S3存储桶。Python脚本generate_cifar10_tfrecords.py使用TensorFlow下载相应数据,将其转换为TFRecord格式,然后上传至Amazon S3。
Build_docker_push_to_ecr.ipynb Jupyter notebook负责执行建立容器并将其推送至Amazon ECR的所有操做步骤。
Docker目录中还包含Dockerfile、训练与推理Python脚本,以及保存在需求文件中的各依赖项。如下截屏所示,为Docker文件夹的具体内容 —— Dockerfile以及build_docker_push_to_ecr.ipynb Jupyter notebook。
关于如何使用Amazon SageMaker运行自定义容器的更多详细信息,请参阅在Amazon SageMaker中使用您的自有算法或模型。
若是不打算构建本身的容器,能够指定由Amazon SageMaker替咱们管理容器。在这种方法下,咱们能够将训练脚本上传至Amazon S3,如kfp-sagemaker-script-mode.ipynb notebook中所示。
咱们可将Kubeflow管道表示为由@dsl.pipeline装饰的函数,参见如下代码及kfp-sagemaker-custom-container.ipynb所示。关于更多详细信息,请参阅Kubeflow Pipelines概述。
@dsl.pipeline(
name='cifar10 hpo train deploy pipeline',
description='cifar10 hpo train deploy pipeline using sagemaker'
)
def cifar10_hpo_train_deploy(region='us-west-2',
training_input_mode='File',
train_image=f'{account}.dkr.ecr.us-west-2.amazonaws.com/sagemaker-kubernetes:latest',
serving_image='763104351884.dkr.ecr.us-west-2.amazonaws.com/tensorflow-inference:1.15.2-cpu',
volume_size='50',
max_run_time='86400',
instance_type='ml.p3.2xlarge',
network_isolation='False',
traffic_encryption='False',
...
在本文的示例代码中,咱们建立一项名为cifar10_hpo_training_deploy ()的新函数,由其定义管道中各步骤间共通的参数。在该函数内,咱们还将定义如下五项管道组件。
此组件描述了超参数调优做业中的相关选项。详见如下代码:
hpo = sagemaker_hpo_op(
region=region,
image=train_image,
training_input_mode=training_input_mode,
strategy='Bayesian',
metric_name='val_acc',
metric_definitions='{"val_acc": "val_acc: ([0-9.]+)"}',
metric_type='Maximize',
static_parameters='{
"epochs": "1",
"momentum": "0.9",
"weight-decay": "0.0002",
"model_dir":"s3://'+bucket_name+'/jobs",
"sagemaker_region": "us-west-2"
}',
continuous_parameters='[
{"Name": "learning-rate", "MinValue": "0.0001", "MaxValue": "0.1", "ScalingType": "Logarithmic"}
]',
categorical_parameters='[
{"Name": "optimizer", "Values": ["sgd", "adam"]},
{"Name": "batch-size", "Values": ["32", "128", "256"]},
{"Name": "model-type", "Values": ["resnet", "custom"]}
]',
channels=channels,
output_location=f's3://{bucket_name}/jobs',
instance_type=instance_type,
instance_count='1',
volume_size=volume_size,
max_num_jobs='16',
max_parallel_jobs='4'
...
这些选项包括调优策略(贝叶斯)、优化指标(验证准确率)、连续超参数(学习率)、分类超参数(优化器、批次大小与模型类型)以及保持不变的静态超参数(例如轮次数=10)。
咱们将做业数量指定为16。Amazon SageMaker会配置16个GPU实例以运行这项超参数调优做业。
组件1的输出将被捕捉在hpo变量中。组件2则采用最佳超参数并更新轮次数,具体参见如下代码:
training_hyp = get_best_hyp_op(hpo.outputs[‘best_hyperparameters’])
为此,咱们须要定义一项自定义函数,由其获取此项输出并将轮次数更新为80。这意味着咱们须要经历更长的训练时间,但能够凭借最佳超参数提高训练效果。具体参见如下代码:
def update_best_model_hyperparams(hpo_results, best_model_epoch = "80") -> str:
import json
r = json.loads(str(hpo_results))
return json.dumps(dict(r,epochs=best_model_epoch))
get_best_hyp_op = func_to_container_op(update_best_model_hyperparams)
此组件负责描述Amazon SageMaker训练做业,此做业将使用最佳超参数与先前步骤中通过更新的轮次数。详见如下代码:
training = sagemaker_train_op(
region=region,
image=train_image,
training_input_mode=training_input_mode,
hyperparameters=training_hyp.output,
channels=channels,
instance_type=instance_type,
instance_count='1',
volume_size=volume_size,
max_run_time=max_run_time,
model_artifact_path=f's3://{bucket_name}/jobs',
network_isolation=network_isolation,
traffic_encryption=traffic_encryption,
spot_instance=spot_instance,
role=role,
)
此组件负责建立一套Amazon SageMaker模型工件,供你们做为推理端点进行部署与托管。详见如下代码:
create_model = sagemaker_model_op(
region=region,
model_name=training.outputs['job_name'],
image=serving_image,
model_artifact_url=training.outputs['model_artifact_url'],
network_isolation=network_isolation,
role=role
)
最后,此组件负责模型部署,详见如下代码:
prediction = sagemaker_deploy_op(
region=region,
model_name_1=create_model.output,
instance_type_1='ml.m5.large'
)
使用Kubeflow管道编译器,你们能够编译此管道、建立实验并运行管道。详见如下代码:
kfp.compiler.Compiler().compile(cifar10_hpo_train_deploy,'sm-hpo-train-deploy-pipeline.zip')
client = kfp.Client()
aws_experiment = client.create_experiment(name='sm-kfp-experiment')
exp_name = f'cifar10-hpo-train-deploy-kfp-{time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())}'
my_run = client.run_pipeline(aws_experiment.id, exp_name, 'sm-hpo-train-deploy-pipeline.zip')
如下截屏所示,为Kubeflow管道执行完成以后的结果(带有注释)。除了“自定义函数以更新轮次(Custom function to update epochs)”步骤以外,其他全部步骤皆由Amazon SageMaker在Kubeflow管道中实现。
咱们还能够在Amazon SageMaker控制台上监控各个步骤的当前进度。如下截屏所示,为用于执行超参数调优做业的Amazon SageMaker控制台。
在管道总体运行完毕以后,咱们的模型将做为推理端点接受托管,如如下截屏所示。
要测试该端点,请复制端点名称并使用Boto3 SDK获取预测结果:
import json, boto3, numpy as np
client = boto3.client('runtime.sagemaker')
file_name = '1000_dog.png'
with open(file_name, 'rb') as f:
payload = f.read()
response = client.invoke_endpoint(EndpointName='Endpoint-20200522021801-DR5P',
ContentType='application/x-image',
Body=payload)
pred = json.loads(response['Body'].read())['predictions']
labels = ['airplane','automobile','bird','cat','deer','dog','frog','horse','ship','truck']
for l,p in zip(labels, pred[0]):
print(l,"{:.4f}".format(p*100))
在对包含小狗的图片进行分析时,咱们应得出以下输出结果:
airplane 0.0000
automobile 0.0000
bird 0.0001
cat 0.0115
deer 0.0000
dog 99.9883
frog 0.0000
horse 0.0000
ship 0.0000
truck 0.0000
本文介绍了如何配置Kubeflow Pipelines以经过Amazon SageMaker运行机器学习做业。Kubeflow Pipelines是一套开源机器学习编排平台,在但愿立足Kubernetes构建并管理自定义机器学习工做流的开发者群体中广受欢迎。但很多开发人员及MLOps团队在Kubeflow Pipelines的实际运营中遭遇挑战,发现本身难以管理Kubernetes集群的机器学习优化工做,没法得到良好的投资回报率或者承担极高的整体拥有成本。
在面向Kubeflow Pipelines的SageMaker Components帮助下,咱们能够继续在Kubeflow Pipelines中管理自有管道,并依靠Amazon SageMaker的托管功能执行具体的机器学习任务。数据科学家与机器学习开发人员还可使用Amazon SageMaker中的最新创新成果,例如全托管超参数调优、分布式训练、Managed Spot Training、自动规模伸缩等功能。咱们还展现了如何使用Amazon SageMaker Components建立并运行一条端到端Kubeflow示例管道。完整示例请参阅GitHub。