使用 ray 做分布式计算和训练

作者: , 共 8524 字 , 共阅读 0

假设你有多个很耗时的任务,比如训练多个神经网络模型:

for model in models:
    train(model)

使用 ray 可以很简单地实现在实现定义好的集群分布式地(或本服务器上并行地)训练这些神经网络模型:

import ray

for model in models:
    ray.remote(train).options(num_gpus=1).remote(model)

1、组建集群

事实上,如果只在本地进行并行计算,不需要组建集群。前面的示例, ray 会自动在本服务器上启动一个节点,然后可以执行并行任务。这时候有点类似于使用multiproccessing启动一个子进程,但在使用上更方便,而且涉及到调用 GPU 时, ray 能自动调度到最空闲的 GPU 设备上。

若要组建 ray 集群也特别简单。云服务一般都内置了服务。对于自己的普通服务器,先选一台机器作为控制机器( ray 称之为 head 节点),直接执行下面命令即可,其中 6379 是集群通信的端口,可以任意选择自己喜欢的:

sudo -E pip3 install ray 
ray start --head --port=6379

在集群里添加服务器(在 ray 里称之为 node ,或节点,注意 head 机器自动成为一个 node ),直接在这些服务器上执行下面命令即可:

sudo -E pip3 install ray 
ray start --address ip-of-head-machine:6379

如果我们要关闭一个节点:

# 关闭节点。如果在head节点上执行该命令,所有节点都会被关闭。
# 如果在非head节点上执行,则不会影响其它节点和head节点。
ray stop

注意事项:

  • 集群里的服务器的 Python 版本要保持严格一致,精确到小版本。即使 3.8.5 和 3.8.10 都不行。
  • 服务器之间除了指定的端口外,还需要用到多个端口(待完善)。

集群组建完毕后,可以通过ray status查看集群信息,会显示当前有多少个节点, CPU 核和 GPU ,以及已经使用了多少。其结果类似于:

$ ray status
======== Autoscaler status: 2024-07-08 11:35:41.940675 ========
Node status
---------------------------------------------------------------
Active:
 1 node_fca78d3c7f990e99f092154ada842be982e5e6acb0bbf87ecaf90dc6
 1 node_5f1502b1eae186c0e859e801a6f5c5bf80f29ec561c675c0e37755f3
 1 node_4c4067efd85155e66eabc12a4c5b45ef47837686e356fbce44b07ded
Pending:
 (no pending nodes)
Recent failures:
 (no failures)

Resources
---------------------------------------------------------------
Usage:
 0.0/112.0 CPU
 0.0/2.0 GPU
 0B/462.92GiB memory
 0B/202.06GiB object_store_memory

Demands:
 (no resource demands)

你甚至不需要到服务器上去依次启动。如果你配置了服务器之间的 SSH 免 Key 登录,那么只需要修改配置文件,填入控制机器和节点机器的 IP ,在某台机器上简单执行该配置文件即可。ray 会按照配置文件依次登录机器,配置和启动该机器上的服务,组建集群:

# 下载样例配置
wget https://raw.githubusercontent.com/ray-project/ray/master/python/ray/autoscaler/local/example-full.yaml
# 修改配置,主要是添加head/node机器的IP。
vim example-full.yaml

# 启动集群
ray up example-full.yaml

# 查看集群
ray attach example-full.yaml

# 关闭集群
ray down example-full.yaml

还有一个特殊的情况, ray 有一个 dashboard 服务,可以通过网页查看集群数据。但需要安装ray[default]

pip install 'ray[default]'
ray start --head

2、提交或执行任务

如最前面的例子所示,假设我们有一个很耗时的函数func(*args,**kwargs),我们可以用ray.remote(func).remote(*args, **kwargs)来分布式执行:

def train(model, date):
    time.sleep(600)
    return 1    

ret = ray.remote(train).remote(model, date)

ray.remote(func).remote(*args, **kwargs)分为两部分,第一部分ray.remote(func)将一个函数转为远程函数。除了这个方法, 也可以直接在函数定义时添加@ray.remote装饰器:

@ray.remote
def remote_train(model, date):
    time.sleep(600)
    return 1

ret = remote_train.remote(model, date=date)

不过我个人还是更喜欢定义普通函数,然后在需要的时候通过ray.remote()进行转换,这样函数既可以常规地执行,也可以分布式地执行。

后面的.remote(*args, **kwargs)则是在远程执行该函数。ray 会自动选择集群里合适的节点来执行这些任务。任务在结束后自动销毁,解除对资源的占用。

3、指定任务所需要的资源

一种方法是在装饰器里标记:

@ray.remote(num_cpus=1, num_gpus=0.2)
def remote_train(model, date): ...

ret = remote_train.remote(model, date)

还有一种方法是通过options函数添加或修改资源要求:

ret = ray.remote(train).options(num_cpus=1, num_gpus=0.2).remote(model)

