
PyTorch Distributed Training (1) -- DP/DDP

Overview

Basically, there are two types of distributed training: data parallelism and model parallelism.

  • Data Parallelism: the model is replicated on each device and each replica processes a different portion of the input data. The gradients are then aggregated across all devices and the model is updated.
  • Model Parallelism: different parts of the model are placed on different devices, and each device executes its part of the forward and backward passes.

Note that distributed training with the nccl backend is only supported on Linux.

Data Parallelism

Data Parallel (DP)

DP is the simplest way to use multiple GPUs. Only one extra line of code is needed to implement DP.

model = torch.nn.DataParallel(model)

The steps are as follows:

  • Forward pass:
    • Scatter a batch of input data from the assigned GPU to all GPUs.
    • Replicate the model on all GPUs.
    • Parallel forward pass on all GPUs.
    • Gather the output from all GPUs to the assigned GPU.
  • Backward pass:
    • Compute the loss and gradient on the assigned GPU.
    • Scatter the gradient from the assigned GPU to all GPUs.
    • Parallel backward pass on all GPUs.
    • Gather the gradient from all GPUs to the assigned GPU and update the model.

Note that the assigned GPU has to accommodate the whole batch of data and carry out all of the model updating, so the performance of DP is limited in both memory and speed.
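
For concreteness, here is a minimal sketch of a DP training loop; model, dataloader, criterion, and optimizer are placeholders assumed to be defined elsewhere, and the first visible GPU plays the role of the assigned GPU.

import torch
import torch.nn as nn

# Wrap the model; device_ids defaults to all visible GPUs and
# output_device defaults to device_ids[0] (the assigned GPU).
model = nn.DataParallel(model.cuda())

for inputs, labels in dataloader:
    inputs, labels = inputs.cuda(), labels.cuda()
    outputs = model(inputs)            # scatter, parallel forward, gather
    loss = criterion(outputs, labels)  # loss lives on the assigned GPU
    optimizer.zero_grad()
    loss.backward()                    # gradients are reduced onto the assigned GPU
    optimizer.step()                   # single-GPU update of the shared parameters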

Distributed Data Parallel (DDP)

Steps

DDP is a more advanced way to use multiple GPUs. It is more flexible and efficient than DP. The steps are as follows:

  • Preparation
    • Replicate the model on all GPUs and divide the input data among all GPUs equally and randomly. Each GPU loads its own data from the disk.
  • Training
    • Forward: The forward pass and the loss computation are done on each GPU independently, without gathering the results on one assigned GPU.
    • Backward: The processes exchange gradients through an All-Reduce operation, so every GPU ends up with the same averaged gradient.
    • Updating: Each process has its own optimizer and updates the model on its own GPU. Since the initial parameters and the averaged gradients are identical, the model replicas on all GPUs stay identical.

To further improve performance, DDP uses an optimized version of All-Reduce. It would be inefficient to start communicating only after all gradient computation has finished, so the model parameters are partitioned into buckets: once the gradient computation for bucket \(h-1\) is finished, the communication for bucket \(h-1\) and the gradient computation for bucket \(h\) proceed simultaneously, overlapping communication with computation.
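
The bucket size is exposed as the bucket_cap_mb argument of DistributedDataParallel (25 MB by default). A minimal sketch, assuming the process group has already been initialized and local_rank is known:

from torch.nn.parallel import DistributedDataParallel as DDP

# Gradients are grouped into buckets of roughly bucket_cap_mb megabytes;
# the all-reduce of a finished bucket overlaps with the backward pass of the next one.
model = DDP(model.cuda(), device_ids=[local_rank], bucket_cap_mb=25)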

Backend Communication

The backend communication of DDP supports different protocols. The choice of protocol is determined by the factors below:

  • Network Environment
    • Ethernet: nccl has better performance; gloo serves as a fallback
    • InfiniBand: only nccl is supported
  • Operators: nccl supports more diverse operators than gloo

In practical use, nccl is the preferred choice.
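
A common pattern is to pick the backend at run time, falling back to gloo when no CUDA device is available; a minimal sketch, assuming the rendezvous environment variables are already set by the launcher:

import torch
import torch.distributed as dist

backend = 'nccl' if torch.cuda.is_available() else 'gloo'
dist.init_process_group(backend=backend, init_method='env://')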

Initialization

  • TCP mode: start each process independently from the shell and assign its rank explicitly.
  • ENV mode: the program reads the required values (master address, port, rank, world size) from environment variables (see the sketch below).
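
The two modes differ only in how init_process_group obtains the rendezvous information; in the sketch below the address, port, rank, and world_size are placeholders.

import torch.distributed as dist

# Pick one of the two calls, depending on the initialization mode.

# TCP mode: pass the master address and the rank/world size explicitly.
dist.init_process_group(backend='nccl',
                        init_method='tcp://10.1.1.20:23456',
                        rank=rank, world_size=world_size)

# ENV mode: MASTER_ADDR, MASTER_PORT, RANK and WORLD_SIZE are read
# from environment variables set by the launcher.
dist.init_process_group(backend='nccl', init_method='env://')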

Start Up

  • mp.spawn(): the module mp wraps the standard multiprocessing package and is not specifically designed for DDP
  • torchrun: automatically sets the required environment variables; only os.environ['CUDA_VISIBLE_DEVICES'] needs to be set manually
  • torch.distributed.launch: deprecated in favor of torchrun

Parameters used in the last two ways of startup (i.e., parameters passed through the startup command):

  • --nproc_per_node: number of processes per machine
  • --nnodes: number of machines/nodes
  • --node_rank: the index of this machine/node
  • --master_addr: the IP address of the rank-0 machine
  • --master_port: a free port on the rank-0 machine

Other parameters and concepts that may be used in the program

  • node: a machine, which may contain multiple GPUs
  • world_size: the total number of processes across all nodes
  • local_rank: the rank of the process within the current node. If there are 4 GPUs in one node, local_rank ranges from 0 to 3
  • rank: the global rank of the process across all nodes, ranging from 0 to world_size \(-1\) (see the example below)
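
The relationship between these values can be written out explicitly; the sketch below assumes 2 nodes with 4 GPUs each.

nnodes = 2                 # number of machines
nproc_per_node = 4         # processes (GPUs) per machine
world_size = nnodes * nproc_per_node             # 8 processes in total, ranks 0..7

node_rank, local_rank = 1, 1                     # e.g. the 2nd GPU on the 2nd machine
rank = node_rank * nproc_per_node + local_rank   # global rank = 5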

Program Revision (mp.spawn())

Import

import os        # environment variables (MASTER_ADDR, RANK, ...)
import argparse  # command-line arguments in the program entry
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.cuda.amp import GradScaler
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
  • dist: communication between GPUs
  • mp: DDP startup
  • GradScaler: automatic mixed precision
  • DistributedSampler: Data sampler in a distributed setting
  • DDP: wrapping the model for distributed training

Key functions

  • Function init_ddp for initialization
    def init_ddp(local_rank):
        torch.cuda.set_device(local_rank)
        os.environ['RANK'] = str(local_rank)
        dist.init_process_group(backend='nccl', init_method='env://')
    Initialize the process group using nccl as the communication protocol and ENV mode for initialization. After calling init_ddp, local_rank and world_size can be obtained easily in the following code.
    local_rank = dist.get_rank()        # global rank of the current process (equals local_rank on a single node)
    world_size = dist.get_world_size()  # total number of processes
    Since all processes hold the same model, only one process is responsible for printing logs and saving checkpoints.
    if local_rank == 0:
        print(f'Begin validating')
        ...
  • Function reduce_tensor to reduce (average) a tensor across processes
    def reduce_tensor(tensor: torch.Tensor):
        rt = tensor.clone()
        dist.all_reduce(rt, op=dist.ReduceOp.SUM)  # sum the tensor over all processes
        rt /= dist.get_world_size()                # divide by the number of processes to get the average
        return rt
  • Function get_ddp_generator to enhance randomness by giving each process a different data-shuffling seed
    def get_ddp_generator(seed=3407):
        local_rank = dist.get_rank()
        g = torch.Generator()
        g.manual_seed(seed + local_rank)  # a different seed for each process
        return g

Program Entry

  • The parameter nprocs should be equal to world_size, otherwise it will cause a deadlock.
    if __name__ == '__main__':
        parser = argparse.ArgumentParser()
        parser.add_argument('-gpu', default='0,1,2,3', type=str)
        ...
        os.environ['MASTER_ADDR'] = 'localhost'
        os.environ['MASTER_PORT'] = '19198'
        os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu
        world_size = torch.cuda.device_count()
        os.environ['WORLD_SIZE'] = str(world_size)
        os.environ['PYTORCH_CUDA_ALLOC_CONF'] = "max_split_size_mb:128"

        if args.mode == 'train':
            mp.spawn(fn=main, nprocs=world_size, args=(args,))

Main Function

  • The argument local_rank is automatically allocated by mp.spawn()
  • The function init_ddp() is needed for the initialization of every process
    def main(local_rank, args):
        ...
        init_ddp(local_rank)
        ...
        model.cuda()
        model = nn.SyncBatchNorm.convert_sync_batchnorm(model)
        ...
        num_gpus = torch.cuda.device_count()
        if num_gpus > 1:
            model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)
        ...
        scaler = GradScaler()
        ...
        for epoch in range(epochs):
            if local_rank == 0:
                ...
            train_dataloader.sampler.set_epoch(epoch)
            train(model, ..., scaler, args)
        dist.destroy_process_group()

Get_dataloader Function

def get_dataloader(path, args, ..., train: bool):
    ...
    if train:
        train_sampler = DistributedSampler(data, shuffle=True)
        g = get_ddp_generator()
        dataloader = DataLoader(dataset=data,
                                batch_size=args['batch_size'],
                                num_workers=args['num_workers'],
                                pin_memory=True,
                                shuffle=False,  # shuffling is handled by the DistributedSampler
                                sampler=train_sampler,
                                generator=g)
    else:
        test_sampler = DistributedSampler(data, shuffle=False)
        dataloader = DataLoader(dataset=data,
                                batch_size=args['batch_size'],
                                num_workers=args['num_workers'],
                                pin_memory=True,
                                shuffle=False,  # sequential sampling within each process
                                sampler=test_sampler)

    return dataloader

Train Function

def train(...):
    model.train()
    ...
    for step, batch in enumerate(train_dataloader):
        ...
        with torch.cuda.amp.autocast():
            output = ...
            loss = ...
        ...
        reduced_loss = reduce_tensor(loss.data)
        if dist.get_rank() == 0:
            print(...)
        train_loss += reduced_loss.item()

        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scheduler.step()
        scaler.update()
    ...
    torch.cuda.empty_cache()

    if dist.get_rank() == 0:
        print(...)

  • Note that backpropagation is performed on each GPU with its own local loss, not with reduced_loss. The reduced_loss is the average of the losses across all GPUs and is used only for logging.

Validation Function

@torch.no_grad()
def validate(...):
    model.eval()
    ...
    with torch.cuda.amp.autocast():
        output = ...
        loss = ...

    reduced_loss = reduce_tensor(loss.data)
    eval_loss += reduced_loss.item()

    if dist.get_rank() == 0:
        print(...)

    pred_labels = ...
    true_labels = ...
    pred_bools = ...
    macro = ...

    macro = reduce_tensor(torch.tensor(macro).cuda())  # move to GPU before the nccl all-reduce

    return macro

Program Revision(torchrun)

  • Similar to the previous section
  • No need to import mp or wrap the entry point with mp.spawn()
  • MASTER_ADDR and MASTER_PORT no longer need to be set inside the program; they are supplied through the torchrun command line (or its defaults)
  • local_rank no longer serves as an argument of the main function; read it from os.environ['LOCAL_RANK'] instead (see the sketch below)
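
A minimal sketch of the torchrun variant is given below; the model is a placeholder and the training logic is elided as in the previous sections. The script would be launched with something like torchrun --nproc_per_node=4 main.py.

import os
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP

def main():
    local_rank = int(os.environ['LOCAL_RANK'])   # set by torchrun, not passed by mp.spawn()
    torch.cuda.set_device(local_rank)
    dist.init_process_group(backend='nccl', init_method='env://')

    model = nn.Linear(10, 10).cuda()             # placeholder model
    model = DDP(model, device_ids=[local_rank], output_device=local_rank)
    ...                                          # build dataloaders and train as before

    dist.destroy_process_group()

if __name__ == '__main__':
    os.environ['CUDA_VISIBLE_DEVICES'] = '0,1,2,3'   # still set manually, as noted above
    main()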