神经风格迁移(Neural Style Transfer)程序实现(Caffe)
前言
上次的博客写了神经风格迁移(Neural Style Transfer)程序实现(Keras),使用keras的一个好处就是api简单,能够快速部署模型,使用很方便。出于学习目的,这次又使用caffe实现了一遍,整体思路跟前面的差不多,就不多说了。详细可以参考论文:一个艺术风格化的神经网络算法(A Neural Algorithm of Artistic Style)(译)。
程序
不说废话了,直接上代码。
log.py
# *_*coding:utf-8 *_*
# author: 许鸿斌
# 邮箱:aaa@qq.com
import logging
import sys
# Record layout shared by every handler attached to the project logger.
LOG_FORMAT = "%(filename)s:%(funcName)s:%(asctime)s.%(msecs)03d -- %(message)s"

# Named logger used across the project; calling getLogger() without a name
# would return the root logger instead.
logger = logging.getLogger('Test')
# Lowest level this logger lets through (the logging default is WARNING).
logger.setLevel(logging.INFO)

formatter = logging.Formatter(LOG_FORMAT)

# Console handler writing to standard output with the format above.
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)

# To log to a file as well, create a logging.FileHandler("test.log"), give it
# the same formatter and attach it with logger.addHandler().
style_transfer.py
# *_*coding:utf-8 *_*
# author: 许鸿斌
# 邮箱:aaa@qq.com
# 日志模块
from log import logger
# 导入库
import argparse
import os
import sys
import timeit
import logging
# 导入caffe
caffe_root = '/home/xhb/caffe/caffe'
pycaffe_root = os.path.join(caffe_root, 'python')
sys.path.append(pycaffe_root)
import caffe
import numpy as np
import progressbar as pb
from scipy.fftpack import ifftn
from scipy.linalg.blas import sgemm
from scipy.misc import imsave
from scipy.optimize import minimize
from skimage import img_as_ubyte
from skimage.transform import rescale
# Numeric constants.
STYLE_SCALE = 1.2
INF = np.float32(np.inf)

# Per-architecture layer weights (VGG19, VGG16, GoogLeNet, CaffeNet).
# Each table names the blobs whose feature maps are used as the "content"
# representation and as the "style" representation, with the weight given to
# each of them.  The command line selects VGG16 by default.
_VGG_STYLE_LAYERS = ("conv1_1", "conv2_1", "conv3_1", "conv4_1", "conv5_1")

VGG19_WEIGHTS = {
    "content": {"conv4_2": 1},
    "style": dict.fromkeys(_VGG_STYLE_LAYERS, 0.2),
}

# Same layer names as VGG19, but kept as an independent dictionary.
VGG16_WEIGHTS = {
    "content": {"conv4_2": 1},
    "style": dict.fromkeys(_VGG_STYLE_LAYERS, 0.2),
}

GOOGLENET_WEIGHTS = {
    "content": {
        "conv2/3x3": 2e-4,
        "inception_3a/output": 1-2e-4,
    },
    "style": {
        "conv1/7x7_s2": 0.2,
        "conv2/3x3": 0.2,
        "inception_3a/output": 0.2,
        "inception_4a/output": 0.2,
        "inception_5a/output": 0.2,
    },
}

CAFFENET_WEIGHTS = {
    "content": {"conv4": 1},
    "style": dict.fromkeys(("conv1", "conv2", "conv3", "conv4", "conv5"), 0.2),
}
# Command-line interface.  Every option is declared once in the table below
# as (flags, keyword arguments) and registered in that order.
parser = argparse.ArgumentParser(
    description='Neural Style Transfer',
    usage='xxx.py -s <style.image> -c <content_image>',
)

_ARGUMENT_SPECS = (
    (('-s', '--style_img'),
     dict(type=str, required=True, help='Style (art) image')),
    (('-c', '--content_img'),
     dict(type=str, required=True, help='Content image')),
    (('-g', '--gpu_id'),
     dict(default=-1, type=int, required=False, help='GPU device number')),
    (('-m', '--model'),
     dict(default='vgg16', type=str, required=False, help='Which model to use')),
    (('-i', '--init'),
     dict(default='content', type=str, required=False, help='initialization strategy')),
    # The ratio stays a string here: main() converts it to a float for the
    # optimiser and reuses the raw text in the default output file name.
    (("-r", "--ratio"),
     dict(default="1e4", type=str, required=False, help="style-to-content ratio")),
    (("-n", "--num-iters"),
     dict(default=512, type=int, required=False, help="L-BFGS iterations")),
    (("-l", "--length"),
     dict(default=512, type=float, required=False, help="maximum image length")),
    (("-v", "--verbose"),
     dict(action="store_true", required=False, help="print minimization outputs")),
    (("-o", "--output"),
     dict(default=None, required=False, help="output path")),
)

for _flags, _options in _ARGUMENT_SPECS:
    parser.add_argument(*_flags, **_options)
def _compute_style_grad(F, G, G_style, layer):
    """
    Computes the style loss and its gradient for a single layer.

    :param dict F:
        Feature matrices of the current image, keyed by layer name; the
        entry for ``layer`` is used as a 2-D matrix.
    :param dict G:
        Gram matrices of the current image, keyed by layer name.
    :param dict G_style:
        Target Gram matrices of the style image, keyed by layer name.
    :param str layer:
        Name of the layer to evaluate.
    :return:
        Tuple ``(loss, grad)`` for that layer.
    """
    features = F[layer]
    n_rows, n_cols = features.shape[0], features.shape[1]

    # normalisation constant: 1 / (rows^2 * cols^2)
    norm = n_rows**-2 * n_cols**-2

    gram_diff = G[layer] - G_style[layer]
    loss = norm/4 * (gram_diff**2).sum()

    # the gradient is zeroed wherever the activation is not positive
    # (presumably because the features come out of a ReLU -- confirm)
    projected = sgemm(1.0, gram_diff, features)
    grad = norm * projected * (features > 0)
    return loss, grad
def _compute_content_grad(F, F_content, layer):
    """
    Computes the content loss and its gradient for a single layer.

    :param dict F:
        Feature arrays of the current image, keyed by layer name.
    :param dict F_content:
        Target feature arrays of the content image, keyed by layer name.
    :param str layer:
        Name of the layer to evaluate.
    :return:
        Tuple ``(loss, grad)``: half the summed squared difference, and the
        difference itself masked to positions with positive activation.
    """
    activations = F[layer]
    residual = activations - F_content[layer]

    half_squared_error = (residual**2).sum() / 2
    masked_residual = residual * (activations > 0)
    return half_squared_error, masked_residual
def _compute_reprs(net_in, net, layers_style, layers_content, gram_scale=1):
    """
    Computes representation matrices for an image.

    The image is written into the network's "data" blob, a forward pass is
    run, and the activations of every requested layer are collected.

    :param numpy.ndarray net_in:
        Preprocessed image, shaped like one item of the "data" blob.
    :param caffe.Net net:
        Network used for the forward pass.
    :param layers_style:
        Names of the layers for which a Gram matrix is wanted.
    :param layers_content:
        Names of the layers for which only the features are wanted.
    :param float gram_scale:
        Scalar factor applied to every Gram matrix.
    :return:
        Tuple ``(style_reprs, feature_reprs)`` of dicts keyed by layer name.
        Note that ``feature_reprs`` holds the flattened features of *all*
        requested layers, style layers included.
    """
    style_reprs = {}
    feature_reprs = {}

    # feed the image and run the network forward
    net.blobs["data"].data[0] = net_in
    net.forward()

    requested = set(layers_style) | set(layers_content)
    for name in requested:
        # copy the activations so later passes cannot overwrite them, then
        # flatten everything after the first axis into one row per channel
        activations = net.blobs[name].data[0].copy()
        features = activations.reshape(activations.shape[0], -1)
        feature_reprs[name] = features
        if name in layers_style:
            style_reprs[name] = sgemm(gram_scale, features, features.T)

    return style_reprs, feature_reprs
def style_optfn(x, net, weights, layers, reprs, ratio):
    """
    Style transfer optimization callback for scipy.optimize.minimize().

    Evaluates the weighted style + content loss of the candidate image and
    back-propagates it through the network to obtain the gradient with
    respect to the input.

    :param numpy.ndarray x:
        Flattened data array.
    :param caffe.Net net:
        Network to use to generate gradients.
    :param dict weights:
        Weights to use in the network ("style" and "content" tables).
    :param list layers:
        Layers to use in the network, ordered from input towards output.
    :param tuple reprs:
        Representation matrices packed in a tuple: (G_style, F_content).
    :param float ratio:
        Style-to-content ratio.
    :return:
        Tuple ``(loss, grad)`` where ``grad`` is a flat float64 array.
    """
    style_weights = weights["style"]
    content_weights = weights["content"]
    layers_style = style_weights.keys()       # layers contributing to style
    layers_content = content_weights.keys()   # layers contributing to content

    # restore the image shape expected by the "data" blob
    net_in = x.reshape(net.blobs["data"].data.shape[1:])

    # representations of the target images and of the current candidate
    G_style, F_content = reprs
    G, F = _compute_reprs(net_in, net, layers_style, layers_content)

    # backward pass, from the deepest layer of interest down to the input
    loss = 0
    net.blobs[layers[-1]].diff[:] = 0
    deepest_first = list(reversed(layers))
    last_index = len(deepest_first) - 1
    for index, layer in enumerate(deepest_first):
        # `layer_diff` is a view of the blob's diff: the in-place additions
        # below are what inject the loss gradients into the network
        layer_diff = net.blobs[layer].diff[0]

        # style term
        if layer in layers_style:
            wl = style_weights[layer]
            layer_loss, layer_grad = _compute_style_grad(F, G, G_style, layer)
            loss += wl * layer_loss * ratio
            layer_diff += wl * layer_grad.reshape(layer_diff.shape) * ratio

        # content term
        if layer in layers_content:
            wl = content_weights[layer]
            layer_loss, layer_grad = _compute_content_grad(F, F_content, layer)
            loss += wl * layer_loss
            layer_diff += wl * layer_grad.reshape(layer_diff.shape)

        # propagate down to the next layer of interest; for the shallowest
        # layer the end is None and the call is made without an explicit stop
        next_layer = None if index == last_index else deepest_first[index + 1]
        net.backward(start=layer, end=next_layer)

    # the final backward call above has no stop layer, so the gradient
    # returned to the optimiser is read from the "data" blob
    grad = net.blobs["data"].diff[0]

    # format gradient for minimize() function
    grad = grad.flatten().astype(np.float64)
    return loss, grad
class StyleTransfer(object):
    """
    Style transfer class.

    Holds a pretrained Caffe network together with its input transformer and
    runs the L-BFGS-B optimisation that turns a content image into a stylised
    one.
    """

    def __init__(self, model_name, use_pbar=True):
        """
        Initialize the model used for style transfer.

        :param str model_name:
            Model to use: 'vgg19', 'vgg16', 'googlenet' or 'caffenet'.
        :param bool use_pbar:
            Use progressbar flag.
        :raises AssertionError:
            If ``model_name`` is not one of the supported models.
        """
        style_path = os.path.abspath(os.path.split(__file__)[0])
        base_path = os.path.join(style_path, "models", model_name)

        # Every model directory carries the network definition, the
        # pretrained weights and the same ImageNet mean file.
        mean_file = os.path.join(base_path, 'ilsvrc_2012_mean.npy')
        if model_name == 'vgg19':
            model_file = os.path.join(base_path, 'VGG_ILSVRC_19_layers_deploy.prototxt')
            pretrained_file = os.path.join(base_path, 'VGG_ILSVRC_19_layers.caffemodel')
            weights = VGG19_WEIGHTS
        elif model_name == 'vgg16':
            model_file = os.path.join(base_path, 'VGG_ILSVRC_16_layers_deploy.prototxt')
            pretrained_file = os.path.join(base_path, 'VGG_ILSVRC_16_layers.caffemodel')
            weights = VGG16_WEIGHTS
        elif model_name == 'googlenet':
            model_file = os.path.join(base_path, 'deploy.prototxt')
            pretrained_file = os.path.join(base_path, 'bvlc_googlenet.caffemodel')
            weights = GOOGLENET_WEIGHTS
        elif model_name == 'caffenet':
            model_file = os.path.join(base_path, 'deploy.prototxt')
            pretrained_file = os.path.join(base_path, 'bvlc_reference_caffenet.caffemodel')
            weights = CAFFENET_WEIGHTS
        else:
            # Raised explicitly (same exception type as before) so the check
            # is not stripped when Python runs with -O.
            raise AssertionError('Model not available')

        # load the network and remember the layer weights
        self.load_model(model_file, pretrained_file, mean_file)
        self.weights = weights

        # blobs that take part in the style or content loss, in network order
        self.layers = [layer for layer in self.net.blobs
                       if layer in weights['style'] or layer in weights['content']]

        self.use_pbar = use_pbar

        # State read by the optimiser callback.  It is initialised here so
        # the callback is safe to call even when no progress bar exists.
        self.grad_iter = 0
        self.pbar = None
        self._callback = None
        self.callback = self._on_iteration

    def _on_iteration(self, xk):
        """
        Per-iteration hook handed to scipy.optimize.minimize().

        Advances the progress bar (when one is active) and forwards the
        current image to the user callback, if any.

        :param numpy.ndarray xk:
            Current flattened parameter vector.
        """
        if self.use_pbar and self.pbar is not None:
            self.grad_iter += 1
            try:
                self.pbar.update(self.grad_iter)
            except Exception:
                # Best effort: a progress bar that rejects the update
                # (presumably a value past maxval) is simply marked finished.
                self.pbar.finished = True
        if self._callback is not None:
            net_in = xk.reshape(self.net.blobs['data'].data.shape[1:])
            self._callback(self.transformer.deprocess('data', net_in))

    def load_model(self, model_file, pretrained_file, mean_file):
        """
        Loads specified model from caffe install (see caffe docs).

        :param str model_file:
            Path to model protobuf.
        :param str pretrained_file:
            Path to pretrained caffe model.
        :param str mean_file:
            Path to mean file.
        """
        # Caffe prints a large amount of information on stderr while loading
        # a network, so file descriptor 2 is pointed at the null device for
        # the duration of the load.  The original descriptor is always
        # restored and both temporary descriptors are closed, even when
        # loading fails.
        null_fd = os.open(os.devnull, os.O_RDWR)
        try:
            saved_stderr = os.dup(2)
            try:
                os.dup2(null_fd, 2)
                net = caffe.Net(str(model_file), str(pretrained_file), caffe.TEST)
            finally:
                os.dup2(saved_stderr, 2)
                os.close(saved_stderr)
        finally:
            os.close(null_fd)

        # configure how images are converted into network input
        transformer = caffe.io.Transformer({'data': net.blobs['data'].data.shape})
        # collapse the mean array to one value per entry of its first axis
        transformer.set_mean('data', np.load(mean_file).mean(1).mean(1))
        transformer.set_channel_swap('data', (2, 1, 0))
        transformer.set_transpose('data', (2, 0, 1))
        transformer.set_raw_scale('data', 255)

        self.net = net
        self.transformer = transformer

    def get_generated(self):
        """
        Returns the generated image (net input, after optimization).

        :return:
            The content of the network's "data" blob, converted back to
            image form by the transformer.
        """
        data = self.net.blobs["data"].data
        img_out = self.transformer.deprocess('data', data)
        return img_out

    def _rescale_net(self, img):
        """
        Rescales the network to fit a particular image.

        :param numpy.ndarray img:
            Image whose first two axes and third axis give the new spatial
            size and channel count of the "data" blob.
        """
        # get new dimensions and rescale net + transformer
        new_dims = (1, img.shape[2]) + img.shape[:2]
        self.net.blobs["data"].reshape(*new_dims)
        self.transformer.inputs["data"] = new_dims

    def _make_noise_input(self, init):
        """
        Creates an initial input (generated) image.

        :param init:
            Value convertible with ``int()``; used as the exponent applied to
            the frequency-distance grid that shapes the noise spectrum.
        :return:
            Preprocessed noise image suitable for the "data" blob.
        """
        # specify dimensions and create grid in Fourier domain
        dims = tuple(self.net.blobs["data"].data.shape[2:]) + \
               (self.net.blobs["data"].data.shape[1], )  # (height, width, channels)
        grid = np.mgrid[0:dims[0], 0:dims[1]]

        # create frequency representation for pink noise
        Sf = (grid[0] - (dims[0]-1)/2.0) ** 2 + \
             (grid[1] - (dims[1]-1)/2.0) ** 2
        Sf[np.where(Sf == 0)] = 1  # avoid a zero distance at the exact centre
        Sf = np.sqrt(Sf)
        Sf = np.dstack((Sf**int(init),)*dims[2])

        # apply ifft to create pink noise and normalize
        ifft_kernel = np.cos(2*np.pi*np.random.randn(*dims)) + \
                      1j*np.sin(2*np.pi*np.random.randn(*dims))
        img_noise = np.abs(ifftn(Sf * ifft_kernel))
        img_noise -= img_noise.min()
        img_noise /= img_noise.max()

        # preprocess the pink noise image
        x0 = self.transformer.preprocess("data", img_noise)
        return x0

    def _create_pbar(self, max_iter):
        """
        Creates a progress bar.

        :param int max_iter:
            Value at which the bar is complete.
        """
        self.grad_iter = 0
        self.pbar = pb.ProgressBar()
        self.pbar.widgets = ["Optimizing: ", pb.Percentage(),
                             " ", pb.Bar(marker=pb.AnimatedMarker()),
                             " ", pb.ETA()]
        self.pbar.maxval = max_iter

    def transfer_style(self, img_style, img_content, length=512, ratio=1e5,
                       n_iter=512, init="-1", verbose=False, callback=None):
        """
        Transfers the style of the artwork to the input image.

        :param numpy.ndarray img_style:
            A style image with the desired target style.
        :param numpy.ndarray img_content:
            A content image in floating point, RGB format.
        :param float length:
            Target length of the longer image side.
        :param float ratio:
            Style-to-content ratio.
        :param int n_iter:
            Maximum number of L-BFGS-B iterations.
        :param init:
            Initialisation: a numpy array (used as the start image),
            "content", "mixed", or a value passed to the noise generator.
        :param bool verbose:
            Let the optimiser print its own output (disables the progress bar).
        :param function callback:
            A callback function, which takes images at iterations.
        :return:
            Number of iterations performed by the optimiser.
        """
        # smaller of the two spatial sides of the "data" blob
        orig_dim = min(self.net.blobs["data"].shape[2:])

        # rescale both images
        scale = max(length / float(max(img_style.shape[:2])),
                    orig_dim / float(min(img_style.shape[:2])))
        img_style = rescale(img_style, STYLE_SCALE*scale)
        scale = max(length / float(max(img_content.shape[:2])),
                    orig_dim / float(min(img_content.shape[:2])))
        img_content = rescale(img_content, scale)

        # style representation: fit the net to the style image and collect
        # the Gram matrices of the style layers.  The Gram scale is fixed to
        # 1; the content/style size ratio that used to be computed here was
        # never applied, so it is no longer computed.
        self._rescale_net(img_style)
        style_layers = self.weights["style"].keys()
        net_in = self.transformer.preprocess("data", img_style)
        G_style = _compute_reprs(net_in, self.net, style_layers, [],
                                 gram_scale=1)[0]

        # content representation: fit the net to the content image and
        # collect the features of the content layers
        self._rescale_net(img_content)
        content_layers = self.weights["content"].keys()
        net_in = self.transformer.preprocess("data", img_content)
        F_content = _compute_reprs(net_in, self.net, [], content_layers)[1]

        # initial network input:
        #   numpy array -> treated as an image and used directly
        #   "content"   -> the content image
        #   "mixed"     -> weighted blend of content and style images
        #   otherwise   -> generated noise
        if isinstance(init, np.ndarray):
            img0 = self.transformer.preprocess("data", init)
        elif init == "content":
            img0 = self.transformer.preprocess("data", img_content)
        elif init == "mixed":
            img0 = 0.95*self.transformer.preprocess("data", img_content) + \
                   0.05*self.transformer.preprocess("data", img_style)
        else:
            img0 = self._make_noise_input(init)

        # compute data bounds (one (min, max) pair per input value, laid out
        # as three equally sized channel blocks)
        data_min = -self.transformer.mean["data"][:,0,0]
        data_max = data_min + self.transformer.raw_scale["data"]
        data_bounds = [(data_min[0], data_max[0])] * int(img0.size / 3) + \
                      [(data_min[1], data_max[1])] * int(img0.size / 3) + \
                      [(data_min[2], data_max[2])] * int(img0.size / 3)

        # optimisation settings
        grad_method = "L-BFGS-B"
        reprs = (G_style, F_content)
        minfn_args = {
            "args": (self.net, self.weights, self.layers, reprs, ratio),
            "method": grad_method, "jac": True, "bounds": data_bounds,
            "options": {"maxcor": 8, "maxiter": n_iter, "disp": verbose}
        }

        # run the optimisation
        self._callback = callback
        minfn_args["callback"] = self.callback
        show_pbar = self.use_pbar and not verbose
        if show_pbar:
            self._create_pbar(n_iter)
            self.pbar.start()
        else:
            # no bar for this run: make sure the callback does not touch a
            # missing or stale one
            self.pbar = None
        try:
            res = minimize(style_optfn, img0.flatten(), **minfn_args).nit
        finally:
            if show_pbar:
                self.pbar.finish()

        return res
def main(args):
    """
    Runs style transfer for the parsed command-line arguments.

    Selects CPU or GPU mode, loads both images and the requested model, runs
    the optimisation and saves the generated image.

    :param argparse.Namespace args:
        Parsed options: style_img, content_img, gpu_id, model, init, ratio,
        num_iters, length, verbose and output.
    """
    # set level of logger: verbose runs get the more detailed level
    level = logging.DEBUG if args.verbose else logging.INFO
    logger.setLevel(level)
    logger.info('Starting style transfer.')

    # select CPU/GPU mode (CPU when no GPU id is given)
    if args.gpu_id == -1:
        caffe.set_mode_cpu()
        logger.info('Caffe setted on CPU.')
    else:
        caffe.set_device(args.gpu_id)
        caffe.set_mode_gpu()
        logger.info('Caffe setted on GPU {}'.format(args.gpu_id))

    # load images
    style_img = caffe.io.load_image(args.style_img)
    content_img = caffe.io.load_image(args.content_img)
    logger.info('Successfully loaded images.')

    # artistic style class; the progress bar is shown only when the
    # optimiser is not printing its own output
    use_pbar = not args.verbose
    st = StyleTransfer(args.model.lower(), use_pbar=use_pbar)
    # Use the project logger: the root logger has no INFO-level handler, so
    # messages sent through logging.info() were silently dropped.
    logger.info("Successfully loaded model {0}.".format(args.model))

    # run the style transfer and time it
    start = timeit.default_timer()
    n_iters = st.transfer_style(style_img, content_img, length=args.length,
                                init=args.init, ratio=float(args.ratio),
                                n_iter=args.num_iters, verbose=args.verbose)
    end = timeit.default_timer()
    logger.info("Ran {0} iterations in {1:.0f}s.".format(n_iters, end-start))
    img_out = st.get_generated()

    # build the output path
    if args.output is not None:
        out_path = args.output
    else:
        out_path_fmt = (os.path.splitext(os.path.split(args.content_img)[1])[0],
                        os.path.splitext(os.path.split(args.style_img)[1])[0],
                        args.model, args.init, args.ratio, args.num_iters)
        out_path = "outputs/{0}-{1}-{2}-{3}-{4}-{5}.jpg".format(*out_path_fmt)

    # make sure the target directory exists so the result of a long
    # optimisation is not lost to a missing "outputs" folder
    out_dir = os.path.dirname(out_path)
    if out_dir and not os.path.isdir(out_dir):
        os.makedirs(out_dir)

    # save the generated image
    imsave(out_path, img_as_ubyte(img_out))
    logger.info("Output saved to {0}.".format(out_path))


if __name__ == '__main__':
    args = parser.parse_args()
    main(args)
补充说明
还有几点补充说明的:
caffe路径
一定要编译好pycaffe,目录指定到caffe的根目录。
# 导入caffe
caffe_root = '/home/xhb/caffe/caffe' # 自行修改caffe的根目录
pycaffe_root = os.path.join(caffe_root, 'python')
sys.path.append(pycaffe_root)
import caffe
模型文件
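因为会用到在ImageNet下预训练好的模型文件,统一保存在与style_transfer.py同级的models目录中,并且每个模型各占一个以模型名命名的子目录(例如models/vgg16/),其中放置对应的prototxt网络结构文件、caffemodel权重文件以及均值文件ilsvrc_2012_mean.npy。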
我会把百度云链接放在最后,自行下载即可。
图片
随便找一些测试图像即可,但是注意要放到能找到的路径里。
网上随便找的一些图片:
内容图片
风格图片
最后生成的艺术风格图片
运行脚本
python style_transfer.py -s 风格图片路径 -c 内容图片路径
还有其他参数可以配置,一般用默认值就足够了。
后记
仅作学习交流用,如有事请私信。如果有的博文评论不了,请不要把评论发在不相干的地方,请直接私信。重要的事情说两遍!(o´ω`o)
完整工程:
链接:https://pan.baidu.com/s/1O11yEuAn4vRdBUMXW8djkQ 密码:sto6
由于caffemodel文件较大,所以里面没有把caffemodel放进去,需要自行下载。
预训练权重文件:
googlenet:http://dl.caffe.berkeleyvision.org/bvlc_googlenet.caffemodel
caffenet:http://dl.caffe.berkeleyvision.org/bvlc_reference_caffenet.caffemodel
vgg16:http://www.robots.ox.ac.uk/~vgg/software/very_deep/caffe/VGG_ILSVRC_16_layers.caffemodel
vgg19:http://www.robots.ox.ac.uk/~vgg/software/very_deep/caffe/VGG_ILSVRC_19_layers.caffemodel