欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Pytorch中的dataloader以及处理变长数据

程序员文章站 2024-03-25 08:00:57
...

起初,我最开始单独训练一个网络来完成landmark点回归任务和分类任务,训练的数据是txt格式,在训练之前对数据进行分析,发现分类任务中存在严重的数据样本不均衡的问题,那么我事先针对性的进行数据采样均衡操作,重新得到训练和测试的txt数据和标签,保证了整个训练和测试数据的样本均衡性。由于我的整个项目是检测+点回归+分类,起初检测和点回归+分类是分两步实现的,检测是通过读取XML格式来进行训练,现在要统一整个项目的训练和测试过程,要将点回归+分类的训练测试过程也按照读取XML格式来进行,那么就遇到一个问题,如何针对性的去给样本偏少的样本进行均衡,由于在dataset类中,返回的图像和标签都是针对每个index返回一个结果,在dataset类中进行操作似乎不太可行,那么就想到在dataloader中进行操作,通过dataloader中的参数sample来完成针对性采样。

还有一个问题是关于num_workers的设置,因为我有对比过,在我的单机RTX 2080Ti上和八卡服务器TITAN RTX上(仅使用单卡,其它卡有在跑其它任务),使用相同的num_workers,在单机上的训练速度反而更快,于是猜想可能和CPU或者内存有关系,下面会具体分析。

首先来看下下dataloader中的各个参数的含义。

类的定义为:torch.utils.data.DataLoader ,其中包含的参数有:

torch.utils.data.DataLoader(dataset, batch_size=1, shuffle=False, sampler=None, \
    batch_sampler=None, num_workers=0, collate_fn=None, pin_memory=False, \
    drop_last=False, timeout=0, worker_init_fn=None, multiprocessing_context=None)

dataset:定义的dataset类返回的结果。

batchsize:每个bacth要加载的样本数,默认为1。

shuffle:在每个epoch中对整个数据集data进行shuffle重排,默认为False。

sample:定义从数据集中加载数据所采用的策略,如果指定的话,shuffle必须为False;batch_sample类似,表示一次返回一个batch的index。

num_workers:表示开启多少个线程数去加载你的数据,默认为0,代表只使用主进程。

collate_fn:表示合并样本列表以形成小批量的Tensor对象。

pin_memory:表示要将load进来的数据是否要拷贝到pin_memory区中,其表示生成的Tensor数据是属于内存中的锁页内存区,这样将Tensor数据转义到GPU中速度就会快一些,默认为False。

drop_last:当你的整个数据长度不能够整除你的batchsize,选择是否要丢弃最后一个不完整的batch,默认为False。

注:这里简单科普下pin_memory,通常情况下,数据在内存中要么以锁页的方式存在,要么保存在虚拟内存(磁盘)中,设置为True后,数据直接保存在锁页内存中,后续直接传入cuda;否则需要先从虚拟内存中传入锁页内存中,再传入cuda,这样就比较耗时了,但是对于内存的大小要求比较高。

下面针对num_workers,sample和collate_fn分别进行说明:

1. 设置num_workers:

pytorch中dataloader一次性创建num_workers个子线程,然后用batch_sampler将指定batch分配给指定worker,worker将它负责的batch加载进RAM,dataloader就可以直接从RAM中找本轮迭代要用的batch。如果num_worker设置得大,好处是寻batch速度快,因为下一轮迭代的batch很可能在上一轮/上上一轮...迭代时已经加载好了。坏处是内存开销大,也加重了CPU负担(worker加载数据到RAM的进程是进行CPU复制)。如果num_worker设为0,意味着每一轮迭代时,dataloader不再有自主加载数据到RAM这一步骤,只有当你需要的时候再加载相应的batch,当然速度就更慢。num_workers的经验设置值是自己电脑/服务器的CPU核心数,如果CPU很强、RAM也很充足,就可以设置得更大些,对于单机来说,单跑一个任务的话,直接设置为CPU的核心数最好。