若不指定,则表示不用占用资源。若指定,在任务执行过程中,集群的资源会被扣减(即使任务实际并没有使用这些资源),当资源不足时,新的任务将需要等待。

在指定 GPU 时, ray 会设置CUDA_VISIBLE_DEVICES,任务将只能看到指定(个数)的 GPU ,不能自己随意选择 GPU ,否则会报错。

任务在调度时会考虑到所需要的资源。不指定 GPU ,程序能看到节点上所有的 GPU ,但任务就不会考虑这些 GPU 的占用情况,有可能被调度到 GPU 已经满载的机器上。因此在真正需要资源时,还是指定更好一些。

4、任务的返回值

remote 执行的返回值是一个引用,也是一个 future 值,即它还不是一个真正的返回值,因为任务可能还在计算中。我们需要使用ray.get来获取真正的返回值,但这可能会导致程序的挂起,即需要等待并行任务执行完毕返回后,当前程序才会继续向下运行:

ret1 = ray.remote(train).remote(model1)
ret2 = ray.remote(train).remote(model2)

# 下面这一行会等到上面train(model1)执行完毕,但不用等待 train(model2)。
ret1 = ray.get(ret1)

ray.get 支持一次性获取多个返回值,列表或者字典都可以的:

rets = [ray.remote(train).remote(model) for model in models]
rets = ray.get(rets)

5、带状态的任务 Actor

Actor 被认为是一个带状态的任务,也就是它内部可以不断执行子任务,且子任务之间有公共的状态:

class Calculator:
    def __init__(self):
        self._sum = 0

    def add(self, i):
        time.sleep(10)
        self._sum += i
        return self._sum

    def minus(self, i):
        time.sleep(10)
        self._sum -= i
        return self._sum

# calc 将绑定到某个节点上。
calc = ray.remote(Calculator).options(num_cpus=1).remote();
# 此时 add(1) 和 minus(2) 是串行的。
s1 = calc.add.remote(1)
s2 = calc.minus.remote(2)

print(ray.get(s2))

也可以用装饰器@ray.remote

@ray.remote(num_cpus=1)
class Calculator: ...

calc = Calculator.remote();
s1 = calc.add.remote(1)
s2 = calc.minus.remote(2)

print(ray.get(s2))

上面代码里 calc是 在集群的某个节点上创建了一个 Calculator 类的远程实例,这种实例被称之为 actor。后面就可以不断调用其成员函数在远程的节点上运行。

Actor 在变量的存续期内是一直占有资源的,不像普通函数任务( task ),运行结束之后直接销毁。ray list actors可以查看存续的 actors。如果想让 actor 提前销毁释放资源,可以手工ray.kill(actor)

6、分布式任务池

Actor 可以认为是一个分布式任务代理,因为我们可以很简单地定义一个run成员用来执行任意函数任务:

@ray.remote(gpu_num=1)
class GenericActor:
    def run(self, func, args, kwargs):
        return func(*args, **kwargs)

actor = GenericActor.remote()

# 注意此时r1, r2 的执行是串行的!
r1 = actor.run.remote(train, (model, date1))
r2 = actor.run.remote(train, (model, date2))

利用它我们可以很容易实现一个分布式池( Pool ),以固定的并行数量来执行任意任务。 ray内置了 ActorPool,可以集合多个 Actor ,在上面执行任务。

actors = [GenericActor.remote() for _ in range(num_workers)]
pool = ActorPool(actors)

# 提交多个任务,返回的是一个iter。
rs = pool.map(lambda a, v: a.run.remote(*v), [
    (train, (model, date)) for date in dates
])

# 对于 pool.map,直接实例化iter,就会自动获取到结果。
rs = list(rs)

# 提交单个任务 train(model, date),注意这个函数没有返回值!
pool.submit(lambda a, v: a.run.remote(*v), (train, (model, date1))
pool.submit(lambda a, v: a.run.remote(*v), (train, (model, date2))

# 获取任务结果,此时需要使用 poo.get_next(),但需要自己去维护结果和之前提交任务之间的对应关系。
r1 = pool.get_next()
r2 = pool.get_next()

7、多用户体系

ray 的一个关键问题是,它在设计上是针对单用户的,多用户的权限有很大问题。如果某个用户(我们称该用户为启动用户)启动了一个集群:

  1. 另外一个用户可以随便查看和访问集群。集群对它是透明的。
  2. ray 使用了/tmp/ray目录作为存储目录,若不修改该目录权限,其它用户启动无法读取这些数据,无法提交任务到集群。
  3. 提交的任务都在启动用户的权限下执行,这意味着: 1 )启动用户可能不具备足够的权限,比如提交的任务可能访问了任务用户个人目录下的数据或文件。2 )任务获得了启动用户的权限,比如他可以随便访问启动用户的文件或数据,他也可以查看其它任务的数据,理论上启动用户的数据被泄露了。

8、API

8.1、ray.init()

它有一个返回值,其 address_info 属性可以获得 head 节点的信息:

