Pytorch中的dataloader以及处理变长数据
起初,我最开始单独训练一个网络来完成landmark点回归任务和分类任务,训练的数据是txt格式,在训练之前对数据进行分析,发现分类任务中存在严重的数据样本不均衡的问题,那么我事先针对性的进行数据采样均衡操作,重新得到训练和测试的txt数据和标签,保证了整个训练和测试数据的样本均衡性。由于我的整个项目是检测+点回归+分类,起初检测和点回归+分类是分两步实现的,检测是通过读取XML格式来进行训练,现在要统一整个项目的训练和测试过程,要将点回归+分类的训练测试过程也按照读取XML格式来进行,那么就遇到一个问题,如何针对性的去给样本偏少的样本进行均衡,由于在dataset类中,返回的图像和标签都是针对每个index返回一个结果,在dataset类中进行操作似乎不太可行,那么就想到在dataloader中进行操作,通过dataloader中的参数sample来完成针对性采样。
还有一个问题是关于num_workers的设置,因为我有对比过,在我的单机RTX 2080Ti上和八卡服务器TITAN RTX上(仅使用单卡,其它卡有在跑其它任务),使用相同的num_workers,在单机上的训练速度反而更快,于是猜想可能和CPU或者内存有关系,下面会具体分析。
首先来看下下dataloader中的各个参数的含义。
类的定义为:torch.utils.data.DataLoader
,其中包含的参数有:
torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=False, sampler=None, \
batch_sampler=None, num_workers=0, collate_fn=None, pin_memory=False, \
drop_last=False, timeout=0, worker_init_fn=None, multiprocessing_context=None)
dataset:定义的dataset类返回的结果。
batchsize:每个bacth要加载的样本数,默认为1。
shuffle:在每个epoch中对整个数据集data进行shuffle重排,默认为False。
sample:定义从数据集中加载数据所采用的策略,如果指定的话,shuffle必须为False;batch_sample类似,表示一次返回一个batch的index。
num_workers:表示开启多少个线程数去加载你的数据,默认为0,代表只使用主进程。
collate_fn:表示合并样本列表以形成小批量的Tensor对象。
pin_memory:表示要将load进来的数据是否要拷贝到pin_memory区中,其表示生成的Tensor数据是属于内存中的锁页内存区,这样将Tensor数据转义到GPU中速度就会快一些,默认为False。
drop_last:当你的整个数据长度不能够整除你的batchsize,选择是否要丢弃最后一个不完整的batch,默认为False。
注:这里简单科普下pin_memory,通常情况下,数据在内存中要么以锁页的方式存在,要么保存在虚拟内存(磁盘)中,设置为True后,数据直接保存在锁页内存中,后续直接传入cuda;否则需要先从虚拟内存中传入锁页内存中,再传入cuda,这样就比较耗时了,但是对于内存的大小要求比较高。
下面针对num_workers,sample和collate_fn分别进行说明:
1. 设置num_workers:
pytorch中dataloader一次性创建num_workers
个子线程,然后用batch_sampler
将指定batch分配给指定worker,worker将它负责的batch加载进RAM,dataloader就可以直接从RAM中找本轮迭代要用的batch。如果num_worker
设置得大,好处是寻batch速度快,因为下一轮迭代的batch很可能在上一轮/上上一轮...迭代时已经加载好了。坏处是内存开销大,也加重了CPU负担(worker加载数据到RAM的进程是进行CPU复制)。如果num_worker设为0,意味着每一轮迭代时,dataloader不再有自主加载数据到RAM这一步骤,只有当你需要的时候再加载相应的batch,当然速度就更慢。num_workers
的经验设置值是自己电脑/服务器的CPU核心数,如果CPU很强、RAM也很充足,就可以设置得更大些,对于单机来说,单跑一个任务的话,直接设置为CPU的核心数最好。
2. 定义sample:(假设dataset类返回的是:data, label)
from torch.utils.data.sampler import WeightedRandomSampler
## 如果label为1,那么对应的该类别被取出来的概率是另外一个类别的2倍
weights = [2 if label == 1 else 1 for data, label in dataset]
sampler = WeightedRandomSampler(weights,num_samples=10, replacement=True)
dataloader = DataLoader(dataset, batch_size=16, sampler=sampler)
PyTorch中提供的这个sampler模块,用来对数据进行采样。默认采用SequentialSampler,它会按顺序一个一个进行采样。常用的有随机采样器:RandomSampler,当dataloader的shuffle参数为True时,系统会自动调用这个采样器,实现打乱数据。这里使用另外一个很有用的采样方法: WeightedRandomSampler,它会根据每个样本的权重选取数据,在样本比例不均衡的问题中,可用它来进行重采样。replacement
用于指定是否可以重复选取某一个样本,默认为True,即允许在一个epoch中重复采样某一个数据。
3. 定义collate_fn:
def detection_collate(batch):
"""Custom collate fn for dealing with batches of images that have a different
number of associated object annotations (bounding boxes).
Arguments:
batch: (tuple) A tuple of tensor images and lists of annotations
Return:
A tuple containing:
1) (tensor) batch of images stacked on their 0 dim
2) (list of tensors) annotations for a given image are stacked on
0 dim
"""
targets = []
imgs = []
for sample in batch:
imgs.append(sample[0])
targets.append(torch.FloatTensor(sample[1]))
return torch.stack(imgs, 0), targets
使用dataloader时加入collate_fn参数,即可合并样本列表以形成小批量的Tensor对象,如果你的标签不止一个的话,还可以支持自定义,在上述方法中再额外添加对应的label即可。
data_loader = torch.utils.data.DataLoader(dataset, args.batch_size,
num_workers=args.num_workers, sampler=sampler, shuffle=False,
collate_fn=detection_collate, pin_memory=True, drop_last=True)
现在的问题:有的时候,特别对于NLP任务来说,输入的数据可能不是定长的,比如多个句子的长度一般不会一致,这时候使用DataLoader加载数据时,不定长的句子会被胡乱切分,这肯定是不行的。
解决方法是重写DataLoader的collate_fn,具体方法如下:
# 假如每一个样本为:
sample = {
# 一个句子中各个词的id
'token_list' : [5, 2, 4, 1, 9, 8],
# 结果y
'label' : 5,
}
# 重写collate_fn函数,其输入为一个batch的sample数据
def collate_fn(batch):
# 因为token_list是一个变长的数据,所以需要用一个list来装这个batch的token_list
token_lists = [item['token_list'] for item in batch]
# 每个label是一个int,我们把这个batch中的label也全取出来,重新组装
labels = [item['label'] for item in batch]
# 把labels转换成Tensor
labels = torch.Tensor(labels)
return {
'token_list': token_lists,
'label': labels,
}
# 在使用DataLoader加载数据时,注意collate_fn参数传入的是重写的函数
DataLoader(trainset, batch_size=4, shuffle=True, num_workers=4, collate_fn=collate_fn)
Pytorch加载变长度序列数据
在处理序列数据集时,有时会遇到变长度的样本。此时因为尺寸不一致,无法直接利用pytorch中dataloader的默认加载方式(沿着批维度直接Stack)。
处理这种数据集,一种办法是可以事先记录每个样本的长度,并把所有的数据集样本补全至最长的样本长度,这样所有样本长度一致,可以直接加载。但是会有一个问题,就是例如在使用RNN建模时,这些padding的0值会对模型造成额外影响.参考这篇文章。
pytorch中通过函数torch.nn.utils.rnn.pack_padded_sequence()以及torch.nn.utils.rnn.pad_packed_sequence()来解决这个问题。torch.nn.utils.rnn.pack_padded_sequence()通过利用
pad之后的样本和每个原始序列的长度对补全后的样本进行pack。这样RNN模型在计算时,根据原来的样本长度就知道每个样本在何时结束,从而避免额外的pad的0值的影响。计算完之后通过torch.nn.utils.rnn.pad_packed_sequence()将输出的格式转换为pack之前的格式。
collate_fn
另一种办法是通过自定义collate_fn,并将其传入DataLoader,从而实现自定义的批数据聚合方式。这里给出一些示例。
这篇文章给出了一种解决思路
示例1
问题背景
想要使用pytorch 框架中的 Dataset 和 Dataloader 类,将变长序列整合为batch数据 (主要是对长短不一的序列进行补齐),通过自定义collate_fn函数,实现对变长数据的处理。
主要思路
Dataset 主要负责读取单条数据,建立索引方式。
Dataloader 负责将数据聚合为batch。
测试环境: python 3.6 ,pytorch 1.2.0
数据路径:
data路径下存储的是待存储的数据样本。
举例:其中的 1.json 样本格式为:
定义数据集class,进行数据索引
数据集class定义代码:
import os
import numpy as np
import torch
from torch.utils.data import Dataset
from tqdm import tqdm
class time_series_dataset(Dataset):
def __init__(self, data_root):
"""
:param data_root: 数据集路径
"""
self.data_root = data_root
file_list = os.listdir(data_root)
file_prefix = []
for file in file_list:
if '.json' in file:
file_prefix.append(file.split('.')[0])
file_prefix = list(set(file_prefix))
self.data = file_prefix
def __len__(self):
return len(self.data)
def __getitem__(self, index):
prefix = self.data[index]
import json
with open(self.data_root+prefix+'.json','r',encoding='utf-8') as f:
data_dic=json.load(f)
feature = np.array(data_dic['feature'])
length=len(data_dic['feature'])
feature = torch.from_numpy(feature)
label = np.array(data_dic['label'])
label = torch.from_numpy(label)
sample = {'feature': feature, 'label': label, 'id': prefix,'length':length}
return sample
这里dataset将每个样本的数据,标签、以及每个样本的长度都包裹在一个字典里并返回。
数据集实例化:
dataset = time_series_dataset("./data/") # "./data/" 为数据集文件存储路径
基于此数据集的实际数据格式如下:
举例: dataset[0]
{'feature': tensor([17, 14, 16, 18, 14, 16], dtype=torch.int32),
'label': tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0,
0], dtype=torch.int32),
'id': '2',
'length': 6}
定义collate_fn函数,传入Dataloader类
自定义collate_fn代码
from torch.nn.utils.rnn import pad_sequence
def collate_func(batch_dic):
batch_len=len(batch_dic) # 批尺寸
max_seq_length=max([dic['length'] for dic in batch_dic]) # 一批数据中最长的那个样本长度
mask_batch=torch.zeros((batch_len,max_seq_length)) # mask
fea_batch=[]
label_batch=[]
id_batch=[]
for i in range(len(batch_dic)): # 分别提取批样本中的feature、label、id、length信息
dic=batch_dic[i]
fea_batch.append(dic['feature'])
label_batch.append(dic['label'])
id_batch.append(dic['id'])
mask_batch[i,:dic['length']]=1 # mask
res={}
res['feature']=pad_sequence(fea_batch,batch_first=True) # 将信息封装在字典res中
res['label']=pad_sequence(label_batch,batch_first=True)
res['id']=id_batch
res['mask']=mask_batch
return res
pytorch中的dataloader返回的是一个list,也即collate_func的输入是一个列表。
说明: mask 字段用以存储变长序列的实际长度,补零的部分记为0,实际序列对应位置记为1。返回数据的格式及包含的字段,根据自己的需求进行定义。
这一段似乎用映射map更合适:
for i in range(len(batch_dic)):
dic=batch_dic[i]
fea_batch.append(dic['feature'])
label_batch.append(dic['label'])
id_batch.append(dic['id'])
mask_batch[i,:dic['length']]=1
fea_batch = list(map(lambda x: x['feature'], batch_dic))
label_batch = list(map(lambda x: x['label'], batch_dic))
id_batch = list(map(lambda x: x['id'], batch_dic))
Dataloader实例化调用代码:
train_loader = DataLoader(dataset, batch_size=3, num_workers=1, shuffle=True,collate_fn=collate_func)
完整流程代码
import os
import numpy as np
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from tqdm import tqdm
class time_series_dataset(Dataset):
def __init__(self, data_root):
"""
:param data_root: 数据集路径
"""
self.data_root = data_root
file_list = os.listdir(data_root)
file_prefix = []
for file in file_list:
if '.json' in file:
file_prefix.append(file.split('.')[0])
file_prefix = list(set(file_prefix))
self.data = file_prefix
def __len__(self):
return len(self.data)
def __getitem__(self, index):
prefix = self.data[index]
import json
with open(self.data_root+prefix+'.json','r',encoding='utf-8') as f:
data_dic=json.load(f)
feature = np.array(data_dic['feature'])
length=len(data_dic['feature'])
feature = torch.from_numpy(feature)
label = np.array(data_dic['label'])
label = torch.from_numpy(label)
sample = {'feature': feature, 'label': label, 'id': prefix,'length':length}
return sample
def collate_func(batch_dic):
#from torch.nn.utils.rnn import pad_sequence
batch_len=len(batch_dic)
max_seq_length=max([dic['length'] for dic in batch_dic])
mask_batch=torch.zeros((batch_len,max_seq_length))
fea_batch=[]
label_batch=[]
id_batch=[]
for i in range(len(batch_dic)):
dic=batch_dic[i]
fea_batch.append(dic['feature'])
label_batch.append(dic['label'])
id_batch.append(dic['id'])
mask_batch[i,:dic['length']]=1
res={}
res['feature']=pad_sequence(fea_batch,batch_first=True)
res['label']=pad_sequence(label_batch,batch_first=True)
res['id']=id_batch
res['mask']=mask_batch
return res
if __name__ == "__main__":
dataset = time_series_dataset("./data/")
batch_size=3
train_loader = DataLoader(dataset, batch_size=batch_size, num_workers=4, shuffle=True,collate_fn=collate_func)
for batch_idx, batch in tqdm(enumerate(train_loader),total=int(len(train_loader.dataset) / batch_size) + 1):
inputs,labels,masks,ids=batch['feature'],batch['label'],batch['mask'],batch['id']
break
示例2
from torch.nn.utils.rnn import pack_sequence
from torch.utils.data import DataLoader
def my_collate(batch):
# batch contains a list of tuples of structure (sequence, target)
data = [item[0] for item in batch]
data = pack_sequence(data, enforce_sorted=False)
targets = [item[1] for item in batch]
return [data, targets]
# ...
# later in you code, when you define you DataLoader - use the custom collate function
loader = DataLoader(dataset,
batch_size,
shuffle,
collate_fn=my_collate, # use custom collate function here
pin_memory=True)
示例3
沿一般的维度填充
I wrote a simple code that maybe someone here can re-use. I wanted to make something that pads a generic dim, and I don’t use an RNN of any type so PackedSequence was a bit of overkill for me. It’s simple, but it works for me.
def pad_tensor(vec, pad, dim):
"""
args:
vec - tensor to pad
pad - the size to pad to
dim - dimension to pad
return:
a new tensor padded to 'pad' in dimension 'dim'
"""
pad_size = list(vec.shape)
pad_size[dim] = pad - vec.size(dim)
return torch.cat([vec, torch.zeros(*pad_size)], dim=dim)
class PadCollate:
"""
a variant of callate_fn that pads according to the longest sequence in
a batch of sequences
"""
def __init__(self, dim=0):
"""
args:
dim - the dimension to be padded (dimension of time in sequences)
"""
self.dim = dim
def pad_collate(self, batch):
"""
args:
batch - list of (tensor, label)
reutrn:
xs - a tensor of all examples in 'batch' after padding
ys - a LongTensor of all labels in batch
"""
# find longest sequence
max_len = max(map(lambda x: x[0].shape[self.dim], batch))
# pad according to max_len
batch = map(lambda (x, y):
(pad_tensor(x, pad=max_len, dim=self.dim), y), batch)
# stack all
xs = torch.stack(map(lambda x: x[0], batch), dim=0)
ys = torch.LongTensor(map(lambda x: x[1], batch))
return xs, ys
def __call__(self, batch):
return self.pad_collate(batch)
to be used with the data loader:
train_loader = DataLoader(ds, ..., collate_fn=PadCollate(dim=0))
If you are going to pack your padded sequences later, you can also immediately sort the batches from longest sequence to shortest:
如果你打算后续对padded的样本进行pack操作,你可以对批样本从长到短进行排序:(这种做法是比较实用的,因为通常后续需要进行pack操作)
def sort_batch(batch, targets, lengths):
"""
Sort a minibatch by the length of the sequences with the longest sequences first
return the sorted batch targes and sequence lengths.
This way the output can be used by pack_padded_sequences(...)
"""
seq_lengths, perm_idx = lengths.sort(0, descending=True)
seq_tensor = batch[perm_idx]
target_tensor = targets[perm_idx]
return seq_tensor, target_tensor, seq_lengths
def pad_and_sort_batch(DataLoaderBatch):
"""
DataLoaderBatch should be a list of (sequence, target, length) tuples...
Returns a padded tensor of sequences sorted from longest to shortest,
"""
batch_size = len(DataLoaderBatch)
batch_split = list(zip(*DataLoaderBatch))
seqs, targs, lengths = batch_split[0], batch_split[1], batch_split[2]
max_length = max(lengths)
padded_seqs = np.zeros((batch_size, max_length))
for i, l in enumerate(lengths):
padded_seqs[i, 0:l] = seqs[i][0:l]
return sort_batch(torch.tensor(padded_seqs), torch.tensor(targs).view(-1,1), torch.tensor(lengths))
假设你的Dataset具有以下形式:
def __getitem__(self, idx):
return self.sequences[idx], torch.tensor(self.targets[idx]), self.sequence_lengths[idx]
使用时将pad_and_sort collator传到 DataLoader:
train_gen = Data.DataLoader(train_data, batch_size=128, shuffle=True, collate_fn=pad_and_sort_batch)
示例5
def collate_fn_padd(batch):
'''
Padds batch of variable length
note: it converts things ToTensor manually here since the ToTensor transform
assume it takes in images rather than arbitrary tensors.
'''
## get sequence lengths
lengths = torch.tensor([ t.shape[0] for t in batch ]).to(device)
## padd
batch = [ torch.Tensor(t).to(device) for t in batch ]
batch = torch.nn.utils.rnn.pad_sequence(batch)
## compute mask
mask = (batch != 0).to(device)
return batch, lengths, mask
参考:
https://blog.csdn.net/lrs1353281004/article/details/106129660
https://discuss.pytorch.org/t/dataloader-for-various-length-of-data/6418