Table of Contents

  • Dataset
  • Task Description
  • Code Walkthrough:
    • Import the required modules:
    • Load and preprocess the data:
    • Define the RNN class, inheriting from nn.Module
    • Define the MyRNN class, inheriting from nn.Module
    • Define the MyLSTM class, inheriting from nn.Module
    • Define the LSTM class, inheriting from nn.Module
    • Define the GRU class, inheriting from nn.Module
    • Define the evaluation metrics MAPE, MSE, and MAE
    • Define the next_batch function
    • Declare and initialize variables, instantiate the model
    • Train and evaluate the model
    • Visualize the results
  • MAPE
    • Corresponding Python code:
  • MSE
    • Corresponding Python code:
  • MAE
    • Corresponding Python code:
  • Code and dataset download:

Recurrent Neural Network is abbreviated as RNN.

Dataset

  • Highway traffic flow data.
  • PeMS provides real-time traffic flow data from highways in California, USA (sampled every 5 minutes).
  • The data are collected by inductive loop detectors embedded in the roadway.
  • This experiment uses data from a single district, stored in the file PEMS04.npz.
  • The raw data are stored as a NumPy binary archive and can be read with numpy.load.
  • Each record has three feature dimensions: traffic flow, occupancy (congestion level), and speed.

File name: PEMS04
File extension: npz
File size: 31.4 MB
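
To get oriented, the archive can be inspected with numpy.load; a minimal sketch (the exact array shape depends on the dataset version):

import numpy as np

# Load the compressed NumPy archive and look at the 'data' array.
raw = np.load("./data/PEMS04/PEMS04.npz")['data']
print(raw.shape)  # expected: (num_timesteps, num_detectors, 3)
print(raw.dtype)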

Task Description

Use historical traffic flow to predict future traffic flow.

  • Implement a recurrent neural network (RNN) by hand and run experiments on the dataset; analyze the results in terms of training time, prediction accuracy, loss curves, and so on (preferably with charts).
  • Implement a recurrent neural network with torch.nn.LSTM and run experiments on the dataset; analyze the results in terms of training time, prediction accuracy, loss curves, and so on (preferably with charts).
  • Compare different hyperparameters (hidden_size, batch_size, lr, etc.); pick at least 1-2 of them to analyze (see the sketch after this list).
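
For the hyperparameter comparison, one simple pattern is to sweep a single knob while holding the others fixed. A minimal sketch, assuming a hypothetical helper train_and_eval that wraps the training loop later in this post and returns the final test RMSE:

# train_and_eval is a hypothetical wrapper around the training loop below.
results = {}
for hidden_size in (16, 32, 64, 128):
    results[hidden_size] = train_and_eval(hidden_size=hidden_size, batch_size=512, lr=0.0005)
for hs, rmse in sorted(results.items()):
    print('hidden_size=%d: RMSE=%.4f' % (hs, rmse))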

Code Walkthrough:

Import the required modules:

import os
import numpy as np
import pandas as pd
import torch
from torch import nn
from sklearn.utils import shuffle
import math
import time 
import matplotlib.pyplot as plt

Load and preprocess the data:

# Load the raw array of shape (num_timesteps, num_detectors, num_features).
pems04_data = np.load("./data/PEMS04/PEMS04.npz")['data']

feature_dim = 0     # use only the traffic-flow feature
window_size = 12    # 12 consecutive 5-minute readings per sample

train_set = []
test_set = []
# Chronological 60/40 split so no future data leaks into training.
train_seqs = pems04_data[:int(pems04_data.shape[0] * 0.6)]
test_seqs = pems04_data[int(pems04_data.shape[0] * 0.6):]

# Slide a window over time; each window covers all detectors at once.
for i in range(train_seqs.shape[0] - window_size):
	train_set.append(train_seqs[i:i + window_size, :, feature_dim])

for j in range(test_seqs.shape[0] - window_size):
	test_set.append(test_seqs[j:j + window_size, :, feature_dim])

# Stack the windows along the detector axis, then transpose so each row is
# one length-12 sequence: shape (num_windows * num_detectors, window_size).
train_set = np.concatenate(train_set, axis=1).transpose()
test_set = np.concatenate(test_set, axis=1).transpose()
input_size = 1
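
A quick sanity check on the resulting shapes (a sketch; the exact row count depends on the split):

print(train_set.shape)  # (num_train_windows * num_detectors, 12)
print(test_set.shape)   # (num_test_windows * num_detectors, 12)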

Define the RNN class, inheriting from nn.Module

class RNN(nn.Module):
	def __init__(self, input_size, hidden_size, output_size):
		super(RNN, self).__init__()
		self.hidden_size = hidden_size
		self.num_layers = 1
		# nn.RNN's third positional argument is num_layers (not output_size).
		self.rnn = nn.RNN(input_size, hidden_size, self.num_layers, batch_first=True)
		self.linear = nn.Linear(hidden_size, output_size)

	def forward(self, x):
		batch_size = x.size(0)
		# Initial hidden state has shape (num_layers, batch_size, hidden_size).
		h = torch.zeros(self.num_layers, batch_size, self.hidden_size).to(x.device)
		out, hn = self.rnn(x, h)
		out = self.linear(out)
		return hn, out

Define the MyRNN class, inheriting from nn.Module

class MyRNN(nn.Module):
	def __init__(self, input_size, hidden_size, output_size):
		super().__init__()
		self.hidden_size = hidden_size

		# Recurrence parameters: h_t = tanh(x_t W_h + h_{t-1} U_h + b_h)
		self.w_h = nn.Parameter(torch.rand(input_size, hidden_size))
		self.u_h = nn.Parameter(torch.rand(hidden_size, hidden_size))
		self.b_h = nn.Parameter(torch.zeros(hidden_size))

		# Output projection: y_t = LeakyReLU(h_t W_y + b_y)
		self.w_y = nn.Parameter(torch.rand(hidden_size, output_size))
		self.b_y = nn.Parameter(torch.zeros(output_size))

		self.tanh = nn.Tanh()
		self.leaky_relu = nn.LeakyReLU()

		# Xavier initialization for all weight matrices (biases stay zero).
		for param in self.parameters():
			if param.dim() > 1:
				nn.init.xavier_uniform_(param)

	def forward(self, x):
		batch_size = x.size(0)
		seq_len = x.size(1)

		# Initial hidden state of zeros, one per sequence in the batch.
		h = torch.zeros(batch_size, self.hidden_size).to(x.device)

		y_list = []
		for i in range(seq_len):
			h = self.tanh(torch.matmul(x[:, i, :], self.w_h) +
						  torch.matmul(h, self.u_h) + self.b_h)
			y = self.leaky_relu(torch.matmul(h, self.w_y) + self.b_y)
			y_list.append(y)

		# Return the final hidden state and the per-step outputs (batch, seq, output).
		return h, torch.stack(y_list, dim=1)
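
A quick smoke test (not part of the original post) to confirm the output shapes:

dummy = torch.randn(4, 12, 1)  # batch of 4 sequences, 12 steps, 1 feature
rnn = MyRNN(input_size=1, hidden_size=32, output_size=1)
h, y = rnn(dummy)
print(h.shape)  # torch.Size([4, 32])
print(y.shape)  # torch.Size([4, 12, 1])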

Define the MyLSTM class, inheriting from nn.Module

class MyLSTM(nn.Module):
	def __init__(self, input_size, hidden_size):
		super().__init__()
		self.hidden_size = hidden_size

		# One linear layer computes all four gates at once:
		# [forget, input, output, candidate], each of width hidden_size.
		self.gates = nn.Linear(input_size + hidden_size, hidden_size * 4)
		self.sigmoid = nn.Sigmoid()
		self.tanh = nn.Tanh()

		for param in self.parameters():
			if param.dim() > 1:
				nn.init.xavier_uniform_(param)

	def forward(self, x):
		batch_size = x.size(0)
		seq_len = x.size(1)

		# Initial hidden state h and cell state c, both zeros.
		h, c = (torch.zeros(batch_size, self.hidden_size).to(x.device) for _ in range(2))
		y_list = []

		for i in range(seq_len):
			# Split the joint projection into the four gate pre-activations.
			forget_gate, input_gate, output_gate, candidate_cell = \
				self.gates(torch.cat([x[:, i, :], h], dim=1)).chunk(4, -1)
			forget_gate, input_gate, output_gate = (self.sigmoid(g) for g in (forget_gate, input_gate, output_gate))
			c = forget_gate * c + input_gate * self.tanh(candidate_cell)
			h = output_gate * self.tanh(c)
			y_list.append(h)
		return torch.stack(y_list, dim=1), (h, c)
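
For reference, the loop above implements the standard LSTM update, with \sigma the sigmoid, \odot elementwise multiplication, and [x_t; h_{t-1}] the concatenation computed by torch.cat:

\begin{aligned}
f_t &= \sigma(W_f [x_t; h_{t-1}] + b_f) \\
i_t &= \sigma(W_i [x_t; h_{t-1}] + b_i) \\
o_t &= \sigma(W_o [x_t; h_{t-1}] + b_o) \\
\tilde{c}_t &= \tanh(W_c [x_t; h_{t-1}] + b_c) \\
c_t &= f_t \odot c_{t-1} + i_t \odot \tilde{c}_t \\
h_t &= o_t \odot \tanh(c_t)
\end{aligned}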

Define the LSTM class, inheriting from nn.Module

class LSTM(nn.Module):
	def __init__(self, input_size, hidden_size, output_size):
		super().__init__()
		# Two stacked LSTM layers; batch_first expects input of shape (batch, seq, feature).
		self.lstm = nn.LSTM(input_size, hidden_size, 2, batch_first=True)
		self.out = nn.Linear(hidden_size, output_size)

	def forward(self, x):
		x1, hn = self.lstm(x)
		a, b, c = x1.shape
		# Apply the output layer to every time step, then restore (batch, seq, output).
		out = self.out(x1.contiguous().view(-1, c))
		out1 = out.view(a, b, -1)
		return out1

Define the GRU class, inheriting from nn.Module

class GRU(nn.Module):
	def __init__(self, input_size, hidden_size, output_size):
		super().__init__()
		self.gru = nn.GRU(input_size, hidden_size, num_layers=2, batch_first=True)
		self.out = nn.Linear(hidden_size, output_size)
	
	def forward(self, x):
		x1, hn = self.gru(x)
		a, b, c = x1.shape
		out = self.out(x1.contiguous().view(-1, c))
		out1 = out.view(a, b, -1)
		return out1
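
As with MyRNN, a quick shape check (not in the original post) for the two nn-based wrappers:

dummy = torch.randn(4, 12, 1)
for m in (LSTM(1, 32, 1), GRU(1, 32, 1)):
    print(type(m).__name__, m(dummy).shape)  # both print torch.Size([4, 12, 1])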

Define the evaluation metrics MAPE, MSE, and MAE

def MAPE(y, y_pred):
	y, y_pred = np.array(y), np.array(y_pred)
	# Keep only strictly positive targets to avoid division by zero.
	non_zero_index = (y > 0)
	y = y[non_zero_index]
	y_pred = y_pred[non_zero_index]

	mape = np.abs((y - y_pred) / y)
	mape[np.isinf(mape)] = 0
	return np.mean(mape) * 100


def MSE(target, output):
	return np.mean(np.power(target - output, 2))

def MAE(target, output):
	return np.mean(np.abs(target - output))

Define the next_batch function

def next_batch(data):
	data_length = len(data)
	# batch_size is read from the enclosing (global) scope, set below.
	num_batches = math.ceil(data_length / batch_size)
	for batch_index in range(num_batches):
		start_index = batch_index * batch_size
		end_index = min((batch_index + 1) * batch_size, data_length)
		yield data[start_index:end_index]
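
Usage example (assuming batch_size is set as in the next section):

batch_size = 512
for batch in next_batch(train_set):
    print(batch.shape)  # (512, 12) for every batch except possibly the last
    break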

Declare and initialize variables, instantiate the model

epochs = 10
batch_size = 512
train_loss_log = []
test_loss_log = []
score_log = []
end_log = []

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# model = MyRNN(input_size=1, hidden_size=32, output_size=1).to(device)
# model = RNN(1, 32, 1).to(device)
# model = LSTM(input_size=1, hidden_size=32, output_size=1).to(device)
model = GRU(input_size=1, hidden_size=32, output_size=1).to(device)
loss_func = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005)

Train and evaluate the model

for epoch in range(epochs):
    start = time.time()
    train_total_loss = 0
    # Average over the number of batches, not the number of samples.
    train_batch_num = math.ceil(len(train_set) / batch_size)
    model.train()
    for batch in next_batch(shuffle(train_set)):
        batch = torch.from_numpy(batch).float().to(device)
        # The first 11 points are the input; the 12th point is the target.
        x, label = batch[:, :-1], batch[:, -1]

        out = model(x.unsqueeze(-1))  # LSTM/GRU return only the outputs
        # hidden, out = model(x.unsqueeze(-1))  # RNN/MyRNN return (hidden, outputs)
        prediction = out[:, -1, :].squeeze(-1)

        loss = loss_func(prediction, label)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        train_total_loss += loss.item()
    train_loss = train_total_loss / train_batch_num
    train_loss_log.append(train_loss)

    all_prediction = []
    test_total_loss = 0
    test_batch_num = math.ceil(len(test_set) / batch_size)
    model.eval()
    with torch.no_grad():
        for batch in next_batch(test_set):
            batch = torch.from_numpy(batch).float().to(device)
            x, label = batch[:, :-1], batch[:, -1]
            out = model(x.unsqueeze(-1))  # LSTM/GRU
            # hidden, out = model(x.unsqueeze(-1))  # RNN/MyRNN
            prediction = out[:, -1, :].squeeze(-1)
            loss = loss_func(prediction, label)
            test_total_loss += loss.item()
            all_prediction.append(prediction.cpu().numpy())
    test_loss = test_total_loss / test_batch_num
    test_loss_log.append(test_loss)
    all_prediction = np.concatenate(all_prediction)
    all_label = test_set[:, -1]

    # Compute the test metrics.
    rmse_score = math.sqrt(MSE(all_label, all_prediction))
    mae_score = MAE(all_label, all_prediction)
    mape_score = MAPE(all_label, all_prediction)
    score_log.append([rmse_score, mae_score, mape_score])
    end = time.time() - start
    end_log.append(end)
    print('epoch: %d, train_loss: %.4f, RMSE: %.4f, MAE: %.4f, MAPE: %.4f, time: %.1f, avg time/epoch: %.4f' %
          (epoch + 1, train_loss, rmse_score, mae_score, mape_score, end, np.mean(end_log)))

Visualize the results

x = np.arange(1, len(train_loss_log) + 1)
plt.plot(x, train_loss_log, label="train_loss")
plt.plot(x, test_loss_log, label='test_loss')
plt.xlabel("epoch")
plt.ylabel("loss")
plt.legend()
plt.show()

x = np.arange(1, len(score_log) + 1)
score_log = np.array(score_log)
plt.plot(x, score_log[:, 0], label="RMSE")
plt.plot(x, score_log[:, 1], label="MAE")
plt.plot(x, score_log[:, 2], label="MAPE")
plt.xlabel("epoch")
plt.legend()
plt.show()

x = np.arange(1, len(end_log) + 1)
plt.plot(x, end_log, label="time")
plt.xlabel("epoch")
plt.ylabel("time (s)")
plt.legend()
plt.show()

MAPE

Mean absolute percentage error, defined (over the non-zero targets) as

\text{MAPE} = \frac{100\%}{n}\sum_{i=1}^{n}\left|\frac{y_i - \hat{y}_i}{y_i}\right|

Corresponding Python code:

def MAPE(y, y_pred):
	y, y_pred = np.array(y), np.array(y_pred)
	non_zero_index = (y > 0)
	y = y[non_zero_index]
	y_pred = y_pred[non_zero_index]
	
	mape = np.abs((y-y_pred)/y)
	mape[np.isinf(mape)] = 0
	return np.mean(mape) * 100
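
A tiny worked example with assumed values:

y_true = [100.0, 200.0, 0.0]  # the zero target is filtered out
y_hat = [110.0, 180.0, 5.0]
print(MAPE(y_true, y_hat))    # |-10|/100 = 10%, |20|/200 = 10% -> 10.0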

MSE

Background on MSE (quoted from Deep Learning from Scratch [《深度学习入门 基于Python的理论与实现》] by Koki Saitoh):

Many functions can be used as a loss function; the best known is the mean squared error (MSE), given by the following formula.
E = \frac{1}{2}\sum_k (y_k - t_k)^2 \tag{4.1}
Here, $y_k$ denotes the output of the neural network, $t_k$ the supervised (target) data, and $k$ the number of dimensions of the data.

The corresponding Python implementation is:

def mean_squared_error(y, t):
	return 0.5 * np.sum((y-t)**2)
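
For instance, with an assumed one-hot target and a softmax-like output in the spirit of the book's examples:

t = np.array([0, 0, 1, 0, 0])              # correct class is index 2
y = np.array([0.1, 0.05, 0.6, 0.15, 0.1])  # most probability on index 2
print(mean_squared_error(y, t))            # 0.1025, small -> close to target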

Corresponding Python code:

def MSE(target, output):
	return np.mean(np.power(target-output, 2))

MAE

Corresponding Python code:

def MAE(target, output):
	return np.mean(np.abs(target-output))
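
A small check with assumed values, relating the three metrics (note that RMSE = sqrt(MSE) and MAE <= RMSE):

target = np.array([1.0, 2.0, 3.0])
output = np.array([2.0, 2.0, 5.0])
print(MSE(target, output))             # (1 + 0 + 4) / 3 = 1.6667
print(math.sqrt(MSE(target, output)))  # RMSE = 1.2910
print(MAE(target, output))             # (1 + 0 + 2) / 3 = 1.0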

Code and dataset download:

Link: https://pan.baidu.com/s/15Ozg_uNrtPX1CcE6RP1V7g
Extraction code: mr3n

Original post: https://blog.csdn.net/MarcoLee1997/article/details/113998422