RayContext(
    dashboard_url='', 
    python_version='3.8.10', 
    ray_version='2.10.0', 
    ray_commit='09abba26b5bf2707639bb637c208d062a47b46f6',
    address_info={
        'node_ip_address': '172.18.90.154', 
        'raylet_ip_address': '172.18.90.154', 
        'redis_address': None, 
        'object_store_address': '/tmp/ray-zhangzq/session_2024-07-11_11-45-31_938877_1655112/sockets/plasma_store', 
        'raylet_socket_name': '/tmp/ray-zhangzq/session_2024-07-11_11-45-31_938877_1655112/sockets/raylet', 
        'webui_url': '', 
        'session_dir': '/tmp/ray-zhangzq/session_2024-07-11_11-45-31_938877_1655112', 
        'metrics_export_port': 64734, 
        'gcs_address': '172.18.90.154:8085', 
        'address': '172.18.90.154:8085', 
        'dashboard_agent_listen_port': 52365, 
        'node_id': '6f42be814626b489c596e956915f889dc711a3c52817c2227913ef78'
    }
)

8.2、ray.nodes()

返回节点列表,每个节点是一个字典。但需要注意,死掉的节点也包含在内。需要通过 Alive 进行判断。

每个节点的结果信息如下:

{
    "NodeID": "bc39451211d1507e8f740907a498c43a534650bb608dd59b93043007",
    "Alive": true,
    "NodeManagerAddress": "172.18.90.185",
    "NodeManagerHostname": "sh185",
    "NodeManagerPort": 44609,
    "ObjectManagerPort": 40453,
    "ObjectStoreSocketName": "/tmp/ray-zhangzq/session_2024-07-11_11-45-31_938877_1655112/sockets/plasma_store.3",
    "RayletSocketName": "/tmp/ray-zhangzq/session_2024-07-11_11-45-31_938877_1655112/sockets/raylet.2",
    "MetricsExportPort": 54316,
    "NodeName": "172.18.90.185",
    "RuntimeEnvAgentPort": 61960,
    "alive": true,
    "Resources": {
        "accelerator_type:RTX": 1.0,
        "CPU": 32.0,
        "node:172.18.90.185": 1.0,
        "GPU": 8.0,
        "object_store_memory": 161813962752.0,
        "memory": 377565913088.0
    },
    "Labels": {
        "ray.io/node_id": "bc39451211d1507e8f740907a498c43a534650bb608dd59b93043007"
    }
},

9、遇到的一些问题

9.1、ray 2.10 以上 不支持 python3.8 及以下版本

python3.8 只能用 ray 2.10。而现在 ray 最新版本是 2.32。

9.2、ray::IDLE 进程

ray 会产生非常多的 ray::IDLE 进程(和 CPU 核心一样多),官方说法是待命进程,降低任务响应时间的。

一般情况下没啥问题,只是看着烦心。但有时候这个任务会跑到 GPU 上,而且任务还不释放 GPU 的显存,导致大量 IDLE 进程占用了 GPU 显存,其它任务无法执行了。

9.3、ssh 启动工作节点会挂

下面命令,从 24 启动 25 作为工作节点,刚开始看着正常,但很快该节点就会掉线:

ssh -t 10.24.10.25 "ray start --address 10.24.10.24:8083"

直接 ssh 登录 25 后执行 ray start --address 10.24.10.24:8083 则一切正常。

Q. E. D.

类似文章:
编程 » Python, 并行计算
核心就是threading.Thread
编程 » python, 单元测试
unittest 是 python 官方的单元测试工具。最近发现一个之前没注意到的盲区:
相似度: 0.080
以下对并行计算的个人理解受到较多质疑,删除之。
相似度: 0.074
IT » github
github 自从废除用户名密码直接登录之后,就乱了很多。直接用户名密码会提示:
命令行参数的初步说明,请参考argparse 模块用法实例详解,写的很清晰而详细。
安装 selenium ,使用 requestium 来调用 selenium 程序更为简单,因此可一起安装:
编程 » django, requests, python
这里的 requests 是指 Python 的 requests 包。
IT » WSL, SSH
世界上最好的 Linux 发行版 Bash on Windows 已经升级到了 Ubuntu 18.04 ,并且提供越来越多的功能。下面是笔记,记录如何启动子系统的 SSHD 服务并设置开机自动启动,也顺带开机自动启动了子系统。
编程 » Python
今天写一段程序时遇到一个问题,查了好一会才搞清楚。代码可以简化为下面这个小代码:
IT » git
如果还没有执行git add,此时被认为尚未暂存以备提交的变更,git status结果如下:
户外 » 浆板, 戏水, 昆玉河
周六下午,在昆玉河的长河湾划了浆板。
京蔚高速开通后,发现从北三环到东灵山从三小时降到了两小时。实测从家里到洪水口聚灵峡口只用了八十分钟。所以组队安排了这次灵山两日游,走一个大环线: