logo

模型分布式训练

wangzf / 2025-01-19


目录

训练方法介绍

并行方法:

硬件分类方法:

分布式训练框架

常用的分布式训练框架:

单机单卡训练

export CUDA_VISIBLE_DEVICES="0"

python -u YOUR_TRAINING_SCRIPT.py \
    --num_workders 4 \
    --use_gpu 1 \
    --gpu_type cuda \
    --use_multi_gpu 0 \
    --devices 0,1,2,3,4,5,6,7

数据并行

Data Prallel-DP

建议使用 torch.nn.parallel.DistributedDataParallel 而不是 torch.nn.DataParallel 进行多 GPU 训练,即使只有一个节点。

DistributedDataParallelDataParallel 的区别是:DistributedDataParallel 在多进程(multiprocessing)中为每个 GPU 创建一个进程, 而 DataParallel 使用多线程(multithreading)。通过使用多进程,每个 GPU 都有自己的专用进程,这避免了 Python 解释器 GIL 带来的性能开销。

Distributed Data Parallel-DDP

Fully Sharded Data Parallel-FSDP

FSDP2 原理

在 DistributedDataParallel (DDP)训练中,每个 rank 拥有一个模型副本并处理一批数据, 最后使用 all-reduce 来跨 rank 同步梯度。

与 DDP 相比,FSDP 通过分片模型参数、梯度和优化器状态来减少 GPU 内存占用。 这使得训练无法在单个 GPU 上运行的模型成为可能。如下图所示,

img

FSDP 可以被视为将 DDP 的全归约(all-reduce)操作分解为:归约散播(reduce-scater)和全收集(all-gather)操作:

img

FSDP2 使用

Model Initialization

$ torchrun --nproc_per_node 2 train.py
from torch.distributed.fsdp import fullly_shard, FSDPModule

# model
model = Transformer()

# 首先对每一层应用 fully_shard,然后对根模型应用
for layer in model.layers:
    fullly_shard(layer)
fully_shard(layer)

assert isinstance(model, Transformer)
assert isinstance(model, FSDPModule)
print(model)
from torch.distributed.tensor import DTensor

for param in model.parameters():
    assert isinstance(param, DTensor)
    assert param.placements == (Shard(0),)
    # inspect sharded parameters with param.to_local()

# optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)

Forward/Backward with Prefetching

Mixed Precision

Gradient Clipping and Optimizer with State Dicts with DTensor APIs

State Dict with DCP APIs

张量并行

数据并行 & 张量并行

流水线并行

torchrun

Elastic Launch

torch.distributed.run 是一个在每台训练节点上启动多个分布式训练进程的模块。

torchrun 是一个 Python 控制台脚本, 对应于在 setup.pyentry_points 配置中声明的 torch.distributed.run 主模块。 它等同于调用 python -m torch.distributed.run

torchrun 将 --local-rank=<rank> 参数传递给您的脚本。从 PyTorch 2.0.0 开始, 推荐使用连字符 --local-rank 而不是之前使用的下划线 --local_rank

torchrun 使用

单节点多进程

Single-node multi-worker

torchrun
    --standalone
    --nnodes=1
    --nproc-per-node=$NUM_TRAINERS
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

堆叠式单节点多工作进程

Stacked single-node multi-worker

要在同一主机上运行多个单节点多工作进程实例(分离的任务),我们需要确保每个实例(任务)在不同的端口上设置, 以避免端口冲突(或者更糟,两个任务被合并为一个任务)。 为此,你必须使用 --rdzv-backend=c10d 并通过设置 --rdzv-endpoint=localhost:$PORT_k 指定不同的端口。 对于 --nodes=1,通常让 torchrun 自动选择一个空闲的随机端口更方便,而不是手动为每次运行分配不同的端口。

torchrun
    --rdzv-backend=c10d
    --rdzv-endpoint=localhost:0
    --nnodes=1
    --nproc-per-node=$NUM_TRAINERS
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

容错

Fault tolerant (fixed sized number of workers, no elasticity, tolerates 3 failures)

容错(固定数量的工作进程,无弹性,可容忍 3 次失败)

