MXNET深度学习框架-19-从0开始的Bathnorm(批量归一化)
通常来说,数据标准化预处理对于浅层模型就足够有效了。随着模型训练的进行,当每层中参数更新时,靠近输出层的输出较难出现剧烈变化。但对深层神经网络来说,即使输入数据已做标准化,训练中模型参数的更新依然很容易造成靠近输出层输出的剧烈变化。这种计算数值的不稳定性通常令我们难以训练出有效的深度模型。
批量归一化(batch normalization)是一个能让较深的神经网络的训练变得更加容易的方法。在模型训练时,批量归一化利用小批量上的均值和标准差,不断调整神经网络中间输出,从而使整个神经网络在各层的中间输出的数值更稳定。简单来说,batch normalization就是对神经网络某一层的**函数的输入进行归一化,使批量呈标准正态分布(均值为0,标准差为1)
相关公式如下:
给定一个批量B={},我们需要学习拉伸参数和。
所以定义:
(1)
(2)
(3)
(4)
其中,(1)式求出均值;(2)式求出方差;(3)式:(输入-均值)/方差,加上是为了保证分母不小于0,最终稿得到一个均化的;(4)式:和就是在训练过程中进行更新变化,有点类似和。
根据公式我们来实现一个简单的批量归一化层:
# BN要考虑全连接层和卷积层,当为卷积层时,需要对每个通道进行归一化,当为全连接层时,
# 每个批量都要使用
def batch_norm(X,gamma,beta,eps=1e-5):
# len(X.shape) in (2,4) # 就是说X如果是全连接,那么就是2维,卷积是一个4维的东西,所以是4
if len(X.shape)==2:
# 每个输入维度在样本上的均值和方差 (batch_size*feature)
mean=X.mean(axis=0) # 均值
vari=((X-mean)**2).mean(axis=0) # 方差
else:
# 2D卷积(NCHW--batch_size*channel*height*width)
mean=X.mean(axis=(0,2,3),keepdims=True)
vari=((X-mean)**2).mean(axis=(0,2,3),keepdims=True)
# 均一化
X_hat=(X-mean)/nd.sqrt(vari+eps)
#拉升和偏移(维度一致,便于计算)
return gamma.reshape(mean.shape)*X_hat+beta.reshape(mean.shape)
1)运行一个全连接的实例
A=nd.arange(6).reshape((3,2))
print(A.shape)
result=batch_norm(A,gamma=nd.array([1,1]),beta=nd.array([0,0])) # A是一个3X2的矩阵,gamma是一个1X2的矩阵
print(result)
结果:
我们的预期结果是对每一列进行归一化,下面我们来分析一下,看这一列:
我们可以目测发现:均值为0,方差为1,符合结果。
2)运行一个2D卷积的实例
B=nd.arange(18).reshape((1,2,3,3))
print(B)
result_conv=batch_norm(B,gamma=nd.array([1,1]),beta=nd.array([0,0]))
print(result_conv)
运行结果:
同样符合预期结果,均值为0,方差为1。
以上为训练过程所使用的batch norm,那么,测试的时候要不要使用BN?这个问题其实不太好回答:
1、不用的话训练出的模型参数在测试时也许就不准了;
2、用的话,测试时有可能只有一个数据。
事实上,测试时也是需要使用BN的,只不过需要改动一下。在测试时,需要把原先训练的均值和方差替换成整个训练数据的均值和方差,但是当训练数据很大时,计算量就非常大,所以,使用移动平均的方法来近似计算。
这里有个思路:在训练时存储全局的均值和方差,测试时使用就好了。所以更新一下BN代码:
def BN(X,gamma,beta,is_training,moving_mean,moving_vari,moving_momentum,eps=1e-5):
# len(X.shape) in (2,4) # 就是说X如果是全连接,那么就是2维,卷积是一个4维的东西,所以是4
if len(X.shape) == 2:
# 每个输入维度在样本上的均值和方差 (batch_size*feature)
mean = X.mean(axis=0) # 均值
vari = ((X - mean) ** 2).mean(axis=0) # 方差
else:
# 2D卷积(NCHW--batch_size*channel*height*width)
mean = X.mean(axis=(0, 2, 3), keepdims=True)
vari = ((X - mean) ** 2).mean(axis=(0, 2, 3), keepdims=True)
moving_mean=moving_mean.reshape(mean.shape) # 维度需要一致
moving_vari=moving_vari.reshape(vari.shape)
# 均一化
if is_training:
X_hat = (X - mean) / nd.sqrt(vari + eps)
# 更新全局的均值和方差
moving_mean[:]=moving_momentum*moving_mean+(1-moving_momentum)*mean # moving_momentum--平滑因子
moving_vari[:]=moving_momentum*moving_vari+(1-moving_momentum)*vari
else:
# 测试时使用全局均值和方差
X_hat = (X - moving_mean) / nd.sqrt(moving_vari + eps)
#拉升和偏移(维度一致,便于计算)
return gamma.reshape(mean.shape)*X_hat+beta.reshape(mean.shape) #
BN应用到CNN中
这里需要提前说明一点,BN只能让模型收敛加快,对于提升准确率的方法需要查找其它方法。
import mxnet.ndarray as nd
import mxnet.autograd as ag
import mxnet.gluon as gn
import mxnet as mx
'''
# BN要考虑全连接层和卷积层,当为卷积层时,需要对每个通道进行归一化,当为全连接层时,
# 每个批量都要使用
def batch_norm(X,gamma,beta,eps=1e-5):
# len(X.shape) in (2,4) # 就是说X如果是全连接,那么就是2维,卷积是一个4维的东西,所以是4
if len(X.shape)==2:
# 每个输入维度在样本上的均值和方差 (batch_size*feature)
mean=X.mean(axis=0) # 均值
vari=((X-mean)**2).mean(axis=0) # 方差
else:
# 2D卷积(NCHW--batch_size*channel*height*width)
mean=X.mean(axis=(0,2,3),keepdims=True) # CSDN对应的公式(1)
vari=((X-mean)**2).mean(axis=(0,2,3),keepdims=True) # CSDN对应的公式(2)
# 均一化
X_hat=(X-mean)/nd.sqrt(vari+eps) # CSDN对应的公式(3)
#拉升和偏移(维度一致,便于计算)
return gamma.reshape(mean.shape)*X_hat+beta.reshape(mean.shape) # CSDN对应的公式(4)
# 运行一个实例看看(全连接的实例)
A=nd.arange(6).reshape((3,2))
print(A)
result=batch_norm(A,gamma=nd.array([1,1]),beta=nd.array([0,0])) # A是一个3X2的矩阵,gamma是一个1X2的矩阵
print(result)
# 运行一个实例看看(卷积的实例)
B=nd.arange(18).reshape((1,2,3,3))
print(B)
result_conv=batch_norm(B,gamma=nd.array([1,1]),beta=nd.array([0,0]))
print(result_conv)
'''
def BN(X,gamma,beta,is_training,moving_mean,moving_vari,moving_momentum=0.9,eps=1e-5):
# len(X.shape) in (2,4) # 就是说X如果是全连接,那么就是2维,卷积是一个4维的东西,所以是4
if len(X.shape) == 2:
# 每个输入维度在样本上的均值和方差 (batch_size*feature)
mean = X.mean(axis=0) # 均值
vari = ((X - mean) ** 2).mean(axis=0) # 方差
else:
# 2D卷积(NCHW--batch_size*channel*height*width)
mean = X.mean(axis=(0, 2, 3), keepdims=True) # CSDN对应的公式(1)
vari = ((X - mean) ** 2).mean(axis=(0, 2, 3), keepdims=True) # CSDN对应的公式(2)
moving_mean=moving_mean.reshape(mean.shape) # 维度需要一致
moving_vari=moving_vari.reshape(vari.shape)
# 均一化
if is_training:
X_hat = (X - mean) / nd.sqrt(vari + eps) # CSDN对应的公式(3)
# 更新全局的均值和方差
moving_mean[:]=moving_momentum*moving_mean+(1-moving_momentum)*mean # moving_momentum--平滑因子
moving_vari[:]=moving_momentum*moving_vari+(1-moving_momentum)*vari
else:
# 测试时使用全局均值和方差
X_hat = (X - moving_mean) / nd.sqrt(moving_vari + eps) # CSDN对应的公式(3)
#拉升和偏移(维度一致,便于计算)
return gamma.reshape(mean.shape)*X_hat+beta.reshape(mean.shape) # CSDN对应的公式(4)
ctx=mx.gpu()
'''---参数定义---'''
# 1、创建第一个卷积层的w、b ,核大小为5,单通道图片(输入),输出20个特征图
c1,c2=20,50
w1 = nd.random_normal(shape=(c1, 1, 5, 5), scale=0.01,ctx=ctx)
b1 = nd.random_normal(shape=w1.shape[0], scale=0.01, ctx=ctx)
# BN
gamma1=nd.random_normal(shape=(c1),scale=0.01,ctx=ctx) #如果是卷积的话,gamma和beta的数目跟上一层卷积输出特征图数是一样的
beta1=nd.random_normal(shape=(c1),scale=0.01,ctx=ctx)
moving_mean1=nd.zeros(shape=c1,ctx=ctx)
moving_vari1=nd.zeros(shape=c1,ctx=ctx)
# 2、创建第二个卷积层的w、b ,核大小为5,输入为20,输出50个特征图
w2 = nd.random_normal(shape=(c2, c1, 3, 3), scale=0.01, ctx=ctx)
b2 = nd.random_normal(shape=w2.shape[0], scale=0.01, ctx=ctx)
# BN
gamma2=nd.random_normal(shape=(c2),scale=0.01,ctx=ctx)
beta2=nd.random_normal(shape=(c2),scale=0.01,ctx=ctx)
moving_mean2=nd.zeros(shape=c2,ctx=ctx)
moving_vari2=nd.zeros(shape=c2,ctx=ctx)
# 3、全连接层参数------(1250,128)
f3=128
w3 = nd.random_normal(shape=(1250, f3), scale=0.01, ctx=ctx)
b3 = nd.random_normal(shape=f3, scale=0.01, ctx=ctx)
# 4、全连接层参数------(128,10)
w4 = nd.random_normal(shape=(f3, 10), scale=0.01, ctx=ctx)
b4 = nd.random_normal(shape=10, scale=0.01, ctx=ctx)
params=[w1, b1, w2, b2, w3, b3, w4, b4,gamma1,beta1,gamma2,beta2] # moving_mean和moving_vari是不需要更新的
for param in params:
param.attach_grad()
'''---定义模型(注意BN是在卷积之后,**函数之前)---'''
def cnn_net(X,is_training=False):# 这里只对卷积做BN
# 第一层卷积+池化
h1_conv = nd.Convolution(data=X.reshape((-1, 1, 28, 28)), weight=w1, bias=b1, kernel=w1.shape[2:],
num_filter=w1.shape[0])
h1_bn=BN(h1_conv,gamma1,beta1,is_training,moving_mean1,moving_vari1)
h1_activity = nd.relu(data=h1_bn) # relu**
h1_pool = nd.Pooling(data=h1_activity, kernel=(2, 2), pool_type="max", stride=(2, 2)) # 最大池化
# 第二层卷积+池化
h2_conv = nd.Convolution(data=h1_pool, weight=w2, bias=b2, kernel=w2.shape[2:], num_filter=w2.shape[0])
h2_bn = BN(h2_conv, gamma2, beta2, is_training, moving_mean2, moving_vari2)
h2_activity = nd.relu(data=h2_bn) # relu**
h2_pool = nd.Pooling(data=h2_activity, kernel=(2, 2), pool_type="max", stride=(2, 2)) # 最大池化
# 第一层全连接
h2 = nd.Flatten(h2_pool) # 拉成一个batch*一维向量(2D矩阵)
h3_linear = nd.dot(h2, w3) + b3
h3 = nd.relu(h3_linear)
# 第二层全连接
h4 = nd.dot(h3, w4) + b4
return h4
'''---读取数据和预处理---'''
def load_data_fashion_mnist(batch_size, resize=None):
transformer = []
if resize:
transformer += [gn.data.vision.transforms.Resize(resize)]
transformer += [gn.data.vision.transforms.ToTensor()]
transformer = gn.data.vision.transforms.Compose(transformer)
mnist_train = gn.data.vision.FashionMNIST( train=True)
mnist_test = gn.data.vision.FashionMNIST( train=False)
train_iter = gn.data.DataLoader(
mnist_train.transform_first(transformer), batch_size, shuffle=True)
test_iter = gn.data.DataLoader(
mnist_test.transform_first(transformer), batch_size, shuffle=False)
return train_iter, test_iter
batch_size=128
train_iter,test_iter=load_data_fashion_mnist(batch_size)
cross_loss = gn.loss.SoftmaxCrossEntropyLoss()
# 定义准确率
def accuracy(output, label):
return nd.mean(output.argmax(axis=1) == label).asscalar()
def evaluate_accuracy(data_iter, net): # 定义测试集准确率
acc = 0
for data, label in data_iter:
data,label = data.as_in_context(ctx),label.as_in_context(ctx)
label = label.astype('float32')
output = net(data)
acc += accuracy(output, label)
return acc / len(data_iter)
# 梯度下降优化器
def SGD(params, lr):
for pa in params:
pa[:] = pa - lr * pa.grad # 参数沿着梯度的反方向走特定距离
# 训练
lr = 0.2
epochs = 5
for epoch in range(epochs):
train_loss = 0
train_acc = 0
for image, y in train_iter:
image,y = image.as_in_context(ctx),y.as_in_context(ctx)
y = y.astype('float32')
with ag.record():
output = cnn_net(image,is_training=True)
loss = cross_loss(output, y)
loss.backward()
# 将梯度做平均,这样学习率不会对batch_size那么敏感
SGD(params, lr / batch_size)
train_loss += nd.mean(loss).asscalar()
train_acc += accuracy(output, y)
test_acc = evaluate_accuracy(test_iter, cnn_net)
print("Epoch %d, Loss:%f, Train acc:%f, Test acc:%f"
% (epoch, train_loss / len(train_iter), train_acc / len(train_iter), test_acc))
训练结果:
这里就不继续跑下去了,机器太渣…
上一篇: 移动端Vue项目适配处理
下一篇: SparkStreaming常用算子