Overview
There are two basic types of distributed training: data parallelism and model parallelism.
- Data Parallelism: the model is replicated on each device and each replica processes a different portion of the input data. The gradients are then aggregated across all devices and the model is updated.
- Model Parallelism: different parts of the model are placed on different devices and the forward and backward passes are executed in parallel.
Note that the NCCL backend used below for multi-GPU communication is only supported on Linux.
Data Parallelism
Data Parallel (DP)
DP is the simplest way to use multiple GPUs. Only one extra line of code is needed to implement DP.
```python
model = torch.nn.DataParallel(model)
```
The steps are as follows:
- Forward pass:
- Scatter a batch of input data from one assigned GPU to all GPUs.
- Replicate the model on all GPUs.
- Parallel forward pass on all GPUs.
- Gather the output from all GPUs to the assigned GPU.
- Backward pass:
- Compute the loss and gradient on the assigned GPU.
- Scatter the gradient from the assigned GPU to all GPUs.
- Parallel backward pass on all GPUs.
- Gather the gradient from all GPUs to the assigned GPU and update the model.
Note that the assigned GPU has to hold the whole batch of data and carry out all of the model-update work, so the performance of DP is limited in terms of both memory and speed.
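For concreteness, a minimal DP training step might look like the sketch below; the toy model, data, and optimizer here are placeholders, not code from the original program.

```python
import torch
import torch.nn as nn

# Toy model and batch; DP splits the batch along dim 0 across all visible GPUs.
model = nn.Linear(32, 10).cuda()
model = nn.DataParallel(model)        # one extra line: replicate the model on all GPUs
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
criterion = nn.CrossEntropyLoss()

inputs = torch.randn(64, 32).cuda()           # the whole batch lives on the default GPU
targets = torch.randint(0, 10, (64,)).cuda()

outputs = model(inputs)               # scatter -> parallel forward -> gather outputs
loss = criterion(outputs, targets)    # loss is computed on the default GPU
loss.backward()                       # gradients are accumulated on the default GPU
optimizer.step()                      # the default GPU performs the update
```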
Distributed Data Parallel (DDP)
Steps
DDP is a more advanced way to use multiple GPUs. It is more flexible and efficient than DP. The steps are as follows:
- Preparation
- Replicate the model on all GPUs and divide the input data among all GPUs equally and randomly. Each GPU loads its own data from the disk.
- Training
- Forward: The loss is computed on each GPU independently, without gathering the results on one assigned GPU.
- Backward: The processes exchange gradients with one another through an All-Reduce operation and compute the average gradient, so every GPU ends up with the same averaged gradient.
- Updating: Each process has its own optimizer and updates the model on its own GPU. Since the initial parameters and the gradients are the same, the models on all GPUs remain identical.
To further improve performance, DDP uses an optimized version of All-Reduce. It would be inefficient to start communicating only after all gradient computation has finished, so the model parameters are partitioned into buckets: once the gradient computation for bucket \(h-1\) is finished, the communication of the gradients in bucket \(h-1\) and the gradient computation of bucket \(h\) proceed simultaneously, overlapping communication with computation.
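The bucket size is exposed through the `bucket_cap_mb` argument of `DistributedDataParallel` (in MiB, 25 by default); for example, reusing the `model` and `local_rank` variables that appear in the main function later in this post:

```python
# Smaller buckets start communicating earlier; larger buckets need fewer all-reduce calls.
model = nn.parallel.DistributedDataParallel(
    model,
    device_ids=[local_rank],
    output_device=local_rank,
    bucket_cap_mb=25,
)
```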
Backend Communication
The backend communication of DDP supports different backends. The choice of backend is determined by the factors below:
- Network environment
  - Ethernet: `nccl` has better performance; `gloo` serves as a fallback
  - InfiniBand: `nccl` only
- Operators: `nccl` supports more diverse operators than `gloo`

In practical use, `nccl` is the preferred choice.
Initialization
- TCP mode: start each process independently from the shell and assign its rank explicitly.
- ENV mode: the program reads the required values from environment variables; both modes are sketched below.
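The two modes differ only in the arguments passed to `init_process_group`; the address, port, `rank`, and `world_size` below are placeholders:

```python
import torch.distributed as dist

# TCP mode: every process is told the rank-0 address and its own rank explicitly.
dist.init_process_group(backend='nccl',
                        init_method='tcp://10.0.0.1:29500',
                        rank=rank, world_size=world_size)

# ENV mode: the same values are read from the environment variables
# MASTER_ADDR, MASTER_PORT, RANK, and WORLD_SIZE.
dist.init_process_group(backend='nccl', init_method='env://')
```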
Start Up
- `mp.spawn()`: the module `mp` wraps the `multiprocessing` package and is not specifically designed for DDP
- `torchrun`: automatically sets the required environment variables; only `os.environ['CUDA_VISIBLE_DEVICES']` needs to be set manually
- `torch.distributed.launch`: to be deprecated
Parameters used in the last two ways of startup (i.e., parameters passed through the startup command):
- `--nproc_per_node`: number of processes per machine
- `--nnodes`: number of machines (nodes)
- `--node_rank`: the index of this machine
- `--master_addr`: the IP address of the rank-0 machine
- `--master_port`: an available port on the rank-0 machine
Other parameters and concepts that may be used in the program:
- `node`: a machine, which may contain multiple GPUs
- `world_size`: the total number of processes across all nodes
- `local_rank`: the rank of the process within the current node; if there are 4 GPUs in one node, `local_rank` ranges from 0 to 3
- `rank`: the rank of the process across all nodes, ranging from 0 to `world_size` \(-1\); see the small example after this list
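As a small numeric example (assuming every node runs the same number of processes, one per GPU):

```python
nproc_per_node = 4                               # GPUs (processes) per node
nnodes = 2                                       # number of machines
node_rank = 1                                    # index of this machine
local_rank = 2                                   # rank within this machine
rank = node_rank * nproc_per_node + local_rank   # global rank = 6
world_size = nnodes * nproc_per_node             # total number of processes = 8
```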
Program Revision (mp.spawn())
Import
```python
import torch.distributed as dist
```

- `dist`: communication between GPUs
- `mp`: DDP startup
- `GradScaler`: automatic mixed precision
- `DistributedSampler`: data sampler in a distributed setting
- `DDP`: model wrapping (the corresponding imports are sketched below)
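A sketch of those imports, based on the standard PyTorch module paths (only the first one is shown in the original program):

```python
import torch.distributed as dist
import torch.multiprocessing as mp
from torch.cuda.amp import GradScaler
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
```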
Key Functions
- Function `init_ddp` for initialization:

```python
def init_ddp(local_rank):
    torch.cuda.set_device(local_rank)
    os.environ['RANK'] = str(local_rank)
    dist.init_process_group(backend='nccl', init_method='env://')
```

The process group is initialized with `nccl` as the communication backend and ENV mode for initialization. After initializing with `init_ddp`, we can easily get `local_rank` and `world_size` in the following code:

```python
local_rank = dist.get_rank()        # rank of the current process
world_size = dist.get_world_size()  # total number of processes
```

Given that all processes hold the same model, only one process is responsible for printing logs and saving checkpoints:

```python
if local_rank == 0:
    print(f'Begin validating')
    ...
```

- Function `reduce_tensor` to gather data from different processes:

```python
def reduce_tensor(tensor: torch.Tensor):
    rt = tensor.clone()
    dist.all_reduce(rt, op=dist.ReduceOp.SUM)  # sum the tensor across all processes
    rt /= dist.get_world_size()                # divide by the number of processes to get the average
    return rt
```

- Function `get_ddp_generator` to enhance the randomness:

```python
def get_ddp_generator(seed=3407):
    local_rank = dist.get_rank()
    g = torch.Generator()
    g.manual_seed(seed + local_rank)  # a different seed for each process
    return g
```
Program Entry
- The parameter `nprocs` should be equal to `world_size`; otherwise it will cause a deadlock.

```python
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('-gpu', default='0,1,2,3', type=str)
    ...
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '19198'
    os.environ['CUDA_VISIBLE_DEVICES'] = args.gpu
    world_size = torch.cuda.device_count()
    os.environ['WORLD_SIZE'] = str(world_size)
    os.environ['PYTORCH_CUDA_ALLOC_CONF'] = "max_split_size_mb:128"
    if args.mode == 'train':
        mp.spawn(fn=train, nprocs=world_size, args=(args,))
```
Main Function
- The argument `local_rank` is automatically allocated by `mp.spawn()`.
- The function `init_ddp()` is needed for the initialization of every process.

```python
def main(local_rank, args):
    ...
    init_ddp(local_rank)
    ...
    model.cuda()
    model = nn.SyncBatchNorm.convert_sync_batchnorm(model)
    ...
    num_gpus = torch.cuda.device_count()
    if num_gpus > 1:
        model = nn.parallel.DistributedDataParallel(model, device_ids=[local_rank], output_device=local_rank)
    ...
    scaler = GradScaler()
    ...
    for epoch in range(epochs):
        if local_rank == 0:
            ...
        train_dataloader.sampler.set_epoch(epoch)
        train(model, ..., scaler, args)
    dist.destroy_process_group()
```
Get_dataloader Function
```python
def get_dataloader(path, args, ..., train: bool):
```
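A possible body for this function is sketched below; `MyDataset`, the batch size, and the worker count are illustrative, and the extra arguments elided in the signature above are omitted:

```python
def get_dataloader(path, args, train: bool):
    dataset = MyDataset(path, train=train)                 # placeholder dataset class
    sampler = DistributedSampler(dataset, shuffle=train)   # each process sees its own shard
    g = get_ddp_generator() if train else None             # per-process seed for shuffling
    return torch.utils.data.DataLoader(
        dataset,
        batch_size=args.batch_size,
        sampler=sampler,          # note: mutually exclusive with shuffle=True
        num_workers=4,
        pin_memory=True,
        generator=g,
    )
```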
Train Function
```python
def train(...):
```
- Note that backpropagation is performed on each GPU using its local loss, not `reduced_loss`; `reduced_loss` is the loss aggregated across all GPUs by `reduce_tensor` and is used for logging only. A possible train function is sketched below.
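A sketch of what such a train function might contain, combining `autocast`, the `GradScaler`, and `reduce_tensor`; the exact signature and the names of the dataloader, optimizer, and criterion are assumptions:

```python
def train(model, train_dataloader, optimizer, criterion, scaler, args):
    model.train()
    for inputs, targets in train_dataloader:
        inputs = inputs.cuda(non_blocking=True)
        targets = targets.cuda(non_blocking=True)
        optimizer.zero_grad()
        with torch.cuda.amp.autocast():              # mixed-precision forward pass
            outputs = model(inputs)
            loss = criterion(outputs, targets)
        scaler.scale(loss).backward()                # backward uses the local (per-GPU) loss
        scaler.step(optimizer)
        scaler.update()
        reduced_loss = reduce_tensor(loss.data)      # averaged across GPUs, logging only
        if dist.get_rank() == 0:
            print(f'train loss: {reduced_loss.item():.4f}')
```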
Validation Function
```python
@torch.no_grad()
def validate(...):
    model.eval()
    ...
    with torch.cuda.amp.autocast():
        output = ...
        loss = ...
    reduced_loss = reduce_tensor(loss.data)
    eval_loss += reduced_loss.item()
    if dist.get_rank() == 0:
        print(...)
    pred_labels = ...
    true_labels = ...
    pred_bools = ...
    macro = ...
    macro = reduce_tensor(torch.tensor(macro).cuda())  # move to GPU before all-reduce with nccl
    return macro
```
Program Revision (torchrun)
- Similar to the previous section.
- No need to import `mp` or use the `mp.spawn()` wrapper.
- Only `MASTER_ADDR` and `MASTER_PORT` need to be configured manually in the program.
- `local_rank` no longer serves as an argument of the `main` function; call `os.environ['LOCAL_RANK']` to access it. A minimal example follows.
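For illustration, a minimal torchrun-style entry point might look like this; the body is a placeholder, not the original program:

```python
import os
import torch
import torch.distributed as dist

def main():
    local_rank = int(os.environ['LOCAL_RANK'])   # provided by torchrun, not by mp.spawn()
    torch.cuda.set_device(local_rank)
    dist.init_process_group(backend='nccl', init_method='env://')
    if dist.get_rank() == 0:
        print(f'world size: {dist.get_world_size()}')
    dist.destroy_process_group()

if __name__ == '__main__':
    main()
```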