2. 定义sample:(假设dataset类返回的是:data, label)

from torch.utils.data.sampler import WeightedRandomSampler
## 如果label为1,那么对应的该类别被取出来的概率是另外一个类别的2倍
weights = [2 if label == 1 else 1 for data, label in dataset]
sampler = WeightedRandomSampler(weights,num_samples=10, replacement=True)
dataloader = DataLoader(dataset, batch_size=16, sampler=sampler)

PyTorch中提供的这个sampler模块,用来对数据进行采样。默认采用SequentialSampler,它会按顺序一个一个进行采样。常用的有随机采样器:RandomSampler,当dataloader的shuffle参数为True时,系统会自动调用这个采样器,实现打乱数据。这里使用另外一个很有用的采样方法: WeightedRandomSampler,它会根据每个样本的权重选取数据,在样本比例不均衡的问题中,可用它来进行重采样。replacement用于指定是否可以重复选取某一个样本,默认为True,即允许在一个epoch中重复采样某一个数据。

3. 定义collate_fn:

def detection_collate(batch):
    """Custom collate fn for dealing with batches of images that have a different
    number of associated object annotations (bounding boxes).

    Arguments:
        batch: (tuple) A tuple of tensor images and lists of annotations

    Return:
        A tuple containing:
            1) (tensor) batch of images stacked on their 0 dim
            2) (list of tensors) annotations for a given image are stacked on
                                 0 dim
    """
    targets = []
    imgs = []
    for sample in batch:
        imgs.append(sample[0])
        targets.append(torch.FloatTensor(sample[1]))
    return torch.stack(imgs, 0), targets

使用dataloader时加入collate_fn参数,即可合并样本列表以形成小批量的Tensor对象,如果你的标签不止一个的话,还可以支持自定义,在上述方法中再额外添加对应的label即可。

data_loader = torch.utils.data.DataLoader(dataset, args.batch_size,
    num_workers=args.num_workers, sampler=sampler, shuffle=False, 
    collate_fn=detection_collate, pin_memory=True, drop_last=True)

 

现在的问题:有的时候,特别对于NLP任务来说,输入的数据可能不是定长的,比如多个句子的长度一般不会一致,这时候使用DataLoader加载数据时,不定长的句子会被胡乱切分,这肯定是不行的。

解决方法是重写DataLoader的collate_fn,具体方法如下:

# 假如每一个样本为:
sample = {
    # 一个句子中各个词的id
    'token_list' : [5, 2, 4, 1, 9, 8],
    # 结果y
    'label' : 5,
}
 
 
# 重写collate_fn函数,其输入为一个batch的sample数据
def collate_fn(batch):
    # 因为token_list是一个变长的数据,所以需要用一个list来装这个batch的token_list
  token_lists = [item['token_list'] for item in batch]
   
  # 每个label是一个int,我们把这个batch中的label也全取出来,重新组装
  labels = [item['label'] for item in batch]
  # 把labels转换成Tensor
  labels = torch.Tensor(labels)
  return {
    'token_list': token_lists,
    'label': labels,
  }
 
 
# 在使用DataLoader加载数据时,注意collate_fn参数传入的是重写的函数
DataLoader(trainset, batch_size=4, shuffle=True, num_workers=4, collate_fn=collate_fn)

Pytorch加载变长度序列数据

在处理序列数据集时,有时会遇到变长度的样本。此时因为尺寸不一致,无法直接利用pytorch中dataloader的默认加载方式(沿着批维度直接Stack)。

处理这种数据集,一种办法是可以事先记录每个样本的长度,并把所有的数据集样本补全至最长的样本长度,这样所有样本长度一致,可以直接加载。但是会有一个问题,就是例如在使用RNN建模时,这些padding的0值会对模型造成额外影响.参考这篇文章

pytorch中通过函数torch.nn.utils.rnn.pack_padded_sequence()以及torch.nn.utils.rnn.pad_packed_sequence()来解决这个问题。torch.nn.utils.rnn.pack_padded_sequence()通过利用