torchrun
    --nnodes=$NUM_NODES
    --nproc-per-node=$NUM_TRAINERS
    --max-restarts=3
    --rdzv-id=$JOB_ID
    --rdzv-backend=c10d
    --rdzv-endpoint=$HOST_NODE_ADDR
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

Elastic

Elastic (min=1, max=4, tolerates up to 3 membership changes or failures)

弹性 ( min=1 , max=4 , 可容忍最多 3 次成员变更或故障)

torchrun
    --nnodes=1:4
    --nproc-per-node=$NUM_TRAINERS
    --max-restarts=3
    --rdzv-id=$JOB_ID
    --rdzv-backend=c10d
    --rdzv-endpoint=$HOST_NODE_ADDR
    YOUR_TRAINING_SCRIPT.py (--arg1 ... train script args...)

torchrun 重要提示

  1. torchrun 和多进程分布式(单节点或多节点)GPU 训练目前仅在使用 NCCL 分布式后端时能达到最佳性能。 因此,NCCL 后端是 GPU 训练推荐使用的后端。

  2. 初始化 Torch 进程组所需的环境变量由该模块提供,无需手动传递 RANK。 要在训练脚本中初始化进程组,只需运行:

    import torch.distributed as dist
     dist.init_process_group(backend="gloo|nccl")
    
  3. 在训练程序中,可以使用常规的分布式函数,或者使用 torch.nn.parallel.DistributedDataParallel() 模块。 如果训练程序使用 GPU 进行训练,并且想使用 torch.nn.parallel.DistributedDataParallel() 模块, 以下是配置方法:

    local_rank = int(os.environ["LOCAL_RANK"])
    model = torch.nn.parallel.DistributedDataParallel(
        model, device_ids=[local_rank], output_device=local_rank
    )
    
    • 请确保 device_ids 参数设置为将操作的唯一 GPU 设备 ID。这通常是进程的本地排名。换句话说, device_ids 需要是 [int(os.environ("LOCAL_RANK"))] , 并且 output_device 需要是 int(os.environ("LOCAL_RANK")),才能使用这个工具。
  4. 在失败或成员资格变更时,所有存活的进程将立即被终止。确保保存您的进度。 检查点的频率应根据您的工作对丢失工作的容忍度来决定。

  5. 该模块仅支持同构的 LOCAL_WORLD_SIZE。也就是说,假设所有节点运行相同数量的本地进程(按角色划分)。

  6. RANK 是不稳定的。在重启之间,节点上的本地进程可能被分配与之前不同的排名范围。 永远不要硬编码任何关于排名稳定性的假设,或 RANKLOCAL_RANK 之间某些关联的假设。

  7. 在使用弹性(min_size!=max_size)时,不要硬编码关于 WORLD_SIZE 的假设,因为当节点允许离开和加入时, 世界大小可能会改变。

  8. 建议您的脚本具有以下结构:

    def main():
        load_checkpoint(checkpoint_path)
        initialize()
        train()
    
    def train():
        for batch in iter(dataset):
            train_step(batch)
    
            if should_checkpoint:
                save_checkpoint(checkpoint_path)
    
  9. (推荐) 当工作进程出错时,该工具将总结错误详情(例如时间、排名、主机、进程 ID、堆栈跟踪等)。 在每个节点上,按时间戳排序的第一个错误会被启发式地报告为“根本原因”错误。 要获取作为此错误总结输出的一部分的堆栈跟踪,您必须在训练脚本中的主入口函数上添加装饰器, 如下面的示例所示。如果没有添加装饰器,则总结将不包含异常的堆栈跟踪,而只包含退出码。

    from torch.distributed.elastic.multiprocessing.errors import record
    
    @record
    def main():
        # do train
        pass
    
    
    if __name__ == "__main__":
        main()
    

DeepSpeed 使用

DeepSpeed Model

DeepSpeed 模型训练是通过 DeepSpeed 引擎完成的。 该引擎可以包装任何类型为 torch.nn.module 的任意模型, 并具有一套最小的 API 用于模型训练和检查点保存。

