假设你有多个很耗时的任务,比如训练多个神经网络模型:
for model in models:
train(model)
使用 ray 可以很简单地实现在实现定义好的集群分布式地(或本服务器上并行地)训练这些神经网络模型:
import ray
for model in models:
ray.remote(train).options(num_gpus=1).remote(model)
1、组建集群
事实上,如果只在本地进行并行计算,不需要组建集群。前面的示例, ray 会自动在本服务器上启动一个节点,然后可以执行并行任务。这时候有点类似于使用multiproccessing
启动一个子进程,但在使用上更方便,而且涉及到调用 GPU 时, ray 能自动调度到最空闲的 GPU 设备上。
若要组建 ray 集群也特别简单。云服务一般都内置了服务。对于自己的普通服务器,先选一台机器作为控制机器( ray 称之为 head 节点),直接执行下面命令即可,其中 6379 是集群通信的端口,可以任意选择自己喜欢的:
sudo -E pip3 install ray
ray start --head --port=6379
在集群里添加服务器(在 ray 里称之为 node ,或节点,注意 head 机器自动成为一个 node ),直接在这些服务器上执行下面命令即可:
sudo -E pip3 install ray
ray start --address ip-of-head-machine:6379
如果我们要关闭一个节点:
# 关闭节点。如果在head节点上执行该命令,所有节点都会被关闭。
# 如果在非head节点上执行,则不会影响其它节点和head节点。
ray stop
注意事项:
- 集群里的服务器的 Python 版本要保持严格一致,精确到小版本。即使 3.8.5 和 3.8.10 都不行。
- 服务器之间除了指定的端口外,还需要用到多个端口(待完善)。
集群组建完毕后,可以通过ray status
查看集群信息,会显示当前有多少个节点, CPU 核和 GPU ,以及已经使用了多少。其结果类似于:
$ ray status
======== Autoscaler status: 2024-07-08 11:35:41.940675 ========
Node status
---------------------------------------------------------------
Active:
1 node_fca78d3c7f990e99f092154ada842be982e5e6acb0bbf87ecaf90dc6
1 node_5f1502b1eae186c0e859e801a6f5c5bf80f29ec561c675c0e37755f3
1 node_4c4067efd85155e66eabc12a4c5b45ef47837686e356fbce44b07ded
Pending:
(no pending nodes)
Recent failures:
(no failures)
Resources
---------------------------------------------------------------
Usage:
0.0/112.0 CPU
0.0/2.0 GPU
0B/462.92GiB memory
0B/202.06GiB object_store_memory
Demands:
(no resource demands)
你甚至不需要到服务器上去依次启动。如果你配置了服务器之间的 SSH 免 Key 登录,那么只需要修改配置文件,填入控制机器和节点机器的 IP ,在某台机器上简单执行该配置文件即可。ray 会按照配置文件依次登录机器,配置和启动该机器上的服务,组建集群:
# 下载样例配置
wget https://raw.githubusercontent.com/ray-project/ray/master/python/ray/autoscaler/local/example-full.yaml
# 修改配置,主要是添加head/node机器的IP。
vim example-full.yaml
# 启动集群
ray up example-full.yaml
# 查看集群
ray attach example-full.yaml
# 关闭集群
ray down example-full.yaml
还有一个特殊的情况, ray 有一个 dashboard 服务,可以通过网页查看集群数据。但需要安装ray[default]
:
pip install 'ray[default]'
ray start --head
2、提交或执行任务
如最前面的例子所示,假设我们有一个很耗时的函数func(*args,**kwargs)
,我们可以用ray.remote(func).remote(*args, **kwargs)
来分布式执行:
def train(model, date):
time.sleep(600)
return 1
ret = ray.remote(train).remote(model, date)
ray.remote(func).remote(*args, **kwargs)
分为两部分,第一部分ray.remote(func)
将一个函数转为远程函数。除了这个方法, 也可以直接在函数定义时添加@ray.remote
装饰器:
@ray.remote
def remote_train(model, date):
time.sleep(600)
return 1
ret = remote_train.remote(model, date=date)
不过我个人还是更喜欢定义普通函数,然后在需要的时候通过ray.remote()
进行转换,这样函数既可以常规地执行,也可以分布式地执行。
后面的.remote(*args, **kwargs)
则是在远程执行该函数。ray 会自动选择集群里合适的节点来执行这些任务。任务在结束后自动销毁,解除对资源的占用。
3、指定任务所需要的资源
一种方法是在装饰器里标记:
@ray.remote(num_cpus=1, num_gpus=0.2)
def remote_train(model, date): ...
ret = remote_train.remote(model, date)
还有一种方法是通过options
函数添加或修改资源要求:
ret = ray.remote(train).options(num_cpus=1, num_gpus=0.2).remote(model)
若不指定,则表示不用占用资源。若指定,在任务执行过程中,集群的资源会被扣减(即使任务实际并没有使用这些资源),当资源不足时,新的任务将需要等待。
在指定 GPU 时, ray 会设置CUDA_VISIBLE_DEVICES
,任务将只能看到指定(个数)的 GPU ,不能自己随意选择 GPU ,否则会报错。
任务在调度时会考虑到所需要的资源。不指定 GPU ,程序能看到节点上所有的 GPU ,但任务就不会考虑这些 GPU 的占用情况,有可能被调度到 GPU 已经满载的机器上。因此在真正需要资源时,还是指定更好一些。
4、任务的返回值
remote 执行的返回值是一个引用,也是一个 future 值,即它还不是一个真正的返回值,因为任务可能还在计算中。我们需要使用ray.get
来获取真正的返回值,但这可能会导致程序的挂起,即需要等待并行任务执行完毕返回后,当前程序才会继续向下运行:
ret1 = ray.remote(train).remote(model1)
ret2 = ray.remote(train).remote(model2)
# 下面这一行会等到上面train(model1)执行完毕,但不用等待 train(model2)。
ret1 = ray.get(ret1)
ray.get 支持一次性获取多个返回值,列表或者字典都可以的:
rets = [ray.remote(train).remote(model) for model in models]
rets = ray.get(rets)
5、带状态的任务 Actor
Actor 被认为是一个带状态的任务,也就是它内部可以不断执行子任务,且子任务之间有公共的状态:
class Calculator:
def __init__(self):
self._sum = 0
def add(self, i):
time.sleep(10)
self._sum += i
return self._sum
def minus(self, i):
time.sleep(10)
self._sum -= i
return self._sum
# calc 将绑定到某个节点上。
calc = ray.remote(Calculator).options(num_cpus=1).remote();
# 此时 add(1) 和 minus(2) 是串行的。
s1 = calc.add.remote(1)
s2 = calc.minus.remote(2)
print(ray.get(s2))
也可以用装饰器@ray.remote
:
@ray.remote(num_cpus=1)
class Calculator: ...
calc = Calculator.remote();
s1 = calc.add.remote(1)
s2 = calc.minus.remote(2)
print(ray.get(s2))
上面代码里 calc
是 在集群的某个节点上创建了一个 Calculator 类的远程实例,这种实例被称之为 actor。后面就可以不断调用其成员函数在远程的节点上运行。
Actor 在变量的存续期内是一直占有资源的,不像普通函数任务( task ),运行结束之后直接销毁。ray list actors
可以查看存续的 actors。如果想让 actor 提前销毁释放资源,可以手工ray.kill(actor)
。
6、分布式任务池
Actor 可以认为是一个分布式任务代理,因为我们可以很简单地定义一个run
成员用来执行任意函数任务:
@ray.remote(gpu_num=1)
class GenericActor:
def run(self, func, args, kwargs):
return func(*args, **kwargs)
actor = GenericActor.remote()
# 注意此时r1, r2 的执行是串行的!
r1 = actor.run.remote(train, (model, date1))
r2 = actor.run.remote(train, (model, date2))
利用它我们可以很容易实现一个分布式池( Pool ),以固定的并行数量来执行任意任务。 ray
内置了 ActorPool,可以集合多个 Actor ,在上面执行任务。
actors = [GenericActor.remote() for _ in range(num_workers)]
pool = ActorPool(actors)
# 提交多个任务,返回的是一个iter。
rs = pool.map(lambda a, v: a.run.remote(*v), [
(train, (model, date)) for date in dates
])
# 对于 pool.map,直接实例化iter,就会自动获取到结果。
rs = list(rs)
# 提交单个任务 train(model, date),注意这个函数没有返回值!
pool.submit(lambda a, v: a.run.remote(*v), (train, (model, date1))
pool.submit(lambda a, v: a.run.remote(*v), (train, (model, date2))
# 获取任务结果,此时需要使用 poo.get_next(),但需要自己去维护结果和之前提交任务之间的对应关系。
r1 = pool.get_next()
r2 = pool.get_next()
7、多用户体系
ray 的一个关键问题是,它在设计上是针对单用户的,多用户的权限有很大问题。如果某个用户(我们称该用户为启动用户)启动了一个集群:
- 另外一个用户可以随便查看和访问集群。集群对它是透明的。
- ray 使用了
/tmp/ray
目录作为存储目录,若不修改该目录权限,其它用户启动无法读取这些数据,无法提交任务到集群。 - 提交的任务都在启动用户的权限下执行,这意味着: 1 )启动用户可能不具备足够的权限,比如提交的任务可能访问了任务用户个人目录下的数据或文件。2 )任务获得了启动用户的权限,比如他可以随便访问启动用户的文件或数据,他也可以查看其它任务的数据,理论上启动用户的数据被泄露了。
8、API
8.1、ray.init()
它有一个返回值,其 address_info 属性可以获得 head 节点的信息:
RayContext(
dashboard_url='',
python_version='3.8.10',
ray_version='2.10.0',
ray_commit='09abba26b5bf2707639bb637c208d062a47b46f6',
address_info={
'node_ip_address': '172.18.90.154',
'raylet_ip_address': '172.18.90.154',
'redis_address': None,
'object_store_address': '/tmp/ray-zhangzq/session_2024-07-11_11-45-31_938877_1655112/sockets/plasma_store',
'raylet_socket_name': '/tmp/ray-zhangzq/session_2024-07-11_11-45-31_938877_1655112/sockets/raylet',
'webui_url': '',
'session_dir': '/tmp/ray-zhangzq/session_2024-07-11_11-45-31_938877_1655112',
'metrics_export_port': 64734,
'gcs_address': '172.18.90.154:8085',
'address': '172.18.90.154:8085',
'dashboard_agent_listen_port': 52365,
'node_id': '6f42be814626b489c596e956915f889dc711a3c52817c2227913ef78'
}
)
8.2、ray.nodes()
返回节点列表,每个节点是一个字典。但需要注意,死掉的节点也包含在内。需要通过 Alive 进行判断。
每个节点的结果信息如下:
{
"NodeID": "bc39451211d1507e8f740907a498c43a534650bb608dd59b93043007",
"Alive": true,
"NodeManagerAddress": "172.18.90.185",
"NodeManagerHostname": "sh185",
"NodeManagerPort": 44609,
"ObjectManagerPort": 40453,
"ObjectStoreSocketName": "/tmp/ray-zhangzq/session_2024-07-11_11-45-31_938877_1655112/sockets/plasma_store.3",
"RayletSocketName": "/tmp/ray-zhangzq/session_2024-07-11_11-45-31_938877_1655112/sockets/raylet.2",
"MetricsExportPort": 54316,
"NodeName": "172.18.90.185",
"RuntimeEnvAgentPort": 61960,
"alive": true,
"Resources": {
"accelerator_type:RTX": 1.0,
"CPU": 32.0,
"node:172.18.90.185": 1.0,
"GPU": 8.0,
"object_store_memory": 161813962752.0,
"memory": 377565913088.0
},
"Labels": {
"ray.io/node_id": "bc39451211d1507e8f740907a498c43a534650bb608dd59b93043007"
}
},
9、遇到的一些问题
9.1、ray 2.10 以上 不支持 python3.8 及以下版本
python3.8 只能用 ray 2.10。而现在 ray 最新版本是 2.32。
9.2、ray::IDLE 进程
ray 会产生非常多的 ray::IDLE 进程(和 CPU 核心一样多),官方说法是待命进程,降低任务响应时间的。
一般情况下没啥问题,只是看着烦心。但有时候这个任务会跑到 GPU 上,而且任务还不释放 GPU 的显存,导致大量 IDLE 进程占用了 GPU 显存,其它任务无法执行了。
9.3、ssh 启动工作节点会挂
下面命令,从 24 启动 25 作为工作节点,刚开始看着正常,但很快该节点就会掉线:
ssh -t 10.24.10.25 "ray start --address 10.24.10.24:8083"
直接 ssh 登录 25 后执行 ray start --address 10.24.10.24:8083
则一切正常。
Q. E. D.