pad之后的样本和每个原始序列的长度对补全后的样本进行pack。这样RNN模型在计算时,根据原来的样本长度就知道每个样本在何时结束,从而避免额外的pad的0值的影响。计算完之后通过torch.nn.utils.rnn.pad_packed_sequence()将输出的格式转换为pack之前的格式。

collate_fn

另一种办法是通过自定义collate_fn,并将其传入DataLoader,从而实现自定义的批数据聚合方式。这里给出一些示例。

这篇文章给出了一种解决思路

示例1

问题背景

想要使用pytorch 框架中的 Dataset 和 Dataloader 类,将变长序列整合为batch数据 (主要是对长短不一的序列进行补齐),通过自定义collate_fn函数,实现对变长数据的处理。

主要思路

Dataset 主要负责读取单条数据,建立索引方式。
Dataloader 负责将数据聚合为batch。
 

测试环境: python 3.6 ,pytorch 1.2.0

数据路径:

Pytorch中的dataloader以及处理变长数据

 data路径下存储的是待存储的数据样本。
举例:其中的 1.json 样本格式为:

Pytorch中的dataloader以及处理变长数据

定义数据集class,进行数据索引

数据集class定义代码:

import os
import numpy as np
import torch
from torch.utils.data import Dataset
from tqdm import tqdm
class time_series_dataset(Dataset):
    def __init__(self, data_root):
        """
        :param data_root:   数据集路径
        """
        self.data_root = data_root
        file_list = os.listdir(data_root)
        file_prefix = []
        for file in file_list:
            if '.json' in file:
                file_prefix.append(file.split('.')[0])
        file_prefix = list(set(file_prefix))
        self.data = file_prefix
    def __len__(self):
        return len(self.data)
    def __getitem__(self, index):
        prefix = self.data[index]
        import json
        with open(self.data_root+prefix+'.json','r',encoding='utf-8') as f:
            data_dic=json.load(f)
        feature = np.array(data_dic['feature'])
        length=len(data_dic['feature'])
        feature = torch.from_numpy(feature)
        label = np.array(data_dic['label'])
        label = torch.from_numpy(label)
        sample = {'feature': feature, 'label': label, 'id': prefix,'length':length}
        return sample

这里dataset将每个样本的数据,标签、以及每个样本的长度都包裹在一个字典里并返回。

数据集实例化:

dataset = time_series_dataset("./data/") # "./data/" 为数据集文件存储路径

基于此数据集的实际数据格式如下:
举例: dataset[0]

 {'feature': tensor([17, 14, 16, 18, 14, 16], dtype=torch.int32),
  'label': tensor([0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0,
          0], dtype=torch.int32),
  'id': '2',
  'length': 6}

定义collate_fn函数,传入Dataloader类

自定义collate_fn代码

from torch.nn.utils.rnn import pad_sequence

def collate_func(batch_dic):
    batch_len=len(batch_dic) # 批尺寸
    max_seq_length=max([dic['length'] for dic in batch_dic]) # 一批数据中最长的那个样本长度
    mask_batch=torch.zeros((batch_len,max_seq_length)) # mask
    fea_batch=[]
    label_batch=[]
    id_batch=[]
    for i in range(len(batch_dic)): # 分别提取批样本中的feature、label、id、length信息
        dic=batch_dic[i]
        fea_batch.append(dic['feature'])
        label_batch.append(dic['label'])
        id_batch.append(dic['id'])
        mask_batch[i,:dic['length']]=1 # mask
    res={}
    res['feature']=pad_sequence(fea_batch,batch_first=True) # 将信息封装在字典res中
    res['label']=pad_sequence(label_batch,batch_first=True)
    res['id']=id_batch
    res['mask']=mask_batch
    return res

pytorch中的dataloader返回的是一个list,也即collate_func的输入是一个列表。

