在此前的文章中,我已经向你介绍了Kubeflow,这是一个为团队设置的机器学习平台,须要构建机器学习流水线。python
在本文中,咱们将了解如何采用现有的机器学习详细并将其变成Kubeflow的机器学习流水线,进而能够部署在Kubernetes上。在进行本次练习的时候,请考虑你该如何将现有的机器学习项目转换到Kubeflow上。linux
我将使用Fashion MNIST做为例子,由于在本次练习中模型的复杂性并非咱们须要解决的主要目标。对于这一简单的例子,我将流水线分为3个阶段:git
Git clone代码库github
下载并从新处理训练和测试数据docker
训练评估bash
固然,你能够根据本身的用例将流水线以任意形式拆分,而且能够随意扩展流水线。ssh
你能够从Github上获取代码:机器学习
% git clone https://github.com/benjamintanweihao/kubeflow-mnist.git
如下是咱们用来建立流水线的完整清单。实际上,你的代码极可能跨多个库和文件。在咱们的例子中,咱们将代码分为两个脚本,preprocessing.py
和train.py
。函数
from tensorflow import keras import argparse import os import pickle def preprocess(data_dir: str): fashion_mnist = keras.datasets.fashion_mnist (train_images, train_labels), (test_images, test_labels) = fashion_mnist.load_data() train_images = train_images / 255.0 test_images = test_images / 255.0 os.makedirs(data_dir, exist_ok=True) with open(os.path.join(data_dir, 'train_images.pickle'), 'wb') as f: pickle.dump(train_images, f) with open(os.path.join(data_dir, 'train_labels.pickle'), 'wb') as f: pickle.dump(train_labels, f) with open(os.path.join(data_dir, 'test_images.pickle'), 'wb') as f: pickle.dump(test_images, f) with open(os.path.join(data_dir, 'test_labels.pickle'), 'wb') as f: pickle.dump(test_labels, f) if __name__ == '__main__': parser = argparse.ArgumentParser(description='Kubeflow MNIST training script') parser.add_argument('--data_dir', help='path to images and labels.') args = parser.parse_args() preprocess(data_dir=args.data_dir)
处理脚本采用单个参数data_dir
。它下载并预处理数据,并将pickled
版本保存在data_dir
中。在生产代码中,这多是TFRecords的存储目录。工具
train.py
import calendar import os import time import tensorflow as tf import pickle import argparse from tensorflow import keras from constants import PROJECT_ROOT def train(data_dir: str): # Training model = keras.Sequential([ keras.layers.Flatten(input_shape=(28, 28)), keras.layers.Dense(128, activation='relu'), keras.layers.Dense(10)]) model.compile(optimizer='adam', loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True), metrics=['accuracy']) with open(os.path.join(data_dir, 'train_images.pickle'), 'rb') as f: train_images = pickle.load(f) with open(os.path.join(data_dir, 'train_labels.pickle'), 'rb') as f: train_labels = pickle.load(f) model.fit(train_images, train_labels, epochs=10) with open(os.path.join(data_dir, 'test_images.pickle'), 'rb') as f: test_images = pickle.load(f) with open(os.path.join(data_dir, 'test_labels.pickle'), 'rb') as f: test_labels = pickle.load(f) # Evaluation test_loss, test_acc = model.evaluate(test_images, test_labels, verbose=2) print(f'Test Loss: {test_loss}') print(f'Test Acc: {test_acc}') # Save model ts = calendar.timegm(time.gmtime()) model_path = os.path.join(PROJECT_ROOT, f'mnist-{ts}.h5') tf.saved_model.save(model, model_path) with open(os.path.join(PROJECT_ROOT, 'output.txt'), 'w') as f: f.write(model_path) print(f'Model written to: {model_path}') if __name__ == '__main__': parser = argparse.ArgumentParser(description='Kubeflow FMNIST training script') parser.add_argument('--data_dir', help='path to images and labels.') args = parser.parse_args() train(data_dir=args.data_dir)
在train.py
中,将创建模型,并使用data_dir
指定训练和测试数据的位置。模型训练完毕而且开始执行评估后,将模型写入带有时间戳的路径。请注意,该路径也已写入output.txt
。稍后将对此进行引用。
为了开始建立Kubeflow流水线,咱们须要拉取一些依赖项。我准备了一个environment.yml
,其中包括了kfp 0.5.0
、tensorflow
以及其余所需的依赖项。
你须要安装Conda,而后执行如下步骤:
% conda env create -f environment.yml % source activate kubeflow-mnist % python preprocessing.py --data_dir=/path/to/data % python train.py --data_dir=/path/to/data
如今咱们来回顾一下咱们流水线中的几个步骤:
Git clone代码库
下载并预处理训练和测试数据
训练并进行评估
在咱们开始写代码以前,须要从宏观上了解Kubeflow流水线。
流水线由链接组件构成。一个组件的输出成为另外一个组件的输入,每一个组件实际上都在容器中执行(在本例中为Docker)。将发生的状况是,咱们会执行一个咱们稍后将要指定的Docker镜像,它包含了咱们运行preprocessing.py
和train.py
所需的一切。固然,这两个阶段会有它们的组件。
咱们还须要额外的一个镜像以git clone
项目。咱们须要将项目bake到Docker镜像,但在实际项目中,这可能会致使Docker镜像的大小膨胀。
说到Docker镜像,咱们应该先建立一个。
若是你只是想进行测试,那么这个步骤不是必须的,由于我已经在Docker Hub上准备了一个镜像。这是Dockerfile
的全貌:
FROM tensorflow/tensorflow:1.14.0-gpu-py3 LABEL MAINTAINER "Benjamin Tan <benjamintanweihao@gmail.com>" SHELL ["/bin/bash", "-c"] # Set the locale RUN echo 'Acquire {http::Pipeline-Depth "0";};' >> /etc/apt/apt.conf RUN DEBIAN_FRONTEND="noninteractive" RUN apt-get update && apt-get -y install --no-install-recommends locales && locale-gen en_US.UTF-8 ENV LANG en_US.UTF-8 ENV LANGUAGE en_US:en ENV LC_ALL en_US.UTF-8 RUN apt-get install -y --no-install-recommends \ wget \ git \ python3-pip \ openssh-client \ python3-setuptools \ google-perftools && \ rm -rf /var/lib/apt/lists/* # install conda WORKDIR /tmp RUN wget --quiet https://repo.anaconda.com/miniconda/Miniconda3-4.7.12-Linux-x86_64.sh -O ~/miniconda.sh && \ /bin/bash ~/miniconda.sh -b -p /opt/conda && \ rm ~/miniconda.sh && \ ln -s /opt/conda/etc/profile.d/conda.sh /etc/profile.d/conda.sh && \ echo ". /opt/conda/etc/profile.d/conda.sh" >> ~/.bashrc # build conda environments COPY environment.yml /tmp/kubeflow-mnist/conda/ RUN /opt/conda/bin/conda update -n base -c defaults conda RUN /opt/conda/bin/conda env create -f /tmp/kubeflow-mnist/conda/environment.yml RUN /opt/conda/bin/conda clean -afy # Cleanup RUN rm -rf /workspace/{nvidia,docker}-examples && rm -rf /usr/local/nvidia-examples && \ rm /tmp/kubeflow-mnist/conda/environment.yml # switch to the conda environment RUN echo "conda activate kubeflow-mnist" >> ~/.bashrc ENV PATH /opt/conda/envs/kubeflow-mnist/bin:$PATH RUN /opt/conda/bin/activate kubeflow-mnist # make /bin/sh symlink to bash instead of dash: RUN echo "dash dash/sh boolean false" | debconf-set-selections && \ DEBIAN_FRONTEND=noninteractive dpkg-reconfigure dash # Set the new Allocator ENV LD_PRELOAD /usr/lib/x86_64-linux-gnu/libtcmalloc.so.
关于Dockerfile值得关注的重要一点是Conda环境是否设置完成并准备就绪。要构建镜像:
% docker build -t your-user-name/kubeflow-mnist . -f Dockerfile % docker push your-user-name/kubeflow-mnist
那么,如今让咱们来建立第一个组件!
在pipeline.py
中能够找到如下代码片断。
在这一步中,咱们将从远程的Git代码库中执行一个git clone
。特别是,我想要向你展现如何从私有仓库中进行git clone
,由于这是大多数企业的项目所在的位置。固然,这也是一个很好的机会来演示Rancher中一个很棒的功能,它能简单地添加诸如SSH密钥之类的密钥。
访问Rancher界面。在左上角,选择local,而后选择二级菜单的Default:
而后,选择Resources下的Secrets
你应该看到一个密钥的列表,它们正在被你刚刚选择的集群所使用。点击Add Secret:
使用你在下图中所看到的值来填写该页面。若是kubeflow没有在命名空间栏下展现出来,你能够经过选择Add to a new namespace
而且输入kubeflow
简单地建立一个。
确保Scope仅是个命名空间。若是将Scope设置为全部命名空间,那么将使得在Default项目中的任意工做负载都可以使用你的ssh密钥。
在Secret Values中,key是id_rsa
,值是id_rsa
的内容。完成以后,点击Save。
若是一些进展顺利,你将会看到下图的内容。如今你已经成功地在kubeflow命名空间中添加了你的SSH密钥,而且无需使用kubectl!
既然咱们已经添加了咱们的SSH key,那么是时候回到代码。咱们如何利用新添加的SSH密钥来访问私有git仓库?
def git_clone_darkrai_op(repo_url: str): volume_op = dsl.VolumeOp( name="create pipeline volume", resource_name="pipeline-pvc", modes=["ReadWriteOnce"], size="3Gi" ) image = 'alpine/git:latest' commands = [ "mkdir ~/.ssh", "cp /etc/ssh-key/id_rsa ~/.ssh/id_rsa", "chmod 600 ~/.ssh/id_rsa", "ssh-keyscan bitbucket.org >> ~/.ssh/known_hosts", f"git clone {repo_url} {PROJECT_ROOT}", f"cd {PROJECT_ROOT}"] op = dsl.ContainerOp( name='git clone', image=image, command=['sh'], arguments=['-c', ' && '.join(commands)], container_kwargs={'image_pull_policy': 'IfNotPresent'}, pvolumes={"/workspace": volume_op.volume} ) # Mount Git Secrets op.add_volume(V1Volume(name='ssh-key-volume', secret=V1SecretVolumeSource(secret_name='ssh-key-secret'))) op.add_volume_mount(V1VolumeMount(mount_path='/etc/ssh-key', name='ssh-key-volume', read_only=True)) return op
首先,建立一个Kubernetes volume,预约义大小为3Gi。其次,将image
变量指定为咱们将要使用的alpine/git
Docker镜像。以后是在Docker容器中执行的命令列表。这些命令实质上是设置SSH密钥的,以便于流水线能够从私有仓库git clone
,或者使用git://URL
来代替 https://
。
该函数的核心是下面一行,返回一个dsl.ContainerOp
。
command
和arguments
指定了执行镜像以后须要执行的命令。
最后一个变量十分有趣,是pvolumes
,它是Pipeline Volumes简称。它建立一个Kubernetes volume并容许流水线组件来共享单个存储。该volume被挂载在/workspace
上。那么这个组件要作的就是把仓库git clone
到/workspace
中。
再次查看命令和复制SSH密钥的位置。
流水线volume在哪里建立呢?当咱们将全部组件都整合到一个流水线中时,就会看到建立好的volume。咱们在/etc/ssh-key/
上安装secrets:
op.add_volume_mount(V1VolumeMount(mount_path='/etc/ssh-key', name='ssh-key-volume', read_only=True))
请记得咱们将secret命名为ssh-key-secret
:
op.add_volume(V1Volume(name='ssh-key-volume', secret=V1SecretVolumeSource(secret_name='ssh-key-secret')))
经过使用相同的volume名称ssh-key-volume
,咱们能够把一切绑定在一块儿。
def preprocess_op(image: str, pvolume: PipelineVolume, data_dir: str): return dsl.ContainerOp( name='preprocessing', image=image, command=[CONDA_PYTHON_CMD, f"{PROJECT_ROOT}/preprocessing.py"], arguments=["--data_dir", data_dir], container_kwargs={'image_pull_policy': 'IfNotPresent'}, pvolumes={"/workspace": pvolume} )
正如你所看到的, 预处理步骤看起来十分类似。
image
指向咱们在Step0中建立的Docker镜像。
这里的command
使用指定的conda python简单地执行了preprocessing.py
脚本。变量data_dir
被用于执行preprocessing.py
脚本。
在这一步骤中pvolume
将在/workspace
里有仓库,这意味着咱们全部的脚本在这一阶段都是可用的。而且在这一步中预处理数据会存储在/workspace
下的data_dir
中。
def train_and_eval_op(image: str, pvolume: PipelineVolume, data_dir: str, ): return dsl.ContainerOp( name='training and evaluation', image=image, command=[CONDA_PYTHON_CMD, f"{PROJECT_ROOT}/train.py"], arguments=["--data_dir", data_dir], file_outputs={'output': f'{PROJECT_ROOT}/output.txt'}, container_kwargs={'image_pull_policy': 'IfNotPresent'}, pvolumes={"/workspace": pvolume} )
最后,是时候进行训练和评估这一步骤。这一步惟一的区别在于file_outputs
变量。若是咱们再次查看train.py
,则有如下代码段:
with open(os.path.join(PROJECT_ROOT, 'output.txt'), 'w') as f: f.write(model_path) print(f'Model written to: {model_path}')
咱们正在将模型路径写入名为output.txt
的文本文件中。一般,能够将其发送到下一个流水线组件,在这种状况下,该参数将包含模型的路径。
要指定流水线,你须要使用dsl.pipeline
来注释流水线功能:
@dsl.pipeline( name='Fashion MNIST Training Pipeline', description='Fashion MNIST Training Pipeline to be executed on KubeFlow.' ) def training_pipeline(image: str = 'benjamintanweihao/kubeflow-mnist', repo_url: str = 'https://github.com/benjamintanweihao/kubeflow-mnist.git', data_dir: str = '/workspace'): git_clone = git_clone_darkrai_op(repo_url=repo_url) preprocess_data = preprocess_op(image=image, pvolume=git_clone.pvolume, data_dir=data_dir) _training_and_eval = train_and_eval_op(image=image, pvolume=preprocess_data.pvolume, data_dir=data_dir) if __name__ == '__main__': import kfp.compiler as compiler compiler.Compiler().compile(training_pipeline, __file__ + '.tar.gz')
还记得流水线组件的输出是另外一个组件的输入吗?在这里,git clone
、container_op
的pvolume
将传递到preprocess_cp
。
最后一部分将pipeline.py
转换为可执行脚本。最后一步是编译流水线:
% dsl-compile --py pipeline.py --output pipeline.tar.gz
如今要进行最有趣的部分啦!第一步,上传流水线。点击Upload a pipeline
:
接下来,填写Pipeline Name
和Pipeline Description
,而后选择Choose file
而且指向pipeline.tar.gz
以上传流水线。
下一页将会展现完整的流水线。咱们所看到的是一个流水线的有向无环图,在本例中这意味着依赖项会通往一个方向而且它不包含循环。点击蓝色按钮Create run
以开始训练。
大部分字段已经已经填写完毕。请注意,Run parameters
与使用@ dsl.pipeline
注释的training_pipeline
函数中指定的参数相同:
最后,当你点击蓝色的Start按钮时,整个流水线就开始运转了!你点击每一个组件并查看日志就可以知道发生了什么。当整个流水线执行完毕时,在全部组件的右方会有一个绿色的确认标志,以下所示:
若是你从上一篇文章开始就一直在关注,那么你应该已经安装了Kubeflow,而且应该能体会到大规模管理机器学习项目的复杂性。
在这篇文章中,咱们先介绍了为Kubeflow准备一个机器学习项目的过程,而后是构建一个Kubeflow流水线,最后是使用Kubeflow接口上传并执行流水线。这种方法的奇妙之处在于,你的机器学习项目能够是简单的,也能够是复杂的,只要你愿意,你就可使用相同的技术。
由于Kubeflow使用Docker容器做为组件,你能够自由地加入任何你喜欢的工具。并且因为Kubeflow运行在Kubernetes上,你可让Kubernetes处理机器学习工做负载的调度。
咱们还了解了一个我喜欢的Rancher功能,它十分方便,能够轻松添加secrets。马上,你就能够轻松地组织secrets(如SSH密钥),并选择将其分配到哪一个命名空间,而无需为Base64编码而烦恼。就像Rancher的应用商店同样,这些便利性使Kubernetes的工做更加愉快,更不容易出错。
固然,Rancher提供的服务远不止这些,我鼓励你本身去作一些探索。我相信你会偶然发现一些让你大吃一惊的功能。Rancher做为一个开源的企业级Kubernetes管理平台,Run Kubernetes Everywhere一直是咱们的愿景和宗旨。开源和无厂商锁定的特性,可让用户轻松地在不一样的基础设施部署和使用Rancher。此外,Rancher极简的操做体验也可让用户在不一样的场景中利用Rancher提高效率,帮助开发人员专一于创新,而无需在繁琐的小事中浪费精力。