初始化 DeepSpeed 引擎:

import deepspeed

model_engine, optimizer, _, _ = deepspeed_initialize(
    args = cmd_args,
    model = model,
    model_parameters = params
)

如果你已经设置了分布式环境, 你需要将:torch.distributed.init_process_group() 替换为:deepspeed.init_distributed()

但如果不需要在 deepspeed.initialize() 之后设置分布式环境,则不必使用此函数, 因为 DeepSpeed 将在其 initialize 自动初始化分布式环境。无论如何, 如果已经设置好了 torch.distributed.init_process_group,需要将其移除。

Training

一旦 DeepSpeed 引擎被初始化,它就可以使用三个简单的 API 来训练模型:前向传播(可调用对象)、 反向传播( backward )和权重更新( step )。

for step, batch in enumerate(data_loader):
    # forward
    loss = model_engine(batch)
    # backward
    model_engine.backward(loss)
    # weight update
    model_engine.step()

在底层,DeepSpeed 自动执行分布式数据并行训练所需的操作,以混合精度和预定义的学习率调度器进行:

Model Checkpointing

保存和加载训练状态是通过 DeepSpeed 中的 save_checkpointload_checkpoint API 处理的, 该 API 需要两个参数来唯一标识一个检查点:

DeepSpeed 可以自动保存和恢复模型、优化器和学习率调度器的状态,同时将这些细节隐藏起来。 然而,用户可能希望保存特定于给定模型训练的额外数据。为了支持这些项目, save_checkpoint 接受客户端状态字典 client_sd 进行保存。 这些项目可以作为返回参数从 load_checkpoint 中检索。 在下述示例中,step 值被存储为 client_sd 的一部分。

# load checkpoint
_, client_sd = model_engine.load_checkpoint(args.load_dir, args.ckpt_id)
step = client_sd['step']

# advance data loader to ckpt step
dataloader_to_step(data_loader, step + 1)

for step, batch in enumerate(data_loader):
    # forward() method
    loss = model_engine(batch)

    # runs backpropagation
    model_engine.backward(loss)

    # weight update
    model_engine.step()

    # save checkpoint
    if step % args.save_interval:
        client_sd['step'] = step
        ckpt_id = loss.item()
        model_engine.save_checkpoint(args.save_dir, ckpt_id, client_sd = client_sd)

DeepSpeed 配置

DeepSpeed 的功能可以通过一个 config JSON 文件来启用、禁用或配置, 该文件应指定为 args.deepspeed_config。下面是一个示例配置文件。

// ds_config.json
{
    "train_batch_size": 8,
    "gradient_accumulation_steps": 1,
    "optimizer": {
        "type": "Adam",
        "params": {
            "lr": 0.00015
        }
    },
    "fp16": {
        "enabled": true
    },
    "zero_optimization": true
}

启动 DeepSpeed 训练

DeepSpeed 安装入口 deepspeed 来启动分布式训练。我们通过以下假设来展示 DeepSpeed 的一个使用示例:

  1. 已经将 DeepSpeed 集成到你的模型中
  2. client_entry.py 是你的模型的入口脚本
  3. client argsargparse 命令行参数
  4. ds_config.json 是 DeepSpeed 的配置文件

资源配置

资源配置-多节点

hostfile

DeepSpeed 使用与 OpenMPI 和 Horovod 兼容的 hostfile 配置多节点计算资源。 hostfile 是一个主机名(或 SSH 别名)列表,这些是可以通过无密码 SSH 访问的机器, 以及槽位数量(slot counts),这些指定了系统上可用的 GPU 数量。

例如:下面的 hostfile 指定了名为 worker-1worker-2 的两台机器,每台机器都有四个 GPU 用于训练。

worker-1 slots=4
worker-2 slots=4

Hostfiles 通过 --hostfile 命令行选项指定。如果没有指定 hostfile, DeepSpeed 会搜索 /job/hostfile。如果没有指定或找到 hostfile, DeepSpeed 会查询本地机器上的 GPU 数量,以发现可用的本地插槽数量。

