利用PyTorch实现循环神经网络RNN
目录
循环神经网络的英文:Recurrent Neural Network,简称为RNN
数据集
- 高速公路车流量数据。
- PeMS是美国加利福尼亚州高速公路的实时车流量数据 (每5分钟采集一次)。
- 数据由铺设在道路上的检测线圈采集。
- 本实验中包含一个地区的数据,储存在PEMS04.npz文件中。
- 原始数据使用numpy二进制文件存储,可以使用numpy.load函数读取。
- 数据有三个特征维度:车流量、拥挤程度和车速。
文件名称:PEMS04
文件后缀:npz
文件大小:31.4 MB
任务介绍
用历史流量数据预测未来流量
- 手动实现循环神经网络RNN,并在数据集上进行实验,从训练时间、预测精度、Loss变化等角度分析实验结果(最好使用图表展示)
- 使用torch.nn.LSTM实现循环神经网络,并在数据集上进行实验,从训练时间、预测精度、Loss变化等角度分析实验结果(最好使用图表展示)
- 不同超参数的对比分析(包括hidden_size、batchsize、lr等)选其中至少1-2个进行分析
代码详解:
导入需要的模块:
import os
import numpy as np
import pandas as pd
import torch
from torch import nn
from sklearn.utils import shuffle
import math
import time
import matplotlib.pyplot as plt
读取并处理数据:
# Load the array stored under the "data" key of the PeMS04 archive.
pems04_data = np.load("./data/PEMS04/PEMS04.npz")['data']

feature_dim = 0    # index selected on the last axis (presumably traffic flow -- confirm)
window_size = 12   # number of consecutive steps along the first axis per sample

# Split along the first axis: the leading 60% is used for training, the rest
# for testing.
split_at = int(pems04_data.shape[0] * 0.6)
train_seqs = pems04_data[:split_at]
test_seqs = pems04_data[split_at:]

# Every window is a (window_size, n) slice of the chosen feature.  Joining the
# windows along axis 1 and transposing gives one row of length window_size
# per (window, column) pair.
train_set = np.concatenate(
    [
        train_seqs[first:first + window_size, :, feature_dim]
        for first in range(train_seqs.shape[0] - window_size)
    ],
    axis=1,
).transpose()
test_set = np.concatenate(
    [
        test_seqs[first:first + window_size, :, feature_dim]
        for first in range(test_seqs.shape[0] - window_size)
    ],
    axis=1,
).transpose()

input_size = 1
定义RNN类,继承自nn.Module
class RNN(nn.Module):
    """Single-layer ``nn.RNN`` followed by a linear read-out at every step.

    Args:
        input_size: number of features per time step.
        hidden_size: width of the recurrent hidden state.
        output_size: number of values produced per time step.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        # The third positional argument of nn.RNN is num_layers, so the layer
        # count is passed by keyword rather than forwarding output_size.
        self.num_layers = 1
        self.rnn = nn.RNN(input_size, hidden_size,
                          num_layers=self.num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Run the recurrence from a zero initial state.

        Args:
            x: tensor laid out as (batch, step, feature), matching
                ``batch_first=True``.

        Returns:
            Tuple ``(hn, out)``: the final hidden state returned by
            ``nn.RNN`` and the linear projection of every step's output.
        """
        batch_size = x.size(0)
        # The initial state is shaped (num_layers, batch, hidden); its first
        # dimension must be the layer count, not the input feature size.
        h0 = torch.zeros(self.num_layers, batch_size, self.hidden_size,
                         dtype=x.dtype, device=x.device)
        out, hn = self.rnn(x, h0)
        return hn, self.linear(out)