说明: mask 字段用以存储变长序列的实际长度,补零的部分记为0,实际序列对应位置记为1。返回数据的格式及包含的字段,根据自己的需求进行定义。

这一段似乎用映射map更合适:

for i in range(len(batch_dic)):
        dic=batch_dic[i]
        fea_batch.append(dic['feature'])
        label_batch.append(dic['label'])
        id_batch.append(dic['id'])
        mask_batch[i,:dic['length']]=1
     fea_batch = list(map(lambda x: x['feature'], batch_dic))
     label_batch = list(map(lambda x: x['label'], batch_dic))
     id_batch = list(map(lambda x: x['id'], batch_dic))

Dataloader实例化调用代码:

 train_loader = DataLoader(dataset, batch_size=3, num_workers=1, shuffle=True,collate_fn=collate_func)

完整流程代码

import os
import numpy as np
import torch
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from tqdm import tqdm
class time_series_dataset(Dataset):
    def __init__(self, data_root):
        """
        :param data_root:   数据集路径
        """
        self.data_root = data_root
        file_list = os.listdir(data_root)
        file_prefix = []
        for file in file_list:
            if '.json' in file:
                file_prefix.append(file.split('.')[0])
        file_prefix = list(set(file_prefix))
        self.data = file_prefix
    def __len__(self):
        return len(self.data)
    def __getitem__(self, index):
        prefix = self.data[index]
        import json
        with open(self.data_root+prefix+'.json','r',encoding='utf-8') as f:
            data_dic=json.load(f)
        feature = np.array(data_dic['feature'])
        length=len(data_dic['feature'])
        feature = torch.from_numpy(feature)
        label = np.array(data_dic['label'])
        label = torch.from_numpy(label)
        sample = {'feature': feature, 'label': label, 'id': prefix,'length':length}
        return sample
def collate_func(batch_dic):
    #from torch.nn.utils.rnn import pad_sequence
    batch_len=len(batch_dic)
    max_seq_length=max([dic['length'] for dic in batch_dic])
    mask_batch=torch.zeros((batch_len,max_seq_length))
    fea_batch=[]
    label_batch=[]
    id_batch=[]
    for i in range(len(batch_dic)):
        dic=batch_dic[i]
        fea_batch.append(dic['feature'])
        label_batch.append(dic['label'])
        id_batch.append(dic['id'])
        mask_batch[i,:dic['length']]=1
    res={}
    res['feature']=pad_sequence(fea_batch,batch_first=True)
    res['label']=pad_sequence(label_batch,batch_first=True)
    res['id']=id_batch
    res['mask']=mask_batch
    return res
if __name__ == "__main__":
    dataset = time_series_dataset("./data/")
    batch_size=3
    train_loader = DataLoader(dataset, batch_size=batch_size, num_workers=4, shuffle=True,collate_fn=collate_func)
    for batch_idx, batch in tqdm(enumerate(train_loader),total=int(len(train_loader.dataset) / batch_size) + 1):
        inputs,labels,masks,ids=batch['feature'],batch['label'],batch['mask'],batch['id']
        break

示例2 

from torch.nn.utils.rnn import pack_sequence
from torch.utils.data import DataLoader

def my_collate(batch):
    # batch contains a list of tuples of structure (sequence, target)
    data = [item[0] for item in batch]
    data = pack_sequence(data, enforce_sorted=False)
    targets = [item[1] for item in batch]
    return [data, targets]

# ...
# later in you code, when you define you DataLoader - use the custom collate function
loader = DataLoader(dataset,
                      batch_size,
                      shuffle,
                      collate_fn=my_collate, # use custom collate function here
                      pin_memory=True)

示例3

沿一般的维度填充

 I wrote a simple code that maybe someone here can re-use. I wanted to make something that pads a generic dim, and I don’t use an RNN of any type so PackedSequence was a bit of overkill for me. It’s simple, but it works for me.

