Ray是UC Berkeley RISELab新推出的高性能分布式执行框架,它使用了和传统分布式计算系统不同的架构和对分布式计算的抽象方式,具备比Spark更优异的计算性能。html
Ray目前还处于实验室阶段,最新版本为0.2.2版本。虽然Ray自称是面向AI应用的分布式计算框架,可是它的架构具备通用的分布式计算抽象。本文对Ray进行简单的介绍,帮助你们更快地了解Ray是什么,若有描述不当的地方,欢迎不吝指正。node
首先来看一下最简单的Ray程序是如何编写的。python
# 导入ray,并初始化执行环境 import ray ray.init() # 定义ray remote函数 @ray.remote def hello(): return "Hello world !" # 异步执行remote函数,返回结果id object_id = hello.remote() # 同步获取计算结果 hello = ray.get(object_id) # 输出计算结果 print hello
在Ray里,经过Python注解@ray.remote
定义remote函数。使用此注解声明的函数都会自带一个默认的方法remote
,经过此方法发起的函数调用都是以提交分布式任务的方式异步执行的,函数的返回值是一个对象id,使用ray.get
内置操做能够同步获取该id对应的对象。熟悉Java里的Future机制的话对此应该并不陌生,或许会有人疑惑这和普通的异步函数调用没什么大的区别,可是这里最大的差别是,函数hello是分布式异步执行的。git
remote函数是Ray分布式计算抽象中的核心概念,经过它开发者拥有了动态定制计算依赖(任务DAG)的能力。好比:github
@ray.remote def A(): return "A" @ray.remote def B(): return "B" @ray.remote def C(a, b): return "C" a_id = A.remote() b_id = B.remote() c_id = C.remote(a_id, b_id) print ray.get(c_id)
例子代码中,对函数A、B的调用是彻底并行执行的,可是对函数C的调用依赖于A、B函数的返回结果。Ray能够保证函数C须要等待A、B函数的结果然正计算出来后才会执行。若是将函数A、B、C类比为DAG的节点的话,那么DAG的边就是函数C参数对函数A、B计算结果的依赖,自由的函数调用方式容许Ray能够自由地定制DAG的结构和计算依赖关系。另外,说起一点的是Python的函数能够定义函数具备多个返回值,这也使得Python的函数更自然具有了DAG节点多入和多出的特色。web
Ray是使用什么样的架构对分布式计算作出如上抽象的呢,一下给出了Ray的系统架构(来自Ray论文,参考文献1)。redis
做为分布式计算系统,Ray仍旧遵循了典型的Master-Slave的设计:Master负责全局协调和状态维护,Slave执行分布式计算任务。不过和传统的分布式计算系统不一样的是,Ray使用了混合任务调度的思路。在集群部署模式下,Ray启动了如下关键组件:数组
须要说明的是,Ray的论文中说起,全局调度器能够启动一到多个,而目前Ray的实现文档里讨论的内容都是基于一个全局调度器的状况。我猜想多是Ray尚在建设中,一些机制还未完善,后续读者能够留意此处的细节变化。浏览器
Ray的任务也是经过相似Spark中Driver的概念的方式进行提交的,有所不一样的是:安全
论文给出的架构图里并未画出Driver的概念,所以我在其基础上作了一些修改和扩充。
Ray的Driver节点和和Slave节点启动的组件几乎相同,不过却有如下区别:
基于以上架构,咱们简单讨论一下Ray中关键的操做和流程。
在PythonShell中,使用ray.init()
能够在本地启动ray,包括Driver、HeadNode(Master)和若干Slave。
import ray ray.init()
若是是直连已有的Ray集群,只须要指定RedisServer的地址便可。
ray.init(redis_address="<redis-address>")
本地启动Ray获得的输出以下:
>>> ray.init() Waiting for redis server at 127.0.0.1:58807 to respond... Waiting for redis server at 127.0.0.1:23148 to respond... Allowing the Plasma store to use up to 13.7439GB of memory. Starting object store with directory /tmp and huge page support disabled Starting local scheduler with 8 CPUs, 0 GPUs ====================================================================== View the web UI at http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5 ====================================================================== {'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store73540254', manager_name='/tmp/plasma_manager78072648', manager_port=39874)], 'redis_address': '127.0.0.1:58807', 'local_scheduler_socket_names': ['/tmp/scheduler98624129'], 'webui_url': 'http://localhost:8888/notebooks/ray_ui62614.ipynb?token=7c253b0fd66fe41294d9f2c6739e3f002c1e76f6f59b99f5', 'node_ip_address': '127.0.0.1'} >>>
本地启动Ray时,能够看到Ray的WebUI的访问地址。
使用ray.put()
能够将Python对象存入本地ObjectStore,而且异步返回一个惟一的ObjectID。经过该ID,Ray能够访问集群中任一个节点上的对象(远程对象经过查阅Master的对象表得到)。
对象一旦存入ObjectStore便不可更改,Ray的remote函数能够将直接将该对象的ID做为参数传入。使用ObjectID做为remote函数参数,能够有效地减小函数参数的写ObjectStore的次数。
@ray.remote def f(x): pass x = "hello" # 对象x往ObjectStore拷贝里10次 [f.remote(x) for _ in range(10)] # 对象x仅往ObjectStore拷贝1次 x_id = ray.put(x) [f.remote(x_id) for _ in range(10)]
使用ray.get()
能够经过ObjectID获取ObjectStore内的对象并将之转换为Python对象。对于数组类型的对象,Ray使用共享内存机制减小数据的拷贝成本。而对于其它对象则须要将数据从ObjectStore拷贝到进程的堆内存中。
若是调用ray.get()
操做时,对象还没有建立好,则get操做会阻塞,直到对象建立完成后返回。get操做的关键流程以下:
另外,ray.get()
能够一次性读取多个对象的数据:
result_ids = [ray.put(i) for i in range(10)] ray.get(result_ids) # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Ray中使用注解@ray.remote
能够声明一个remote function。remote函数时Ray的基本任务调度单元,remote函数定义后会当即被序列化存储到RedisServer中,而且分配了一个惟一的ID,这样就保证了集群的全部节点均可以看到这个函数的定义。
不过,这样对remote函数定义有了一个潜在的要求,即remote函数内若是调用了其它的用户函数,则必须提早定义,不然remote函数没法找到对应的函数定义内容。
remote函数内也能够调用其它的remote函数,Driver和Slave每次调用remote函数时,其实都是向集群提交了一个计算任务,从这里也能够看到Ray的分布式计算的自由性。
Ray中调用remote函数的关键流程以下:
ray.put()
操做存入ObjectStore而后返回ObjectID)、函数返回值对象的ID。@ray.remote
注解有一个参数num_return_vals
用于声明remote函数的返回值个数,基于此实现remote函数的多返回值机制。
@ray.remote(num_return_vals=2) def f(): return 1, 2 x_id, y_id = f.remote() ray.get(x_id) # 1 ray.get(y_id) # 2
@ray.remote
注解的另外一个参数num_gpus
能够为任务指定GPU的资源。使用内置函数ray.get_gpu_ids()
能够获取当前任务可使用的GPU信息。
@ray.remote(num_gpus=1) def gpu_method(): return "This function is allowed to use GPUs {}.".format(ray.get_gpu_ids())
ray.wait()
操做支持批量的任务等待,基于此能够实现一次性获取多个ObjectID对应的数据。
# 启动5个remote函数调用任务 results = [f.remote(i) for i in range(5)] # 阻塞等待4个任务完成,超时时间为2.5s ready_ids, remaining_ids = ray.wait(results, num_returns=4, timeout=2500)
上述例子中,results包含了5个ObjectID,使用ray.wait
操做能够一直等待有4个任务完成后返回,并将完成的数据对象放在第一个list类型返回值内,未完成的ObjectID放在第二个list返回值内。若是设置了超时时间,那么在超时时间结束后仍未等到预期的返回值个数,则已超时完成时的返回值为准。
使用ray.error_info()能够获取任务执行时产生的错误信息。
>>> import time >>> @ray.remote >>> def f(): >>> time.sleep(5) >>> raise Exception("This task failed!!") >>> f.remote() Remote function __main__.f failed with: Traceback (most recent call last): File "<stdin>", line 4, in f Exception: This task failed!! You can inspect errors by running ray.error_info() If this driver is hanging, start a new one with ray.init(redis_address="127.0.0.1:65452") >>> ray.error_info() [{'type': 'task', 'message': 'Remote function \x1b[31m__main__.f\x1b[39m failed with:\n\nTraceback (most recent call last):\n File "<stdin>", line 4, in f\nException: This task failed!!\n', 'data': '{\'function_id\': "Hm\\xde\\x93\'\\x91\\xce\\x13ld\\xf4O\\xd7\\xce\\xc2\\xe1\\x151\\x1e3", \'function_name\': u\'__main__.f\'}'}]
Ray的remote函数只能处理无状态的计算需求,有状态的计算需求须要使用Ray的Actor实现。在Python的class定义前使用@ray.remote
能够声明Actor。
@ray.remote class Counter(object): def __init__(self): self.value = 0 def increment(self): self.value += 1 return self.value
使用以下方式建立Actor对象。
a1 = Counter.remote() a2 = Counter.remote()
Ray建立Actor的流程为:
从流程能够看出,Actor对象的建立时并行的。
经过调用Actor对象的方法使用Actor。
a1.increment.remote() # ray.get returns 1 a2.increment.remote() # ray.get returns 1
调用Actor对象的方法的流程为:
为了保证Actor状态的一致性,对同一个Actor的方法调用是串行执行的。
若是只是使用Ray,可使用以下命令直接安装。
pip intall ray
若是须要编译Ray的最新源码进行安装,按照以下步骤进行(MaxOS):
# 更新编译依赖包 brew update brew install cmake pkg-config automake autoconf libtool boost wget pip install numpy cloudpickle funcsigs click colorama psutil redis flatbuffers cython --ignore-installed six # 下载源码编译安装 git clone https://github.com/ray-project/ray.git cd ray/python python setup.py install # 测试 python test/runtest.py # 安装WebUI须要的库[可选] pip install jupyter ipywidgets bokeh # 编译Ray文档[可选] cd ray/doc pip install -r requirements-doc.txt make html open _build/html/index.html
我在MacOS上安装jupyter时,遇到了Python的setuptools库没法升级的状况,缘由是MacOS的安全性设置问题,可使用以下方式解决:
Command+R
进入Mac保护模式。csrutils disable
关闭系统安全策略。csrutils enable
,再次重启便可。进入PythonShell,输入代码本地启动Ray:
import ray ray.init()
浏览器内打开WebUI界面以下: