LSTM预测物联网时间序列数据
在上一期我们开发了一个简单的LSTM神经网络来预测时序数据的值。在本期我们要把这模型用在真实世界的物联网数据上。作为示例,我们会根据之前几天观测到的数据预测太阳能电池板的日产电量。
太阳能发电量预测是一个重要且艰难的问题。太阳能产电量的预测还与天气预测密切相关。实际上,这个问题分为两部分,第一部分,我们需要关注太阳能光强度或者其他气象的变量,另一方面我们需要计算在预测的天气状况下太阳能电池板的产电量。通常来说,这种复杂问题的处理经常会涉及到空间和时间尺度的变化,本教程仅仅简单的通过之前太阳能电池板生成的电量数据预测之后的数据。
目标
通过一块太阳能电池板以往一天产生的电量,我们需要预测全部太阳能电池板在预测天内产生的电量。我们需要使用在上期开发的基于时间序列预测的LSTM网络模型来根据以往的数据预测太阳能电池板的产电量。
我们使用太阳能电池板的历史数据据来训练模型,在我们的示例中,我们需要使用之前的数据预测整个太阳能电池板阵列的产电量。我们在最初的两个数据之后就开始预测,然后与新读取的数据进行拟合。
在本教程中我们我们会使用上期的LSTM模型,因此与上期类似,有如下几个部分:
- 初始化
- 数据生成
- LSTM模型构建
- 模型训练和评估
初始化
我们先导入一些库,定义一些以后会用到的常量。
from matplotlib import pyplot as plt
import math
import numpy as np
import os
import pandas as pd
import random
import time
import cntk as C
try:
from urllib.request import urlretrieve
except ImportError:
from urllib import urlretrieve
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
在下面的代码中,我们通过检查在CNTK内部定义的环境变量来选择正确的设备(GPU或者CPU)来运行代码,如果不检查的话,会使用CNTK的默认策略来使用最好的设备(如果GPU可用的话就使用GPU,否则使用CPU)
if 'TEST_DEVICE' in os.environ:
if os.environ['TEST_DEVICE'] == 'cpu':
C.device.try_set_default_device(C.device.cpu())
else:
C.device.try_set_default_device(C.device.gpu(0))
- 1
- 2
- 3
- 4
- 5
我们设定了两种运行模式:
- 快速模式:isFast变量设置成True。这是我们的默认模式,在这个模式下我们会训练更少的次数,也会使用更少的数据,这个模式保证功能的正确性,但训练的结果还远远达不到可用的要求。
- 慢速模式:我们建议学习者在学习的时候试试将isFast变量设置成False,这会让学习者更加了解本教程的内容。
快速模式我们会训练100个周期,得到的结果会有比较低的精度,不过对于开发来说已经足够了。这个模型目前比较好的表现需要训练1000到2000个周期。
isFast = True
# we need around 2000 epochs to see good accuracy. For testing 100 epochs will do.
EPOCHS = 100 if isFast else 2000
- 1
- 2
- 3
- 4
我们的太阳能电池板每三十分钟采集一次数据:
- solar.current表示当前的产电量,用瓦特表示。
- solar.total是到当天的目前为止,太阳能电池板的平均功率,用瓦特/小时表示。
我们的预测采用的数据从最初读取的两个数据开始。以这些数据为基础,我们不断的预测和把预测结果与新的真实值拟合。我们用到的数据采用csv格式,其形式如下:
time,solar.current,solar.total
7am,6.3,1.7
7:30am,44.3,11.4
…
我们使用的训练数据集包含了三年的数据,可以从这里下载:https://guschmueds.blob.core.windows.net/datasets/solar.csv。数据没有预处理,有些行的数据有些确实和错误。
预处理
在本示例中大部分代码都是用来数据预处理的。感谢pandas库让数据预处理工作变得非常简单。
generate_solar_data函数执行了如下任务:
- 以pandas数据帧的方式读取CSV数据
- 格式化数据
- 数据分组
- 添加solar.current.max列和solar.total.max列。
- 生成每天的数据序列
序列生成:所有的序列都会被构成一系列的序列,在这里再也没有时间戳的概念,只有序列才是要紧的。
注:如果一天的数据量少于8个,我们会假设当天的数据丢失了,就跳过这天的数据。如果一天的数据多于14条,我们会截取为14条。
训练/测试/验证
我们从使用CNTK读取CSV文件开始,数据以一行行组织,通过时间排列,我们需要随机将这些数据分割成训练数据集、验证数据集和测试数据集,但是这样的分割会让数据与真实情况不符。因此,我们我们使用如下方式分割数据:把一天的数据读取进序列,其中八个数据作为训练数据,一个是验证数据,一个是测试数据,这就可以让训练数据、验证数据和测试数据分布在整个时间线上。
def generate_solar_data(input_url, time_steps, normalize=1, val_size=0.1, test_size=0.1):
"""
generate sequences to feed to rnn based on data frame with solar panel data
the csv has the format: time ,solar.current, solar.total
(solar.current is the current output in Watt, solar.total is the total production
for the day so far in Watt hours)
"""
# try to find the data file local. If it doesn't exists download it.
cache_path = os.path.join("data", "iot")
cache_file = os.path.join(cache_path, "solar.csv")
if not os.path.exists(cache_path):
os.makedirs(cache_path)
if not os.path.exists(cache_file):
urlretrieve(input_url, cache_file)
print("downloaded data successfully from ", input_url)
else:
print("using cache for ", input_url)
df = pd.read_csv(cache_file, index_col="time", parse_dates=['time'], dtype=np.float32)
df["date"] = df.index.date
# normalize data
df['solar.current'] /= normalize
df['solar.total'] /= normalize
# group by day, find the max for a day and add a new column .max
grouped = df.groupby(df.index.date).max()
grouped.columns = ["solar.current.max", "solar.total.max", "date"]
# merge continuous readings and daily max values into a single frame
df_merged = pd.merge(df, grouped, right_index=True, on="date")
df_merged = df_merged[["solar.current", "solar.total",
"solar.current.max", "solar.total.max"]]
# we group by day so we can process a day at a time.
grouped = df_merged.groupby(df_merged.index.date)
per_day = []
for _, group in grouped:
per_day.append(group)
# split the dataset into train, validatation and test sets on day boundaries
val_size = int(len(per_day) * val_size)
test_size = int(len(per_day) * test_size)
next_val = 0
next_test = 0
result_x = {"train": [], "val": [], "test": []}
result_y = {"train": [], "val": [], "test": []}
# generate sequences a day at a time
for i, day in enumerate(per_day):
# if we have less than 8 datapoints for a day we skip over the
# day assuming something is missing in the raw data
total = day["solar.total"].values
if len(total) < 8:
continue
if i >= next_val:
current_set = "val"
next_val = i + int(len(per_day) / val_size)
elif i >= next_test:
current_set = "test"
next_test = i + int(len(per_day) / test_size)
else:
current_set = "train"
max_total_for_day = np.array(day["solar.total.max"].values[0])
for j in range(2, len(total)):
result_x[current_set].append(total[0:j])
result_y[current_set].append([max_total_for_day])
if j >= time_steps:
break
# make result_y a numpy array
for ds in ["train", "val", "test"]:
result_y[ds] = np.array(result_y[ds])
return result_x, result_y
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
数据缓存
对于常规测试,我们希望使用本地缓存的数据。如果缓存不能用,我们就需要去下载。
# there are 14 lstm cells, 1 for each possible reading we get per day
TIMESTEPS = 14
# 20000 is the maximum total output in our dataset. We normalize all values with
# this so our inputs are between 0.0 and 1.0 range.
NORMALIZE = 20000
X, Y = generate_solar_data("https://www.cntk.ai/jup/dat/solar.csv",
TIMESTEPS, normalize=NORMALIZE)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
获取实时数据
next_batch产生下一次用于训练的数据集。我们在CNTK中使用的变长序列和取样包都是一系列的numpy数组,这些数组都是变长的。
标准的做法是每个训练周期都随机抽取数据包,但是我们不这么干,为了以后数据可视化的时候易于理解。
# process batches of 10 days
BATCH_SIZE = TIMESTEPS * 10
def next_batch(x, y, ds):
"""get the next batch for training"""
def as_batch(data, start, count):
return data[start:start + count]
for i in range(0, len(x[ds]), BATCH_SIZE):
yield as_batch(X[ds], i, BATCH_SIZE), as_batch(Y[ds], i, BATCH_SIZE)
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
理解数据格式
现在你可以看到我们输进LSTM神经网络的数据了。
X['train'][0:3]
- 1
输出:
[array([ 0. , 0.0006985], dtype=float32),
array([ 0. , 0.0006985, 0.0033175], dtype=float32),
array([ 0. , 0.0006985, 0.0033175, 0.010375 ], dtype=float32)]
- 1
- 2
- 3
Y['train'][0:3]
- 1
输出
array([[ 0.23899999],
[ 0.23899999],
[ 0.23899999]], dtype=float32)
- 1
- 2
- 3
LSTM神经网络初始化
与最多14个输入数据相对应,我们的模型有14个LSTM单元,每个单元代表当天的每个输入数据点。由于输入的数据在8到14之间变动,我们就可以利用CNTK可变长序列的优势,而不用专门去填充空白的输入单元。
神经网络的输出值是某天的输出电量值,给定的每个序列具有一样的总输出电量。打个比方:
1.7,11.4 -> 10300
1.7,11.4,67.5 -> 10300
1.7,11.4,67.5,250.5 ... -> 10300
1.7,11.4,67.5,250.5,573.5 -> 10300
- 1
- 2
- 3
- 4
LSTM的输出值作为全连接网络层的输入值,当然其中会被随机丢掉百分之二十,来避免过度拟合。全连接网络层的输出值就是我们这个模型的预测值了。
我们的LSTM模型设计如下:
下面的代码就是对上图的实现:
def create_model(x):
"""Create the model for time series prediction"""
with C.layers.default_options(initial_state = 0.1):
m = C.layers.Recurrence(C.layers.LSTM(TIMESTEPS))(x)
m = C.sequence.last(m)
m = C.layers.Dropout(0.2)(m)
m = C.layers.Dense(1)(m)
return m
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
训练
在我们训练之前我们需要绑定输入数据,选定训练器。在本示例中我们使用adam训练器,squared_error作为成本函数。
# input sequences
x = C.sequence.input_variable(1)
# create the model
z = create_model(x)
# expected output (label), also the dynamic axes of the model output
# is specified as the model of the label input
l = C.input_variable(1, dynamic_axes=z.dynamic_axes, name="y")
# the learning rate
learning_rate = 0.005
lr_schedule = C.learning_rate_schedule(learning_rate, C.UnitType.minibatch)
# loss function
loss = C.squared_error(z, l)
# use squared error to determine error for now
error = C.squared_error(z, l)
# use adam optimizer
momentum_time_constant = C.momentum_as_time_constant_schedule(BATCH_SIZE / -math.log(0.9))
learner = C.fsadagrad(z.parameters,
lr = lr_schedule,
momentum = momentum_time_constant)
trainer = C.Trainer(z, (loss, error), [learner])
# training
loss_summary = []
start = time.time()
for epoch in range(0, EPOCHS):
for x_batch, l_batch in next_batch(X, Y, "train"):
trainer.train_minibatch({x: x_batch, l: l_batch})
if epoch % (EPOCHS / 10) == 0:
training_loss = trainer.previous_minibatch_loss_average
loss_summary.append(training_loss)
print("epoch: {}, loss: {:.4f}".format(epoch, training_loss))
print("Training took {:.1f} sec".format(time.time() - start))
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
结果:
epoch: 0, loss: 0.0966
epoch: 10, loss: 0.0305
epoch: 20, loss: 0.0208
epoch: 30, loss: 0.0096
epoch: 40, loss: 0.0088
epoch: 50, loss: 0.0072
epoch: 60, loss: 0.0071
epoch: 70, loss: 0.0075
epoch: 80, loss: 0.0082
epoch: 90, loss: 0.0082
Training took 134.4 sec
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
然后我们测试和验证。我们使用均方误差成本函数可能稍微有点简单,有个办法,我们可以计算在误差允许范围内的比率。
# validate
def get_mse(X,Y,labeltxt):
result = 0.0
for x1, y1 in next_batch(X, Y, labeltxt):
eval_error = trainer.test_minibatch({x : x1, l : y1})
result += eval_error
return result/len(X[labeltxt])
# Print the train and validation errors
for labeltxt in ["train", "val"]:
print("mse for {}: {:.6f}".format(labeltxt, get_mse(X, Y, labeltxt)))
# Print the test error
labeltxt = "test"
print("mse for {}: {:.6f}".format(labeltxt, get_mse(X, Y, labeltxt)))
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
可视化预测结果
我们的模型训练状况量好,因为训练、测试和验证的误差值都在一个可控的范围内。预测的时序数据能够很好的可视化出来,让我们看看实际和预测的对比。
# predict
f, a = plt.subplots(2, 1, figsize=(12, 8))
for j, ds in enumerate(["val", "test"]):
results = []
for x_batch, _ in next_batch(X, Y, ds):
pred = z.eval({x: x_batch})
results.extend(pred[:, 0])
# because we normalized the input data we need to multiply the prediction
# with SCALER to get the real values.
a[j].plot((Y[ds] * NORMALIZE).flatten(), label=ds + ' raw');
a[j].plot(np.array(results) * NORMALIZE, label=ds + ' pred');
a[j].legend();