
Implementing a Recurrent Neural Network (RNN) with PyTorch


RNN is short for Recurrent Neural Network.

Dataset

  • Highway traffic flow data.
  • PeMS provides real-time traffic flow data from highways in California, USA (sampled every 5 minutes).
  • The data is collected by induction loop detectors embedded in the roadway.
  • This experiment uses data from one district, stored in the file PEMS04.npz.
  • The raw data is stored in NumPy binary format and can be read with numpy.load (see the quick check after the file details below).
  • The data has three feature dimensions: traffic flow, occupancy (congestion level), and speed.

File name: PEMS04
File extension: npz
File size: 31.4 MB
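
A quick way to verify the download is to load the archive and inspect its shape; the path below is an assumption matching the preprocessing code later in this article:

import numpy as np

# Hypothetical location; adjust to wherever PEMS04.npz was saved.
data = np.load("./data/PEMS04/PEMS04.npz")['data']
# Axes: (time_steps, num_sensors, num_features); features are (flow, occupancy, speed).
print(data.shape, data.dtype)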

Task description

Predict future traffic flow from historical flow data.

  • Implement an RNN by hand and run experiments on the dataset, analyzing the results in terms of training time, prediction accuracy, and loss curves (ideally with charts).
  • Implement a recurrent network using torch.nn.LSTM and run experiments on the dataset, with the same analysis (training time, prediction accuracy, loss curves; ideally with charts).
  • Compare different hyperparameters (hidden_size, batch_size, lr, etc.); analyze at least one or two of them.

Code walkthrough:

Import the required modules:

import os
import numpy as np
import pandas as pd
import torch
from torch import nn
from sklearn.utils import shuffle
import math
import time 
import matplotlib.pyplot as plt

Load and preprocess the data:

pems04_data = np.load("./data/PEMS04/PEMS04.npz")['data']

feature_dim = 0   # 0 = traffic flow (1 = occupancy, 2 = speed)
window_size = 12  # each sample is 12 consecutive 5-minute readings (one hour)

train_set = []
test_set = []
# First 60% of the time axis for training, the rest for testing.
train_seqs = pems04_data[:int(pems04_data.shape[0] * 0.6)]
test_seqs = pems04_data[int(pems04_data.shape[0] * 0.6):]

# Slide a length-12 window along the time axis; each slice is (window_size, num_sensors).
for i in range(train_seqs.shape[0] - window_size):
	train_set.append(train_seqs[i:i + window_size, :, feature_dim])

for j in range(test_seqs.shape[0] - window_size):
	test_set.append(test_seqs[j:j + window_size, :, feature_dim])

# Stack the windows sensor-wise, then transpose so each row is one length-12 series.
train_set = np.concatenate(train_set, axis=1).transpose()
test_set = np.concatenate(test_set, axis=1).transpose()
input_size = 1  # univariate input: flow only
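
To see what the window construction produces, here is a toy run of the same scheme on fabricated data (shapes only; the values are meaningless):

toy = np.arange(10).reshape(5, 2, 1)                  # (time=5, sensors=2, features=1)
windows = [toy[i:i + 3, :, 0] for i in range(5 - 3)]  # 2 windows, each (3, 2)
print(np.concatenate(windows, axis=1).transpose().shape)
# (4, 3): 2 windows x 2 sensors, each row a length-3 series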

Define the RNN class (wrapping nn.RNN), inheriting from nn.Module

class RNN(nn.Module):
	def __init__(self, input_size, hidden_size, output_size, num_layers=1):
		super(RNN, self).__init__()
		self.hidden_size = hidden_size
		self.num_layers = num_layers
		# Note: the third positional argument of nn.RNN is num_layers, not output_size.
		self.rnn = nn.RNN(input_size, hidden_size, num_layers, batch_first=True)
		self.linear = nn.Linear(hidden_size, output_size)

	def forward(self, x):
		batch_size = x.size(0)
		# Initial hidden state: (num_layers, batch_size, hidden_size).
		h = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)
		out, hn = self.rnn(x, h)  # out: (batch, seq_len, hidden_size)
		out = self.linear(out)    # project each time step to output_size
		return hn, out
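
A quick shape check on dummy data (not from the dataset) confirms the interface:

toy_model = RNN(input_size=1, hidden_size=32, output_size=1)
dummy = torch.randn(4, 12, 1)  # (batch, seq_len, input_size)
hn, out = toy_model(dummy)
print(out.shape)  # torch.Size([4, 12, 1])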

Define the MyRNN class (a hand-written RNN), inheriting from nn.Module

class MyRNN(nn.Module):
	def __init__(self, input_size, hidden_size, output_size):
		super().__init__()
		self.hidden_size = hidden_size

		# Input-to-hidden and hidden-to-hidden weights, plus the hidden bias.
		self.w_h = nn.Parameter(torch.rand(input_size, hidden_size))
		self.u_h = nn.Parameter(torch.rand(hidden_size, hidden_size))
		self.b_h = nn.Parameter(torch.zeros(hidden_size))

		# Hidden-to-output weights and output bias.
		self.w_y = nn.Parameter(torch.rand(hidden_size, output_size))
		self.b_y = nn.Parameter(torch.zeros(output_size))

		self.tanh = nn.Tanh()
		self.leaky_relu = nn.LeakyReLU()

		# Re-initialize the weight matrices (dim > 1) with Xavier; biases stay zero.
		for param in self.parameters():
			if param.dim() > 1:
				nn.init.xavier_uniform_(param)

	def forward(self, x):
		batch_size = x.size(0)
		seq_len = x.size(1)

		h = torch.zeros(batch_size, self.hidden_size).to(x.device)

		y_list = []
		for i in range(seq_len):
			# h_t = tanh(x_t W_h + h_{t-1} U_h + b_h)
			h = self.tanh(torch.matmul(x[:, i, :], self.w_h) +
						  torch.matmul(h, self.u_h) + self.b_h)
			# y_t = LeakyReLU(h_t W_y + b_y)
			y = self.leaky_relu(torch.matmul(h, self.w_y) + self.b_y)
			y_list.append(y)

		return h, torch.stack(y_list, dim=1)
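
In equation form, the loop implements the standard Elman RNN update with a LeakyReLU readout (the readout activation is this article's choice; a plain linear layer is also common):

$$h_t = \tanh(x_t W_h + h_{t-1} U_h + b_h), \qquad y_t = \mathrm{LeakyReLU}(h_t W_y + b_y)$$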

Define the MyLSTM class (a hand-written LSTM), inheriting from nn.Module

class MyLSTM(nn.Module):
	def __init__(self, input_size, hidden_size):
		super().__init__()
		self.hidden_size = hidden_size

		# One fused linear layer computes all four gate pre-activations at once.
		self.gates = nn.Linear(input_size + hidden_size, hidden_size * 4)
		self.sigmoid = nn.Sigmoid()
		self.tanh = nn.Tanh()

		for param in self.parameters():
			if param.dim() > 1:
				nn.init.xavier_uniform_(param)

	def forward(self, x):
		batch_size = x.size(0)
		seq_len = x.size(1)

		# Hidden state and cell state, both (batch_size, hidden_size).
		h, c = (torch.zeros(batch_size, self.hidden_size).to(x.device) for _ in range(2))
		y_list = []

		for i in range(seq_len):
			# Split the fused output into forget, input, output gates and candidate cell.
			forget_gate, input_gate, output_gate, candidate_cell = \
				self.gates(torch.cat([x[:, i, :], h], dim=1)).chunk(4, -1)
			forget_gate, input_gate, output_gate = (self.sigmoid(g) for g in (forget_gate, input_gate, output_gate))
			c = forget_gate * c + input_gate * self.tanh(candidate_cell)
			h = output_gate * self.tanh(c)
			y_list.append(h)
		return torch.stack(y_list, dim=1), (h, c)
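
With $\sigma$ the sigmoid, $\odot$ the elementwise product, and $[x_t; h_{t-1}]$ concatenation, the loop computes the usual LSTM cell update; chunk(4, -1) splits the fused linear output into the four gate pre-activations:

$$f_t, i_t, o_t, \tilde{c}_t = \mathrm{split}\big(W[x_t; h_{t-1}] + b\big)$$

$$c_t = \sigma(f_t) \odot c_{t-1} + \sigma(i_t) \odot \tanh(\tilde{c}_t), \qquad h_t = \sigma(o_t) \odot \tanh(c_t)$$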

Define the LSTM class (wrapping nn.LSTM), inheriting from nn.Module

class LSTM(nn.Module):
	def __init__(self, input_size, hidden_size, output_size):
		super().__init__()
		# Two stacked LSTM layers; input is (batch, seq_len, input_size).
		self.lstm = nn.LSTM(input_size, hidden_size, 2, batch_first=True)
		self.out = nn.Linear(hidden_size, output_size)

	def forward(self, x):
		x1, hn = self.lstm(x)  # x1: (batch, seq_len, hidden_size)
		a, b, c = x1.shape
		# Flatten to (batch * seq_len, hidden_size) for the linear head, then restore.
		out = self.out(x1.contiguous().view(-1, c))
		out1 = out.view(a, b, -1)
		return out1
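
The flatten-and-restore step is optional: nn.Linear applies to the last dimension of an input of any rank, so an equivalent forward (the same applies to the GRU class below) would be:

	def forward(self, x):
		x1, hn = self.lstm(x)
		return self.out(x1)  # nn.Linear broadcasts over (batch, seq_len)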

Define the GRU class (wrapping nn.GRU), inheriting from nn.Module

class GRU(nn.Module):
	def __init__(self, input_size, hidden_size, output_size):
		super().__init__()
		self.gru = nn.GRU(input_size, hidden_size, num_layers=2, batch_first=True)
		self.out = nn.Linear(hidden_size, output_size)
	
	def forward(self, x):
		x1, hn = self.gru(x)
		a, b, c = x1.shape
		out = self.out(x1.contiguous().view(-1, c))
		out1 = out.view(a, b, -1)
		return out1

Define the evaluation metrics MAPE, MSE, and MAE (the training loss itself is nn.MSELoss; these are used for reporting)

def MAPE(y, y_pred):
	y, y_pred = np.array(y), np.array(y_pred)
	# Drop points with zero (or negative) ground truth to avoid division by zero.
	non_zero_index = (y > 0)
	y = y[non_zero_index]
	y_pred = y_pred[non_zero_index]

	mape = np.abs((y - y_pred) / y)
	mape[np.isinf(mape)] = 0  # defensive; cannot trigger after the y > 0 filter
	return np.mean(mape) * 100


def MSE(target, output):
	return np.mean(np.power(target - output, 2))

def MAE(target, output):
	return np.mean(np.abs(target - output))
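
A quick check on toy arrays (values computed by hand):

y_true = np.array([100., 200., 0., 50.])
y_hat = np.array([110., 190., 5., 50.])
print(MSE(y_true, y_hat))   # 56.25
print(MAE(y_true, y_hat))   # 6.25
print(MAPE(y_true, y_hat))  # 5.0 -- the zero ground-truth entry is excluded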

Define the next_batch function

def next_batch(data):
	# Yields consecutive slices of size batch_size (a global defined below);
	# the final batch may be smaller.
	data_length = len(data)
	num_batches = math.ceil(data_length / batch_size)
	for batch_index in range(num_batches):
		start_index = batch_index * batch_size
		end_index = min((batch_index + 1) * batch_size, data_length)
		yield data[start_index:end_index]
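
For example, with batch_size = 512 and 1100 samples:

batch_size = 512
for b in next_batch(np.zeros((1100, 12))):
	print(b.shape)  # (512, 12), (512, 12), then (76, 12)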

Declare and initialize variables, instantiate objects

epochs = 10
batch_size = 512
train_loss_log = []
test_loss_log = []
score_log = []
end_log = []

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# Uncomment exactly one model. The interfaces differ: MyRNN and RNN return
# (hidden, output), while LSTM and GRU return only the output; adjust the two
# marked lines in the training loop accordingly.
# model = MyRNN(input_size=1, hidden_size=32, output_size=1).to(device)
# model = RNN(1, 32, 1).to(device)
# model = LSTM(input_size=1, hidden_size=32, output_size=1).to(device)
model = GRU(input_size=1, hidden_size=32, output_size=1).to(device)
loss_func = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005)

Train and evaluate the model

for epoch in range(epochs):
    start = time.time()
    model.train()
    train_total_loss = 0
    # Count batches, not samples, so the accumulated loss averages per batch.
    train_batch_num = math.ceil(len(train_set) / batch_size)
    for batch in next_batch(shuffle(train_set)):
        batch = torch.from_numpy(batch).float().to(device)
        # First 11 steps are the input, the 12th is the label; slicing with :-1
        # keeps the target out of the input sequence.
        x, label = batch[:, :-1], batch[:, -1]

        out = model(x.unsqueeze(-1))  # LSTM / GRU: forward returns only the output
        # hidden, out = model(x.unsqueeze(-1))  # RNN / MyRNN: forward returns (hidden, output)
        prediction = out[:, -1, :].squeeze(-1)

        loss = loss_func(prediction, label)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_total_loss += loss.item()
    train_loss = train_total_loss / train_batch_num
    train_loss_log.append(train_loss)

    # Evaluation: no gradients needed.
    model.eval()
    all_prediction = []
    test_total_loss = 0
    test_batch_num = math.ceil(len(test_set) / batch_size)
    with torch.no_grad():
        for batch in next_batch(test_set):
            batch = torch.from_numpy(batch).float().to(device)
            x, label = batch[:, :-1], batch[:, -1]
            out = model(x.unsqueeze(-1))  # LSTM / GRU
            # hidden, out = model(x.unsqueeze(-1))  # RNN / MyRNN
            prediction = out[:, -1, :].squeeze(-1)
            loss = loss_func(prediction, label)
            test_total_loss += loss.item()
            all_prediction.append(prediction.cpu().numpy())
    test_loss = test_total_loss / test_batch_num
    test_loss_log.append(test_loss)
    all_prediction = np.concatenate(all_prediction)
    all_label = test_set[:, -1]

    # Compute the test metrics.
    rmse_score = math.sqrt(MSE(all_label, all_prediction))
    mae_score = MAE(all_label, all_prediction)
    mape_score = MAPE(all_label, all_prediction)
    score_log.append([rmse_score, mae_score, mape_score])
    end = time.time() - start
    end_log.append(end)
    print('epoch: %d, train_loss: %.4f, RMSE: %.4f, MAE: %.4f, MAPE: %.4f, time: %.1f, avg time/epoch: %.4f' %
          (epoch + 1, train_loss, rmse_score, mae_score, mape_score, end, np.mean(end_log)))

Visualize the results

# Loss curves, with epoch numbers on the x-axis.
x = np.arange(1, len(train_loss_log) + 1)
plt.plot(x, train_loss_log, label="train_loss")
plt.plot(x, test_loss_log, label='test_loss')
plt.xlabel("epoch")
plt.ylabel("loss")
plt.legend()
plt.show()

# Test metrics per epoch (note the three metrics live on different scales).
x = np.arange(1, len(score_log) + 1)
score_log = np.array(score_log)
plt.plot(x, score_log[:, 0], label="RMSE")
plt.plot(x, score_log[:, 1], label="MAE")
plt.plot(x, score_log[:, 2], label="MAPE")
plt.xlabel("epoch")
plt.legend()
plt.show()

# Wall-clock time per epoch.
x = np.arange(1, len(end_log) + 1)
plt.plot(x, end_log, label="time")
plt.xlabel("epoch")
plt.ylabel("time (s)")
plt.legend()
plt.show()

MAPE

Mean absolute percentage error.

Corresponding Python code:

def MAPE(y, y_pred):
	y, y_pred = np.array(y), np.array(y_pred)
	non_zero_index = (y > 0)
	y = y[non_zero_index]
	y_pred = y_pred[non_zero_index]
	
	mape = np.abs((y-y_pred)/y)
	mape[np.isinf(mape)] = 0
	return np.mean(mape) * 100
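
In formula form, over the $n$ points with $y_i > 0$:

$$\mathrm{MAPE} = \frac{100\%}{n}\sum_{i=1}^{n}\left|\frac{y_i - \hat{y}_i}{y_i}\right|$$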

MSE

Background on MSE (excerpted from Deep Learning from Scratch, 《深度学习入门 基于Python的理论与实现》, by Koki Saitoh):

Many functions can serve as a loss function; the best known is the mean squared error, given by

$$E = \frac{1}{2}\sum_k (y_k - t_k)^2 \tag{4.1}$$

Here $y_k$ denotes the neural network's output, $t_k$ the supervised (target) data, and $k$ the dimensionality of the data.

The book's corresponding Python implementation:

def mean_squared_error(y, t):
	return 0.5 * np.sum((y-t)**2)

The MSE used in this article:

def MSE(target, output):
	return np.mean(np.power(target-output, 2))
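
Note this differs from the book's equation (4.1): the article's version averages over all points and drops the factor of 1/2, i.e.

$$\mathrm{MSE} = \frac{1}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i)^2$$

Both are valid as losses (the constant and the averaging do not change the minimizer), but the averaged form is the conventional reporting metric, and its square root is the RMSE printed during training.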

MAE

Corresponding Python code:

def MAE(target, output):
	return np.mean(np.abs(target-output))
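
In formula form:

$$\mathrm{MAE} = \frac{1}{n}\sum_{i=1}^{n}\left|y_i - \hat{y}_i\right|$$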

Code and dataset download:

Link: https://pan.baidu.com/s/15Ozg_uNrtPX1CcE6RP1V7g
Extraction code: mr3n

Original post: https://blog.csdn.net/MarcoLee1997/article/details/113998422
