欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

预测股票的变化--卡尔曼滤波

程序员文章站 2022-07-12 10:07:14
...
import tensorflow as tf
import numpy as np
​
class KalmanFilter(object):
    def __init__(self, x=None, A=None, P=None, B=None, H=None, Q=None):
        m = self._m = H.shape[0]
        n = self._n = x.shape[0]
        l = self._l = B.shape[1]
        self._x = tf.Variable(x, dtype=tf.float32, name="x")
        self._A = tf.constant(A, dtype=tf.float32, name="A")
        self._P = tf.Variable(P, dtype=tf.float32, name="P")
        self._B = tf.constant(B, dtype=tf.float32, name="B")
        self._Q = tf.constant(Q, dtype=tf.float32, name="Q")
        self._H = tf.constant(H, dtype=tf.float32, name="H")
        self._u = tf.placeholder(dtype=tf.float32, shape=[l, 1], name="u")
        self._z = tf.placeholder(dtype=tf.float32, shape=[m, 1], name="z")
        self._R = tf.placeholder(dtype=tf.float32, shape=[m, m], name="R")
​
    def predict(self):
        x = self._x
        A = self._A
        P = self._P
        B = self._B
        Q = self._Q
        u = self._u
        x_pred = x.assign(tf.matmul(A, x) + tf.matmul(B, u))
        p_pred = P.assign(tf.matmul(A, tf.matmul(P, A, transpose_b=True)) + Q)
        return x_pred, p_pred
       
    
    def correct(self):
        x = self._x
        P = self._P
        H = self._H
        z = self._z
        R = self._R
        K = tf.matmul(P, tf.matmul(tf.transpose(H), tf.matrix_inverse(tf.matmul(H, tf.matmul(P, H, transpose_b=True)) + R)))
        x_corr = x.assign(x + tf.matmul(K, z - tf.matmul(H, x)))
        P_corr = P.assign(tf.matmul((1 - tf.matmul(K, H)), P))
        return K, x_corr, P_corr


import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import xlrd
from plugin import kalman

rnd = np.random.RandomState(0)

'''
数据读取
'''
workbook = xlrd.open_workbook("data.xlsx")
sheet = workbook.sheet_by_name("Sheet1")

n_timesteps = 200
observations = []
x_axis = []
for i in range(1,n_timesteps+1):
    observations.append(float(sheet.cell(i,2).value))
    x_axis.append(float(i))
print(observations)
x_axis = np.array(x_axis,dtype = np.float)
observations = np.array(observations)

n = 1
m = 1
l = 1
x = np.ones([1, 1])
A = np.ones([1, 1])
B = np.zeros([1, 1])
P = np.ones([1, 1])
Q = np.array([[0.005]])
H = np.ones([1, 1])
u = np.zeros([1, 1])
R = np.array([[0.05]])
predictions = []
with tf.Session() as sess:
    kf = kalman.KalmanFilter(x=x, A=A, B=B, P=P, Q=Q, H=H)
    predict = kf.predict()
    correct = kf.correct()
    tf.global_variables_initializer().run()
    for i in range(0, n_timesteps):
        x_pred, _ = sess.run(predict, feed_dict={kf._u: u})
        predictions.append(x_pred[0, 0])
        sess.run(correct, feed_dict={kf._z:np.array([[observations[i]]]), kf._R:R})

# 支持中文t
plt.rcParams['font.sans-serif'] = ['SimHei']  # 用来正常显示中文标签
plt.rcParams['axes.unicode_minus'] = False  # 用来正常显示负号
plt.figure(figsize=(16, 6))
obs_scatter = plt.scatter(x_axis, observations, marker='x', color='b',
                         label='实际值')
position_line = plt.plot(x_axis, np.array(predictions),
                        linestyle='-', marker='o', color='r',
                        label='预测值')

plt.legend(loc='lower right')
plt.xlim(xmin=0, xmax=x_axis.max())
plt.xlabel('time')
plt.show()