APEnt_test
程序员文章站
2024-02-11 09:34:34
...
近似熵主要的是从衡量时间序列复杂性的角度来度量信号中产生新模式的概率大小,产生新模式的概率越大,序列的复杂性越大,相应的近似熵也越大。本篇设计了两个需要预测的时间序列,test_data1为信噪比较高的信号,test_data2为噪声较大的信号,通过计算发现test_data1的近似熵较为真实的反应了新模式的产生,而test_data2的近似熵则无法反应新模式的产生。对test_data2通过小波滤波后再进行近似熵的计算,则能较好地反应新模式的产生
import matplotlib.pyplot as plt
#from pandas import Series
#from pandas import DataFrame
#import pandas as pd
import numpy as np
##from pandas import *
#import pandas
#import xlrd
# SampEn calculation
from numpy import arange
import math
#import numpy as np
#from scipy import interpolate
#from scipy.interpolate import spline
#近似熵
def ApEn(U, m, r):
def _maxdist(x_i, x_j):
return max([abs(ua - va) for ua, va in zip(x_i, x_j)])
def _phi(m):
x = [[U[j] for j in range(i, i + m - 1 + 1)] for i in range(N - m + 1)]
C = [len([1 for x_j in x if _maxdist(x_i, x_j) <= r]) / (N - m + 1.0) for x_i in x]
return (N - m + 1.0)**(-1) * sum(np.log(C))
N = len(U)
return abs(_phi(m+1) - _phi(m))
#num_pi为要产生几个π的sin数据,num_ex为异常点的个数,num_gap为段缺失数据的个数,num_bk为单个缺失值的个数
def test_data_gen(num_pi,num_ex,num_gap,num_bk):
if (num_pi>0) :
num_point=72*num_pi
x=np.linspace(0,np.pi*num_pi,num_point)
signal1=[(2*math.sin(i)+1) for i in x] #产生测试用的num_pi个sin数据
noise=np.random.normal(0,0.05,num_point)#numpy.random.normal(噪声均值, 噪声标准差, 噪声的shape)
signal1=signal1+noise#在sin数据上添加噪声
else:
print("Please input valid num_pi")
return
if (num_ex>0) :
#随机添加异常值
point_ex=[]
for i in range(num_ex):
point_ex.append(np.random.randint(0,len(signal1))) #异常值的位置
for _ in point_ex:
signal1[_]=signal1[_]*1.8
else:
pass
if (num_gap>0) :
#随机添加段数据缺失
longth_gap=np.random.randint(15)+5 #缺口大小5~20
point_gap=[] #缺口的位置
for i in range(num_gap):
point_gap.append(np.random.randint(num_point-20))
for i in point_gap:
for j in range(longth_gap):
signal1[i+j]=None
else:
pass
if (num_bk>0) :
#随机添加单点缺失值
point_break=[]
for i in range(num_bk):
point_break.append(np.random.randint(num_point))
for _ in point_break:
signal1[_]=None
else:
pass
#产生时间序列,每隔5分钟一个点
"""
date_need=[]
start_dt = datetime.datetime(2017, 1, 1)
interval = datetime.timedelta(seconds=300)
for i in range(num_point):
date_need.append(start_dt + interval * i)
df = DataFrame(signal1,index = date_need[0:num_point])
df.to_excel('data_test.xlsx')
"""
plt.figure(figsize=(10,5))
plt.plot(signal1)
plt.show()
return signal1
test_data=test_data_gen(240,0,0,0)
num_point=72*240
x=np.linspace(0,np.pi*3,num_point)
signal_modul=[(math.sin(i)+0.4) for i in x] #调制用的信号
for i in range(72*240):
if (i<1000 or i>3000):
signal_modul[i]=1
signal_modul[9000:12000]=signal_modul[1000:3000]
signal_modul[1000:3001]=[1]*3000
signal_modul.append(1)
plt.plot(signal_modul)
[<matplotlib.lines.Line2D at 0x13c753aad68>]
data=signal_modul*test_data
plt.plot(data)
[<matplotlib.lines.Line2D at 0x13c753f2358>]
z_list=[]
x_list=[]
j = (len(data)-288*7)//288+1
for i in range(j):
y = data[i*288:i*288+288*7]
y_std = np.std(y, ddof=1)#ddof=1为样本标准偏差,而非整体标准偏差
z = ApEn(y, 2, 0.1*y_std)#将数据切片为7天一组,计算一组的近似熵
# z = entropy(y)
#print(i)
#print(z)
# print(y)
z_list.append(z)
x_list.append(i*288+288*7)
x1 = arange(0, len(data), 1)
#x2 = cols0[1400:1400+j]
#fig=plt.figure(figsize=(20,10))
#fig, axes = plt.subplots(2, 1, sharex=True,figsize=(25,10))
#ax1=fig.add_subplot(2,1,1)
#ax2=fig.add_subplot(2,1,2)
fig, axes = plt.subplots(2, 1, sharex=True,figsize=(25,10))
axes[0].scatter(x_list,z_list)
axes[1].plot(x1,data)
[<matplotlib.lines.Line2D at 0x13c76483668>]
一开始没有设置好test_data中噪声的幅度,导致近似熵无法很好的反应新模式的产生
#num_pi为要产生几个π的sin数据,num_ex为异常点的个数,num_gap为段缺失数据的个数,num_bk为单个缺失值的个数
def test_data_gen2(num_pi,num_ex):
if (num_pi>0) :
num_point=72*num_pi
x=np.linspace(0,np.pi*num_pi,num_point)
signal1=[(math.sin(i)+1) for i in x] #产生测试用的num_pi个sin数据
noise=np.random.normal(0,0.1,num_point)#numpy.random.normal(噪声均值, 噪声标准差, 噪声的shape)
signal1=signal1+noise#在sin数据上添加噪声
else:
print("Please input valid num_pi")
return
if (num_ex>0) :
#随机添加异常值
point_ex=[]
for i in range(num_ex):
point_ex.append(np.random.randint(0,len(signal1))) #异常值的位置
for _ in point_ex:
signal1[_]=signal1[_]*1.8
else:
pass
plt.figure(figsize=(10,5))
plt.plot(signal1)
plt.show()
return signal1
test_data2=test_data_gen2(240,0)
data2=signal_modul*test_data2
z2_list=[]
x2_list=[]
j = (len(data2)-288*7)//288+1
for i in range(j):
y = data2[i*288:i*288+288*7]
y_std = np.std(y, ddof=1)#ddof=1为样本标准偏差,而非整体标准偏差
z = ApEn(y, 2, 0.1*y_std)#将数据切片为7天一组,计算一组的近似熵
# z = entropy(y)
#print(i)
#print(z)
# print(y)
z2_list.append(z)
x2_list.append(i*288+288*7)
x2 = arange(0, len(data2), 1)
fig, axes = plt.subplots(2, 1, sharex=True,figsize=(25,10))
axes[0].scatter(x2_list,z2_list)
axes[1].plot(x2,data2)
[<matplotlib.lines.Line2D at 0x13c76a186d8>]
import pywt
# 小波滤噪
def wavelet_denoising(data):
# 小波函数取db4
db4 = pywt.Wavelet('db4')
#if type(data) is not types.NoneType:
# 分解
coeffs = pywt.wavedec(data, db4)
# 高频系数置零
coeffs[len(coeffs)-1] *= 0
coeffs[len(coeffs)-2] *= 0
coeffs[-3]*=0
coeffs[-4]*=0
# 重构
meta = pywt.waverec(coeffs, db4)
return meta
data3=wavelet_denoising(data2)
z3_list=[]
x3_list=[]
k = (len(data3)-288*7)//288+1
for i in range(k):
y = data3[i*288:i*288+288*7]
y_std = np.std(y, ddof=1)#ddof=1为样本标准偏差,而非整体标准偏差
z = ApEn(y, 2, 0.1*y_std)#将数据切片为7天一组,计算一组的近似熵
# z = entropy(y)
#print(i)
#print(z)
# print(y)
z3_list.append(z)
x3_list.append(i*288+288*7)
x3 = arange(0, len(data3), 1)
fig, axes = plt.subplots(2, 1, sharex=True,figsize=(25,10))
axes[0].scatter(x3_list,z3_list)
axes[1].plot(x3,data3)
[<matplotlib.lines.Line2D at 0x13c005d5828>]
推荐阅读