定义MyRNN类,继承自nn.Module
class MyRNN(nn.Module):
    """Hand-written recurrent cell: tanh state update, LeakyReLU read-out.

    Args:
        input_size: number of features per time step.
        hidden_size: width of the hidden state.
        output_size: number of values emitted per time step.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size

        # Input-to-hidden, hidden-to-hidden and hidden-to-output parameters.
        self.w_h = nn.Parameter(torch.rand(input_size, hidden_size))
        self.u_h = nn.Parameter(torch.rand(hidden_size, hidden_size))
        self.b_h = nn.Parameter(torch.zeros(hidden_size))
        self.w_y = nn.Parameter(torch.rand(hidden_size, output_size))
        self.b_y = nn.Parameter(torch.zeros(output_size))

        self.tanh = nn.Tanh()
        self.leaky_relu = nn.LeakyReLU()

        # Weight matrices are re-initialised with Xavier uniform; the 1-D
        # biases keep their zero values.
        for weight in (p for p in self.parameters() if p.dim() > 1):
            nn.init.xavier_uniform_(weight)

    def forward(self, x):
        """Unroll the cell over axis 1 of ``x`` from a zero state.

        Args:
            x: 3-D tensor indexed as ``x[:, step, :]``.

        Returns:
            Tuple ``(h, y)``: the state after the last step and the per-step
            outputs stacked along axis 1.
        """
        n_samples, n_steps = x.size(0), x.size(1)
        state = torch.zeros(n_samples, self.hidden_size).to(x.device)
        outputs = []
        for step in range(n_steps):
            pre_activation = x[:, step, :] @ self.w_h + state @ self.u_h + self.b_h
            state = self.tanh(pre_activation)
            outputs.append(self.leaky_relu(state @ self.w_y + self.b_y))
        return state, torch.stack(outputs, dim=1)
定义MyLSTM类,继承自nn.Module
class MyLSTM(nn.Module):
    """Hand-written LSTM layer whose four gates share one linear map.

    Args:
        input_size: number of features per time step.
        hidden_size: width of the hidden and cell states.
    """

    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        # One projection of [input, hidden] yields all four gate
        # pre-activations at once.
        self.gates = nn.Linear(input_size + hidden_size, hidden_size * 4)
        self.sigmoid = nn.Sigmoid()
        self.tanh = nn.Tanh()
        # Only the 2-D weight is re-initialised; the bias keeps the
        # nn.Linear default.
        for weight in (p for p in self.parameters() if p.dim() > 1):
            nn.init.xavier_uniform_(weight)

    def forward(self, x):
        """Unroll the layer over axis 1 of ``x`` from zero hidden/cell states.

        Args:
            x: 3-D tensor indexed as ``x[:, step, :]``.

        Returns:
            Tuple ``(outputs, (h, c))``: the hidden states of every step
            stacked along axis 1, plus the final hidden and cell states.
        """
        n_samples, n_steps = x.size(0), x.size(1)
        hidden = torch.zeros(n_samples, self.hidden_size).to(x.device)
        cell = torch.zeros(n_samples, self.hidden_size).to(x.device)
        history = []
        for step in range(n_steps):
            fused = self.gates(torch.cat([x[:, step, :], hidden], dim=1))
            forget_raw, input_raw, output_raw, candidate_raw = fused.chunk(4, -1)
            forget_gate = self.sigmoid(forget_raw)
            input_gate = self.sigmoid(input_raw)
            output_gate = self.sigmoid(output_raw)
            cell = forget_gate * cell + input_gate * self.tanh(candidate_raw)
            hidden = output_gate * self.tanh(cell)
            history.append(hidden)
        return torch.stack(history, dim=1), (hidden, cell)
定义LSTM类,继承自nn.Module
class LSTM(nn.Module):
    """Two-layer ``nn.LSTM`` with a linear projection applied to every step.

    Args:
        input_size: number of features per time step.
        hidden_size: width of the LSTM hidden state.
        output_size: number of values produced per time step.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers=2, batch_first=True)
        self.out = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the projected LSTM output for every step of ``x``.

        The recurrent output is flattened to two dimensions for the linear
        layer and then restored to its original leading two dimensions.
        """
        features, _ = self.lstm(x)
        dim0, dim1, width = features.shape
        projected = self.out(features.reshape(-1, width))
        return projected.view(dim0, dim1, -1)
定义GRU类,继承自nn.Module
class GRU(nn.Module):
    """Two-layer ``nn.GRU`` with a linear projection applied to every step.

    Args:
        input_size: number of features per time step.
        hidden_size: width of the GRU hidden state.
        output_size: number of values produced per time step.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.gru = nn.GRU(input_size, hidden_size, num_layers=2, batch_first=True)
        self.out = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the projected GRU output for every step of ``x``.

        The recurrent output is flattened to two dimensions for the linear
        layer and then restored to its original leading two dimensions.
        """
        features, _ = self.gru(x)
        dim0, dim1, width = features.shape
        projected = self.out(features.reshape(-1, width))
        return projected.view(dim0, dim1, -1)
定义评价指标函数 MAPE、MSE、MAE(训练时使用的损失函数为 nn.MSELoss)
def MAPE(y, y_pred):
    """Mean absolute percentage error, in percent.

    Only positions where the target ``y`` is strictly positive contribute.
    Infinite ratios (for example from an infinite prediction) count as zero.

    Args:
        y: array-like of target values.
        y_pred: array-like of predictions, same shape as ``y``.

    Returns:
        The mean of ``|y - y_pred| / y`` over the kept positions, times 100.
    """
    actual = np.array(y)
    predicted = np.array(y_pred)
    keep = actual > 0
    actual, predicted = actual[keep], predicted[keep]
    ratio = np.abs((actual - predicted) / actual)
    ratio[np.isinf(ratio)] = 0
    return np.mean(ratio) * 100
def MSE(target, output):
    """Return the mean of the squared element-wise difference."""
    residual = target - output
    return np.mean(np.square(residual))
def MAE(target, output):
    """Return the mean of the absolute element-wise difference."""
    residual = target - output
    return np.mean(np.absolute(residual))
定义函数 next_batch
def next_batch(data):
    """Yield consecutive slices of ``data`` of at most ``batch_size`` items.

    ``batch_size`` is read from module scope.  The last slice is shorter
    when ``len(data)`` is not a multiple of it.
    """
    total = len(data)
    n_chunks = math.ceil(total / batch_size)
    yield from (
        data[k * batch_size:min((k + 1) * batch_size, total)]
        for k in range(n_chunks)
    )
声明与初始化变量,实例化对象
# Training hyper-parameters.
epochs, batch_size = 10, 512

# Per-epoch histories appended to by the training loop: training loss, test
# loss, [RMSE, MAE, MAPE] scores and wall-clock duration.
train_loss_log, test_loss_log, score_log, end_log = [], [], [], []

# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Alternative models; enable exactly one of these instead of the GRU below.
# model = MyRNN(input_size=1, hidden_size=32, output_size=1).to(device)
# model = RNN(1, 32, 1).to(device)
# model = LSTM(input_size=1, hidden_size=32, output_size=1).to(device)
model = GRU(input_size=1, hidden_size=32, output_size=1).to(device)

loss_func = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=5e-4)
训练与验证模型
def _last_step_prediction(net, inputs):
    """Return ``net``'s prediction at the final time step of ``inputs``.

    ``inputs`` is a 2-D (sample, step) tensor; a trailing feature axis of
    size one is added before the forward pass.  The models in this file
    either return the per-step outputs directly (LSTM, GRU) or a
    ``(hidden, outputs)`` tuple (RNN, MyRNN); both forms are accepted.
    """
    result = net(inputs.unsqueeze(-1))
    outputs = result[1] if isinstance(result, tuple) else result
    return outputs[:, -1, :].squeeze(-1)


for epoch in range(epochs):
    start = time.time()

    # ---- training pass ----
    model.train()
    train_total_loss = 0.0
    for batch in next_batch(shuffle(train_set)):
        batch = torch.from_numpy(batch).float().to(device)
        # The last step of every window is the target.  Only the preceding
        # steps are fed to the model, so the label never appears in its input.
        x, label = batch[:, :-1], batch[:, -1]
        prediction = _last_step_prediction(model, x)
        loss = loss_func(prediction, label)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        # loss is a batch mean: weight it by the batch size so that dividing
        # by the sample count below gives the per-sample average.
        train_total_loss += loss.item() * batch.size(0)
    train_loss = train_total_loss / len(train_set)
    train_loss_log.append(train_loss)

    # ---- evaluation pass (no gradient tracking) ----
    model.eval()
    all_prediction = []
    test_total_loss = 0.0
    with torch.no_grad():
        for batch in next_batch(test_set):
            batch = torch.from_numpy(batch).float().to(device)
            x, label = batch[:, :-1], batch[:, -1]
            prediction = _last_step_prediction(model, x)
            loss = loss_func(prediction, label)
            test_total_loss += loss.item() * batch.size(0)
            all_prediction.append(prediction.cpu().numpy())
    test_loss = test_total_loss / len(test_set)
    test_loss_log.append(test_loss)

    all_prediction = np.concatenate(all_prediction)
    all_label = test_set[:, -1]

    # Test metrics for this epoch.
    rmse_score = math.sqrt(MSE(all_label, all_prediction))
    mae_score = MAE(all_label, all_prediction)
    mape_score = MAPE(all_label, all_prediction)
    score_log.append([rmse_score, mae_score, mape_score])

    end = time.time() - start
    end_log.append(end)
    print('epoch: %d, train_loss: %.4f, RMSE: %.4f, MAE: %.4f, MAPE: %.4f, time: %.1f, tiem/per epoch: %.4f' %
          (epoch + 1, train_loss, rmse_score, mae_score, mape_score, end, np.mean(end_log)))
结果可视化
# Loss curves.  Epochs are numbered 1..N so every point sits on an integer
# tick; np.linspace(0, N, N) would place them at non-integer positions.
x = np.arange(1, len(train_loss_log) + 1)
plt.plot(x, train_loss_log, label="train_loss")
plt.plot(x, test_loss_log, label='test_loss')
plt.xlabel("epoch")
plt.ylabel("loss")
plt.legend()
plt.show()

# Test metrics per epoch; the columns of score_log are RMSE, MAE and MAPE.
x = np.arange(1, len(score_log) + 1)
score_log = np.array(score_log)
plt.plot(x, score_log[:, 0], label="RMSE")
plt.plot(x, score_log[:, 1], label="MAE")
plt.plot(x, score_log[:, 2], label="MAPE")
plt.xlabel("epoch")
plt.legend()
plt.show()

# Wall-clock duration of each epoch.
x = np.arange(1, len(end_log) + 1)
plt.plot(x, end_log, label="time")
plt.xlabel("epoch")
plt.ylabel("time")
plt.legend()
plt.show()
MAPE
平均绝对百分比误差 mean absolute percent error
对应Python语句:
def MAPE(y, y_pred):
    """Mean absolute percentage error, in percent.

    Only positions where the target ``y`` is strictly positive contribute.
    Infinite ratios (for example from an infinite prediction) count as zero.

    Args:
        y: array-like of target values.
        y_pred: array-like of predictions, same shape as ``y``.

    Returns:
        The mean of ``|y - y_pred| / y`` over the kept positions, times 100.
    """
    actual = np.array(y)
    predicted = np.array(y_pred)
    keep = actual > 0
    actual, predicted = actual[keep], predicted[keep]
    ratio = np.abs((actual - predicted) / actual)
    ratio[np.isinf(ratio)] = 0
    return np.mean(ratio) * 100
MSE
MSE的相关介绍(摘自《深度学习入门 基于Python的理论与实现》[日] 斋藤康毅):
可以用作损失函数的函数有很多,其中最有名的是均方误差(mean squared error)。均方误差如下式所示。
$$E = \frac{1}{2}\sum_k (y_k - t_k)^2 \tag{4.1}$$
这里,$y_k$ 表示神经网络的输出,$t_k$ 表示监督数据,$k$ 表示数据的维数。
对应的Python实现为:
def mean_squared_error(y, t):
    """Return half the sum of squared differences between ``y`` and ``t``."""
    residual = y - t
    return 0.5 * np.sum(np.square(residual))
对应Python语句:
def MSE(target, output):
    """Return the mean of the squared element-wise difference."""
    residual = target - output
    return np.mean(np.square(residual))
MAE
对应Python语句:
def MAE(target, output):
    """Return the mean of the absolute element-wise difference."""
    residual = target - output
    return np.mean(np.absolute(residual))
代码及数据集下载地址:
链接:https://pan.baidu.com/s/15Ozg_uNrtPX1CcE6RP1V7g
提取码:mr3n
复制这段内容后打开百度网盘手机App,操作更方便哦
本文地址:https://blog.csdn.net/MarcoLee1997/article/details/113998422
上一篇: 使用Redis实现微信步数排行榜功能
下一篇: 如何监控 Linux 服务器状态的方法