以下命令将在 myhostfile 中指定的所有可用节点和 GPU 上启动一个 PyTorch 训练作业:

$ deepspeed --hostfile=myhostfile \
    <client_entry.py> <client args> \
    --deepspeed --deepspeed_config ds_config.json

num_nodes 和 num_gpus

或者,DeepSpeed 允许你将模型的分布式训练限制在可用的节点和 GPU 的子集上。 这一功能通过两个命令行参数 --num_nodes--num_gpus 启用。 例如,可以使用以下命令将分布式训练限制在仅使用两个节点:

$ deepspeed --num_nodes=2 --num_gpus 8 \
    <client_entry.py> <client args> \
    --deepspeed --deepspeed_config ds_config.json

include 和 exclude

可以使用 --include--exclude 标志来包含或排除特定资源。 例如,要在节点 worker-2 上使用除 GPU 0 以外的所有可用资源, 并在 worker-3 上使用 GPU 0 和 GPU 1

$ deepspeed --exclude="worker-2:0@worker-3:0,1" \
    <client_entry.py> <client args> \
    --deepspeed --deepspeed_config ds_config.json

同样地,你也可以在 worker-2 上仅使用 GPU 01

$ deepspeed --include="worker-2:0,1" \
    <client_entry.py> <client args> \
    --deepspeed --deepspeed_config ds_config.json 

不使用无密码 SSH 启动

DeepSpeed 现在支持在不使用无密码 SSH 的情况下启动训练作业。 这种模式在 Kubernetes 等云环境中特别有用,这些环境允许灵活的容器编排, 而使用无密码 SSH 设置 leader-worker 架构会增加不必要的复杂性。

要使用此模式,您需要在所有节点上分别运行 DeepSpeed 命令。命令应按以下结构运行:

deepspeed --hostfile=myhostfile \
    --no_ssh \
    --node_rank=<n> \
    --master_addr=<addr> --master_port=<port> \
    <client_entry.py> <client args> \
    --deepspeed --deepspeed_config ds_config.json

每个节点必须使用唯一的 node_rank 启动,并且所有节点都需要提供领导节点的地址和端口(rank 0)。 这种模式使启动器表现得类似于 PyTorch 文档中描述的 torchrun 启动器。

多节点环境变量

在跨多个节点进行训练时,我们发现支持传播用户定义的环境变量很有用。 默认情况下,DeepSpeed 会传播所有已设置的 NCCL 和 PYTHON 相关的环境变量。 如果您想传播额外的变量,可以在名为 .deepspeed_env 的点文件中指定它们, 该文件包含用换行符分隔的 VAR=VAL 条目列表。 DeepSpeed 启动器将检查您正在执行的本地路径以及您的家目录(~/)。 如果您想覆盖此文件的默认名称或路径并用自己的名称指定,可以使用环境变量 DS_ENV_FILE。 这主要适用于您启动多个作业,而所有作业都需要不同的变量的情况。

作为一个具体的例子,某些集群需要在训练之前设置特殊的 NCCL 变量。 用户只需将这些变量添加到其主目录中的一个 .deepspeed_env 文件中,该文件看起来像这样:

NCCL_IB_DISABLE=1
NCCL_SOCKET_IFNAME=eth0

DeepSpeed 将确保在训练作业中,在每个节点上启动每个进程时都设置这些环境变量。

资源配置-单节点

如果我们只在一个节点上运行(该节点有一个或多个 GPU),DeepSpeed 就不需要像上面描述的那样使用主机文件。 如果未检测到或未传入主机文件,DeepSpeed 将查询本地机器上的 GPU 数量,以发现可用的插槽数量。 --include--exclude 参数按正常方式工作,但用户应将 localhost 指定为主机名。

此外,CUDA_VISIBLE_DEVICES 可以与 deepspeed 一起使用,以控制在一个节点上应使用哪些设备。 因此,以下任一方式都可以用于仅在当前节点的设备 01 上启动:

$ deepspeed --include localhost:0,1 ...
$ CUDA_VISIBLE_DEVICES=0,1 deepspeed ...

DeepSpeed 资料

参考