import numpy as np
import torch
from torch import nn
import torch.nn.functional as F


# open text file and read in data as `text`
with open('data/anna.txt', 'r') as f:
    text = f.read()


# 1. int2char, which maps integers to characters
# 2. char2int, which maps characters to unique integers
chars = tuple(set(text))
int2char = dict(enumerate(chars))
char2int = {ch: ii for ii, ch in int2char.items()}

# encode the text
encoded = np.array([char2int[ch] for ch in text])

预处理数据:LSTM 层级要求输入是独热编码。

def one_hot_encode(arr, n_labels):
    # Initialize the the encoded array
    one_hot = np.zeros((np.multiply(*arr.shape), n_labels), dtype=np.float32)
    # Fill the appropriate elements with ones

    one_hot[np.arange(one_hot.shape[0]), arr.flatten()] = 1.
    # Finally reshape it to get back to the original array

    one_hot = one_hot.reshape((*arr.shape, n_labels))
    return one_hot
# check that the function works as expected
test_seq = np.array([[3, 5, 1]])
one_hot = one_hot_encode(test_seq, 8)


创建训练迷你批次:要使用此数据训练模型,需要创建迷你批次。希望批次是多个序列,由一定数量的序列步组成。在示例中,将获取编码字符(作为 arr 参数传入),并根据 batch_size 将它们拆分为多个序列。每个序列长为 seq_length

