PyTorch single-machine multi-GPU training (fine-tuning BERT)
I was originally fine-tuning with DataParallel, but GPU 0 ended up with a much heavier load than the other cards. After looking into it, the PyTorch documentation recommends DistributedDataParallel instead: it works for both multi-machine multi-GPU and single-machine multi-GPU setups, and it beats DataParallel in speed and in just about every other respect.
I mainly referred to these two blog posts:
- 【分布式训练】单机多卡的正确打开方式(三):PyTorch
- pytorch(分布式)数据并行个人实践总结——DataParallel/DistributedDataParallel
Here is how DataParallel and DistributedDataParallel differ in use. Suppose there are three cards (three GPUs) and the batch size is 30.

- DataParallel runs in a single process, whereas DistributedDataParallel starts one process per card; the number of processes has to be set by hand when the script is launched.
- The loss DataParallel returns contains one value per card, so it has to be averaged with mean() to get the loss of the whole batch. DataParallel automatically splits each input batch into 3 chunks, so every card computes on its own share of 10 samples (see the sketch after this list). DistributedDataParallel does not split the data automatically: the DataLoader's sampler has to be set to DistributedSampler (see the code below). It partitions the whole dataset into 3 shards, each process trains on its own shard, and every process still uses a batch size of 30.
- When saving the model, both DataParallel and DistributedDataParallel require model.module.state_dict rather than model.state_dict, because the wrapper keeps the real model in model.module. With DistributedDataParallel you additionally save from only one process, as shown in the code below.
- A DistributedDataParallel program has to be launched from the command line with python -m torch.distributed.launch --nproc_per_node=3 main.py, where nproc_per_node tells the launcher to start one process per card.
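To make the contrast concrete, here is a minimal, self-contained DataParallel sketch (a toy model with random inputs, not part of the fine-tuning script; it only assumes at least one visible GPU). It shows the single-process wrapping, the automatic splitting of a batch of 30 across the cards, the loss.mean() step, and saving through model.module:

```python
import torch
from torch import nn

# Toy DataParallel example -- illustrative only, unrelated to the ALBERT script below.
class ToyModel(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(16, 2)

    def forward(self, x, labels=None):
        logits = self.linear(x)
        # each replica returns a scalar loss; DataParallel gathers them into a 1-D tensor
        return nn.functional.cross_entropy(logits, labels)

model = nn.DataParallel(ToyModel()).cuda()   # one process, replicas on every visible GPU
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

x = torch.randn(30, 16).cuda()               # a batch of 30 is split across the cards (10 each on 3 GPUs)
labels = torch.randint(0, 2, (30,)).cuda()

loss = model(x, labels=labels)               # one loss value per GPU
loss = loss.mean()                           # average them to get the loss of the whole batch
loss.backward()
optimizer.step()

# Both DataParallel and DistributedDataParallel hold the real model in model.module,
# so the weights are saved through model.module rather than through the wrapper.
torch.save(model.module.state_dict(), 'toy_model.pt')
```

The full DistributedDataParallel fine-tuning script follows.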
```python
import random
import time

import numpy as np
import torch
from torch.nn.parallel import DistributedDataParallel
from torch.utils.data import Dataset, DataLoader, RandomSampler
from torch.utils.data.distributed import DistributedSampler
from transformers import AlbertTokenizer, AlbertForMaskedLM, AdamW, \
    get_linear_schedule_with_warmup, DataCollatorForLanguageModeling

from utils import format_time
from utils import read_data

torch.distributed.init_process_group(backend='nccl')

tokenizer = AlbertTokenizer.from_pretrained('albert-base-v2', cache_dir='../language_model/albert')
model = AlbertForMaskedLM.from_pretrained('../language_model/albert')

seed_val = 42
random.seed(seed_val)
np.random.seed(seed_val)
torch.manual_seed(seed_val)
torch.cuda.manual_seed_all(seed_val)


def my_collate_fn(batch):
    input_ids = []
    attention_masks = []
    for sent in batch:
        encoded_dict = tokenizer.encode_plus(
            sent[0],                      # input text
            add_special_tokens=True,      # add '[CLS]' and '[SEP]'
            max_length=128,               # pad & truncate to this length
            truncation=True,
            pad_to_max_length=True,
            return_attention_mask=True,   # return attention masks
        )
        input_ids.append(torch.tensor(encoded_dict['input_ids']))
        attention_masks.append(torch.tensor(encoded_dict['attention_mask']))
    input_ids = torch.stack(input_ids, dim=0)
    attention_masks = torch.stack(attention_masks, dim=0)
    return [input_ids, attention_masks]


class myDataset(Dataset):
    def __init__(self, data):
        self.data = data

    def __getitem__(self, index):
        return self.data[index]

    def __len__(self):
        return len(self.data)


def make_data(batch_size):
    data = read_data('en')
    dataset = myDataset(data)
    data_dataloader = DataLoader(
        dataset,
        sampler=DistributedSampler(dataset),  # the sampler has to be the distributed one
        batch_size=batch_size,
        collate_fn=my_collate_fn,
        num_workers=4,
    )
    return data_dataloader


# DataCollatorForLanguageModeling implements BERT's built-in "randomly mask 15% of the tokens" objective.
# Since I am fine-tuning on an in-domain dataset, I keep this language-model training objective.
datacollecter = DataCollatorForLanguageModeling(tokenizer)


def train(model, train_loader, optimizer, scheduler, local_rank):
    for epoch_i in range(0, epochs):
        # ========================================
        #               Training
        # ========================================
        print("")
        print('======== Epoch {:} / {:} ========'.format(epoch_i + 1, epochs))
        print('Training...')
        total_train_loss = 0
        model.train()
        t0 = time.time()
        for step, batch in enumerate(train_loader):
            if step % 50 == 0 and not step == 0:
                elapsed = format_time(time.time() - t0)
                print('  Batch {:>5,}  of  {:>5,}.    Elapsed: {:}.'.format(step, len(train_loader), elapsed))
            b_input_ids, b_attention_mask = batch[0], batch[1]
            output = datacollecter(b_input_ids)
            b_input_ids, b_labels = output["input_ids"], output["labels"]
            b_input_ids = b_input_ids.to(device)
            b_attention_mask = b_attention_mask.to(device)
            b_labels = b_labels.to(device)  # the labels also have to live on this process's GPU
            model.zero_grad()
            loss, logits = model(b_input_ids, attention_mask=b_attention_mask, labels=b_labels)
            total_train_loss += loss.item()
            loss.backward()
            optimizer.step()
            scheduler.step()
        avg_train_loss = total_train_loss / len(train_loader)
        training_time = format_time(time.time() - t0)
        print("")
        print("  Average training loss: {0:.2f}".format(avg_train_loss))
        print("  Training epoch took: {:}".format(training_time))

        # save from one process only
        if local_rank == 0:
            model.module.save_pretrained(save_path)  # save_pretrained is the built-in way to save the fine-tuned model
            print('Saving model in %s.' % save_path)

    print("")
    print("Training complete!")
    print("Total training took {:} (h:mm:ss)".format(format_time(time.time() - total_t0)))


epochs = 2
batch_size = 128
save_path = './model/'

if __name__ == "__main__":
    # bind each process to its own GPU
    local_rank = torch.distributed.get_rank()
    torch.cuda.set_device(local_rank)
    device = torch.device("cuda", local_rank)

    # the model has to be moved to its GPU before wrapping it with DistributedDataParallel
    model = model.to(device)
    model = DistributedDataParallel(model, find_unused_parameters=True,
                                    device_ids=[local_rank], output_device=local_rank)

    training_stats = []
    total_t0 = time.time()

    train_loader = make_data(batch_size)
    total_steps = len(train_loader) * epochs

    optimizer = AdamW(model.parameters(),
                      lr=2e-5,  # args.learning_rate - default is 5e-5
                      eps=1e-8  # args.adam_epsilon - default is 1e-8
                      )
    scheduler = get_linear_schedule_with_warmup(optimizer,
                                                num_warmup_steps=1000,
                                                num_training_steps=total_steps)

    train(model, train_loader, optimizer, scheduler, local_rank)
```
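Once training finishes, the weights written by save_pretrained can be loaded back with from_pretrained for inference or further fine-tuning. A minimal sketch, assuming the save_path of './model/' used above (the example sentence is just an illustration):

```python
import torch
from transformers import AlbertTokenizer, AlbertForMaskedLM

# Reload the fine-tuned model saved by the training script (assumes save_path = './model/').
tokenizer = AlbertTokenizer.from_pretrained('albert-base-v2')
model = AlbertForMaskedLM.from_pretrained('./model/')  # reads the config + weights written by save_pretrained
model.eval()

inputs = tokenizer("The capital of France is [MASK].", return_tensors="pt")
with torch.no_grad():
    logits = model(**inputs)[0]  # shape: [1, seq_len, vocab_size]

# decode the prediction at the [MASK] position
mask_positions = (inputs["input_ids"] == tokenizer.mask_token_id).nonzero(as_tuple=True)[1]
predicted_ids = logits[0, mask_positions].argmax(dim=-1)
print(tokenizer.decode(predicted_ids))
```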
Original post: https://blog.csdn.net/mch2869253130/article/details/108239063