def pad_tensor(vec, pad, dim):
    """
    args:
        vec - tensor to pad
        pad - the size to pad to
        dim - dimension to pad

    return:
        a new tensor padded to 'pad' in dimension 'dim'
    """
    pad_size = list(vec.shape)
    pad_size[dim] = pad - vec.size(dim)
    return torch.cat([vec, torch.zeros(*pad_size)], dim=dim)


class PadCollate:
    """
    a variant of callate_fn that pads according to the longest sequence in
    a batch of sequences
    """

    def __init__(self, dim=0):
        """
        args:
            dim - the dimension to be padded (dimension of time in sequences)
        """
        self.dim = dim

    def pad_collate(self, batch):
        """
        args:
            batch - list of (tensor, label)

        reutrn:
            xs - a tensor of all examples in 'batch' after padding
            ys - a LongTensor of all labels in batch
        """
        # find longest sequence
        max_len = max(map(lambda x: x[0].shape[self.dim], batch))
        # pad according to max_len
        batch = map(lambda (x, y):
                    (pad_tensor(x, pad=max_len, dim=self.dim), y), batch)
        # stack all
        xs = torch.stack(map(lambda x: x[0], batch), dim=0)
        ys = torch.LongTensor(map(lambda x: x[1], batch))
        return xs, ys

    def __call__(self, batch):
        return self.pad_collate(batch)

 to be used with the data loader: 

train_loader = DataLoader(ds, ..., collate_fn=PadCollate(dim=0))

If you are going to pack your padded sequences later, you can also immediately sort the batches from longest sequence to shortest:

如果你打算后续对padded的样本进行pack操作,你可以对批样本从长到短进行排序:(这种做法是比较实用的,因为通常后续需要进行pack操作)

def sort_batch(batch, targets, lengths):
    """
    Sort a minibatch by the length of the sequences with the longest sequences first
    return the sorted batch targes and sequence lengths.
    This way the output can be used by pack_padded_sequences(...)
    """
    seq_lengths, perm_idx = lengths.sort(0, descending=True)
    seq_tensor = batch[perm_idx]
    target_tensor = targets[perm_idx]
    return seq_tensor, target_tensor, seq_lengths

def pad_and_sort_batch(DataLoaderBatch):
    """
    DataLoaderBatch should be a list of (sequence, target, length) tuples...
    Returns a padded tensor of sequences sorted from longest to shortest,
    """
    batch_size = len(DataLoaderBatch)
    batch_split = list(zip(*DataLoaderBatch))

    seqs, targs, lengths = batch_split[0], batch_split[1], batch_split[2]
    max_length = max(lengths)

    padded_seqs = np.zeros((batch_size, max_length))
    for i, l in enumerate(lengths):
        padded_seqs[i, 0:l] = seqs[i][0:l]

    return sort_batch(torch.tensor(padded_seqs), torch.tensor(targs).view(-1,1), torch.tensor(lengths))

假设你的Dataset具有以下形式:

def __getitem__(self, idx):
         return self.sequences[idx], torch.tensor(self.targets[idx]), self.sequence_lengths[idx]

使用时将pad_and_sort collator传到 DataLoader:

train_gen = Data.DataLoader(train_data, batch_size=128, shuffle=True, collate_fn=pad_and_sort_batch)

示例5

def collate_fn_padd(batch):
    '''
    Padds batch of variable length

    note: it converts things ToTensor manually here since the ToTensor transform
    assume it takes in images rather than arbitrary tensors.
    '''
    ## get sequence lengths
    lengths = torch.tensor([ t.shape[0] for t in batch ]).to(device)
    ## padd
    batch = [ torch.Tensor(t).to(device) for t in batch ]
    batch = torch.nn.utils.rnn.pad_sequence(batch)
    ## compute mask
    mask = (batch != 0).to(device)
    return batch, lengths, mask

参考:

https://blog.csdn.net/lrs1353281004/article/details/106129660

https://discuss.pytorch.org/t/dataloader-for-various-length-of-data/6418