def get_batches(arr, batch_size, seq_length):
    '''Create a generator that returns batches of size
       batch_size x seq_length from arr.
       arr: Array you want to make batches from
       batch_size: Batch size, the number of sequences per batch
       seq_length: Number of encoded chars in a sequence
    ## TODO: Get the number of batches we can make
    batch_size_total = batch_size * seq_length
    n_batches = len(arr)//batch_size_total
    ## TODO: Keep only enough characters to make full batches
    arr = arr[:batch_size_total * n_batches]
    ## TODO: Reshape into batch_size rows
    arr = arr.reshape((batch_size,-1))
    ## TODO: Iterate over the batches using a window of size seq_length
    for n in range(0, arr.shape[1], seq_length):
        # The features
        x = arr[:,n:n+seq_length]
        # The targets, shifted by one
        y = np.zeros_like(x)
            y[:,:-1],y[:,-1] = x[:,1:],arr[:,n+seq_length]
        except IndexError:
            y[:,:-1],y[:,-1] = x[:,1:],arr[:,0]
        yield x, y



batches = get_batches(encoded, 8, 50)
x, y = next(batches)
# printing out the first 10 items in a sequence
print('x\n', x[:10, :10])
print('\ny\n', y[:10, :10])


# check if GPU is available
train_on_gpu = torch.cuda.is_available()
    print('Training on GPU!')
    print('No GPU available, training on CPU; consider making n_epochs very small.')


class CharRNN(nn.Module):
    def __init__(self, tokens, n_hidden=256, n_layers=2,
                               drop_prob=0.5, lr=0.001):
        self.drop_prob = drop_prob
        self.n_layers = n_layers
        self.n_hidden = n_hidden
        self.lr = lr
        # creating character dictionaries
        self.chars = tokens
        self.int2char = dict(enumerate(self.chars))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}
        ## TODO: define the layers of the model
        self.lstm = nn.LSTM(len(self.chars),n_hidden,n_layers,dropout=drop_prob,batch_first=True)
        self.dropout = nn.Dropout(drop_prob)
        self.fc = nn.Linear(n_hidden,len(self.chars))
    def forward(self, x, hidden):
        ''' Forward pass through the network. 
            These inputs are x, and the hidden/cell state `hidden`. '''
        ## TODO: Get the outputs and the new hidden state from the lstm
        r_output,hidden =self.lstm(x,hidden)
        out = self.dropout(r_output)
        out = out.contiguous().view(-1,self.n_hidden)
        out = self.fc(out)
        # return the final output and the hidden state
        return out, hidden
    def init_hidden(self, batch_size):
        ''' Initializes hidden state '''
        # Create two new tensors with sizes n_layers x batch_size x n_hidden,
        # initialized to zero, for hidden state and cell state of LSTM
        weight = next(self.parameters()).data
        if (train_on_gpu):
            hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda(),
                  weight.new(self.n_layers, batch_size, self.n_hidden).zero_().cuda())
            hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_(),
                      weight.new(self.n_layers, batch_size, self.n_hidden).zero_())
        return hidden

训练网络:在 train() 函数中设定周期数、学习速率和其他参数。在下面使用 Adam 优化器和交叉熵损失,因为输出是字符类别分数。照常计算损失并执行反向传播步骤。关于训练的几个细节信息:

  • 在训练循环中将隐藏状态与其历史记录分离开;这次将其设为新的元组变量,因为 LSTM 有一个隐藏状态,该隐藏状态是由隐藏状态和单元状态组成的元组。
  • 使用clip_grad_norm_ 防止梯度爆炸。
def train(net, data, epochs=1, batch_size=10, seq_length=50, lr=0.001, clip=5, val_frac=0.1, print_every=10):
    ''' Training a network 
        net: CharRNN network
        data: text data to train the network
        epochs: Number of epochs to train
        batch_size: Number of mini-sequences per mini-batch, aka batch size
        seq_length: Number of character steps per mini-batch
        lr: learning rate
        clip: gradient clipping
        val_frac: Fraction of data to hold out for validation
        print_every: Number of steps for printing training and validation loss
    opt = torch.optim.Adam(net.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    # create training and validation data
    val_idx = int(len(data)*(1-val_frac))
    data, val_data = data[:val_idx], data[val_idx:]
    counter = 0
    n_chars = len(net.chars)
    for e in range(epochs):
        # initialize hidden state
        h = net.init_hidden(batch_size)
        for x, y in get_batches(data, batch_size, seq_length):
            counter += 1
            # One-hot encode our data and make them Torch tensors
            x = one_hot_encode(x, n_chars)
            inputs, targets = torch.from_numpy(x), torch.from_numpy(y)
                inputs, targets = inputs.cuda(), targets.cuda()

            # Creating new variables for the hidden state, otherwise
            # we'd backprop through the entire training history
            h = tuple([each.data for each in h])

            # zero accumulated gradients
            # get the output from the model
            output, h = net(inputs, h)
            # calculate the loss and perform backprop
            loss = criterion(output, targets.view(batch_size*seq_length).long())
            # `clip_grad_norm` helps prevent the exploding gradient problem in RNNs / LSTMs.
            nn.utils.clip_grad_norm_(net.parameters(), clip)
            # loss stats
            if counter % print_every == 0:
                # Get validation loss
                val_h = net.init_hidden(batch_size)
                val_losses = []
                for x, y in get_batches(val_data, batch_size, seq_length):
                    # One-hot encode our data and make them Torch tensors
                    x = one_hot_encode(x, n_chars)
                    x, y = torch.from_numpy(x), torch.from_numpy(y)
                    # Creating new variables for the hidden state, otherwise
                    # we'd backprop through the entire training history
                    val_h = tuple([each.data for each in val_h])
                    inputs, targets = x, y
                        inputs, targets = inputs.cuda(), targets.cuda()

                    output, val_h = net(inputs, val_h)
                    val_loss = criterion(output, targets.view(batch_size*seq_length).long())
                net.train() # reset to train mode after iterationg through validation data
                print("Epoch: {}/{}...".format(e+1, epochs),
                      "Step: {}...".format(counter),
                      "Loss: {:.4f}...".format(loss.item()),
                      "Val Loss: {:.4f}".format(np.mean(val_losses)))


## TODO: set you model hyperparameters
# define and print the net
n_hidden= 256
n_layers= 2

net = CharRNN(chars, n_hidden, n_layers)


batch_size = 32
seq_length = 50
n_epochs = 1 # start small if you are just testing initial behavior

# train the model
train(net, encoded, epochs=n_epochs, batch_size=batch_size, seq_length=seq_length, lr=0.001, print_every=10)


# change the name, for saving multiple files
model_name = 'rnn_x_epoch.net'

checkpoint = {'n_hidden': net.n_hidden,
              'n_layers': net.n_layers,
              'state_dict': net.state_dict(),
              'tokens': net.chars}

with open(model_name, 'wb') as f:
    torch.save(checkpoint, f)

Top-K抽样:预测来自所有潜在字符的类别概率分布。可以抽样文本并仅考虑前 ????K 个潜在字符,使抽样文本更合理(变量更少)。这样可以避免网络提供完全不合理的字符,并且能够向抽样文本里引入一些噪点和随机性。

def predict(net, char, h=None, top_k=None):
        ''' Given a character, predict the next character.
            Returns the predicted character and the hidden state.
        # tensor inputs
        x = np.array([[net.char2int[char]]])
        x = one_hot_encode(x, len(net.chars))
        inputs = torch.from_numpy(x)
            inputs = inputs.cuda()
        # detach hidden state from history
        h = tuple([each.data for each in h])
        # get the output of the model
        out, h = net(inputs, h)

        # get the character probabilities
        p = F.softmax(out, dim=1).data
            p = p.cpu() # move to cpu
        # get top characters
        if top_k is None:
            top_ch = np.arange(len(net.chars))
            p, top_ch = p.topk(top_k)
            top_ch = top_ch.numpy().squeeze()
        # select the likely next character with some element of randomness
        p = p.numpy().squeeze()
        char = np.random.choice(top_ch, p=p/p.sum())
        # return the encoded value of the predicted char and the hidden state
        return net.int2char[char], h

设定prime单词生成文本:通常,需要设定 prime 单词来构建隐藏状态。否则,网络将开始随机生成字符。前几个字符一般比较难预测,因为预测上下文信息不足。

def sample(net, size, prime='The', top_k=None):
    net.eval() # eval mode
    # First off, run through the prime characters
    chars = [ch for ch in prime]
    h = net.init_hidden(1)
    for ch in prime:
        char, h = predict(net, ch, h, top_k=top_k)

    # Now pass in the previous character and get a new one
    for ii in range(size):
        char, h = predict(net, chars[-1], h, top_k=top_k)

    return ''.join(chars)
print(sample(net, 1000, prime='Anna', top_k=5))


# Here we have loaded in a model that trained over 20 epochs `rnn_20_epoch.net`
with open('rnn_x_epoch.net', 'rb') as f:
    checkpoint = torch.load(f)
loaded = CharRNN(checkpoint['tokens'], n_hidden=checkpoint['n_hidden'], n_layers=checkpoint['n_layers'])
# Sample using a loaded model
print(sample(loaded, 2000, top_k=5, prime="And Levin said"))