欢迎您访问程序员文章站,本站旨在为大家提供和分享程序员计算机编程知识!
您现在的位置是: 首页

神经风格迁移(Neural Style Transfer)程序实现(Caffe)

程序员文章站 2024-03-20 23:15:16
...

前言

上次的博客写了神经风格迁移(Neural Style Transfer)程序实现(Keras),使用keras的一个好处就是api简单,能够快速部署模型,使用很方便。出于学习目的,这次又使用caffe实现了一遍,整体思路跟前面的差不多,就不多说了。详细可以参考论文:一个艺术风格化的神经网络算法(A Neural Algorithm of Artistic Style)(译)

程序

不说废话了,直接上代码。

log.py

# *_*coding:utf-8 *_*
# author: 许鸿斌
# 邮箱:aaa@qq.com

import logging
import sys

# Record layout used by the handler(s) attached to the logger below.
LOG_FORMAT = "%(filename)s:%(funcName)s:%(asctime)s.%(msecs)03d -- %(message)s"
# Alternative layout: logging.Formatter('%(asctime)s %(levelname)-8s: %(message)s')
formatter = logging.Formatter(LOG_FORMAT)

# Console handler: writes formatted records to stdout.
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(formatter)

# Optional file handler (disabled):
#   file_handler = logging.FileHandler("test.log")
#   file_handler.setFormatter(formatter)
#   logger.addHandler(file_handler)

# Named logger instance (an empty name would return the root logger).
logger = logging.getLogger('Test')
# Lowest level that is emitted; the logging default would be WARNING.
logger.setLevel(logging.INFO)
logger.addHandler(console_handler)

style_transfer.py

# *_*coding:utf-8 *_*
# author: 许鸿斌
# 邮箱:aaa@qq.com

# 日志模块
from log import logger

# 导入库
import argparse
import os
import sys
import timeit
import logging

# 导入caffe
caffe_root = '/home/xhb/caffe/caffe'
pycaffe_root = os.path.join(caffe_root, 'python')
sys.path.append(pycaffe_root)
import caffe

import numpy as np 
import progressbar as pb 
from scipy.fftpack import ifftn
from scipy.linalg.blas import sgemm
from scipy.misc import imsave
from scipy.optimize import minimize
from skimage import img_as_ubyte
from skimage.transform import rescale

# Numeric constants.
STYLE_SCALE = 1.2
INF = np.float32(np.inf)

# Supported CNN backbones: VGG19, VGG16, GoogLeNet and CaffeNet.
# Each table names the layers whose feature maps are used for the content
# term and for the style term, together with the weight of each layer.
# Every style layer is weighted 0.2. VGG16 is the command-line default.
_VGG_STYLE_LAYERS = ("conv1_1", "conv2_1", "conv3_1", "conv4_1", "conv5_1")

VGG19_WEIGHTS = {
    "content": {"conv4_2": 1},
    "style": dict.fromkeys(_VGG_STYLE_LAYERS, 0.2),
}

VGG16_WEIGHTS = {
    "content": {"conv4_2": 1},
    "style": dict.fromkeys(_VGG_STYLE_LAYERS, 0.2),
}

GOOGLENET_WEIGHTS = {
    "content": {
        "conv2/3x3": 2e-4,
        "inception_3a/output": 1-2e-4,
    },
    "style": dict.fromkeys(
        ("conv1/7x7_s2",
         "conv2/3x3",
         "inception_3a/output",
         "inception_4a/output",
         "inception_5a/output"),
        0.2,
    ),
}

CAFFENET_WEIGHTS = {
    "content": {"conv4": 1},
    "style": dict.fromkeys(
        ("conv1", "conv2", "conv3", "conv4", "conv5"),
        0.2,
    ),
}

# Command-line interface: the style and content images are mandatory,
# every other option has a default.
parser = argparse.ArgumentParser(
    description='Neural Style Transfer',
    usage='xxx.py -s <style.image> -c <content_image>',
)
_CLI_OPTIONS = (
    (('-s', '--style_img'),
     dict(type=str, required=True, help='Style (art) image')),
    (('-c', '--content_img'),
     dict(type=str, required=True, help='Content image')),
    (('-g', '--gpu_id'),
     dict(default=-1, type=int, required=False, help='GPU device number')),
    (('-m', '--model'),
     dict(default='vgg16', type=str, required=False, help='Which model to use')),
    (('-i', '--init'),
     dict(default='content', type=str, required=False, help='initialization strategy')),
    (("-r", "--ratio"),
     dict(default="1e4", type=str, required=False, help="style-to-content ratio")),
    (("-n", "--num-iters"),
     dict(default=512, type=int, required=False, help="L-BFGS iterations")),
    (("-l", "--length"),
     dict(default=512, type=float, required=False, help="maximum image length")),
    (("-v", "--verbose"),
     dict(action="store_true", required=False, help="print minimization outputs")),
    (("-o", "--output"),
     dict(default=None, required=False, help="output path")),
)
# Register the options in declaration order so the help output is stable.
for _flags, _kwargs in _CLI_OPTIONS:
    parser.add_argument(*_flags, **_kwargs)

def _compute_style_grad(F, G, G_style, layer):
    """
        Style loss and its gradient for a single layer.

        :param dict F:
            Per-layer 2-D activation matrices of the current image.

        :param dict G:
            Per-layer Gram matrices of the current image.

        :param dict G_style:
            Per-layer Gram matrices of the style image.

        :param str layer:
            Key of the layer to evaluate.

        :return:
            Tuple ``(loss, grad)``; ``grad`` has the shape of ``F[layer]``.
    """

    activations = F[layer]
    gram = G[layer]

    # normalisation: 1 / (rows^2 * cols^2) of the activation matrix
    n_rows, n_cols = activations.shape[0], activations.shape[1]
    scale = n_rows**-2 * n_cols**-2

    # Gram-matrix mismatch drives both the loss and the gradient
    gram_err = gram - G_style[layer]
    loss = scale/4 * np.square(gram_err).sum()

    # gradient is only kept where the activation is positive
    positive = activations > 0
    grad = scale * sgemm(1.0, gram_err, activations) * positive

    return loss, grad

def _compute_content_grad(F, F_content, layer):
    """
        Content loss and its gradient for a single layer.

        :param dict F:
            Per-layer activation matrices of the current image.

        :param dict F_content:
            Per-layer activation matrices of the content image.

        :param str layer:
            Key of the layer to evaluate.

        :return:
            Tuple ``(loss, grad)``; ``grad`` has the shape of ``F[layer]``.
    """

    activations = F[layer]
    residual = activations - F_content[layer]

    # half the sum of squared differences
    loss = np.square(residual).sum() / 2

    # gradient is only kept where the activation is positive
    positive = activations > 0
    grad = residual * positive

    return loss, grad

def _compute_reprs(net_in, net, layers_style, layers_content, gram_scale=1):
    """
        Run one forward pass and collect per-layer representations.

        :param net_in:
            Array written into the first slot of the net's "data" blob.

        :param net:
            Network exposing ``blobs`` and ``forward()``.

        :param layers_style:
            Layers for which a Gram matrix is computed.

        :param layers_content:
            Layers for which only the activation matrix is needed.

        :param gram_scale:
            Scalar factor applied to every Gram matrix.

        :return:
            Tuple ``(gram_by_layer, feats_by_layer)`` of dicts keyed by layer
            name; the second dict holds every requested layer.
    """

    # feed the image and run the network
    net.blobs["data"].data[0] = net_in
    net.forward()

    gram_by_layer = {}
    feats_by_layer = {}

    # visit each requested layer once, whichever list it came from
    for name in set(layers_style)|set(layers_content):
        activation = net.blobs[name].data[0]
        # private copy, flattened to (first axis, everything else)
        feats = activation.copy().reshape(activation.shape[0], -1)
        feats_by_layer[name] = feats
        if name in layers_style:
            gram_by_layer[name] = sgemm(gram_scale, feats, feats.T)

    return gram_by_layer, feats_by_layer

def style_optfn(x, net, weights, layers, reprs, ratio):
    """
        Style transfer optimization callback for scipy.optimize.minimize().

        Evaluates the weighted style + content loss for the candidate image
        ``x`` and backpropagates it through ``net`` to obtain the gradient
        with respect to the network input.

        :param numpy.ndarray x:
            Flattened data array.

        :param caffe.Net net:
            Network to use to generate gradients.

        :param dict weights:
            Weights to use in the network; has a "style" and a "content"
            entry, each mapping layer name to layer weight.

        :param list layers:
            Layers to use in the network. They are walked in reverse, so
            the list is expected to be ordered from input towards output.

        :param tuple reprs:
            Representation matrices packed in a tuple:
            ``(G_style, F_content)``.

        :param float ratio:
            Style-to-content ratio; multiplies every style term.

        :return:
            Tuple ``(loss, grad)`` where ``grad`` is flattened and cast to
            float64.
    """

    # unpack parameters
    layers_style = weights["style"].keys()  # layers used for the style term
    layers_content = weights["content"].keys()  # layers used for the content term
    # reshape the flat vector to the data blob's shape without its first axis
    net_in = x.reshape(net.blobs["data"].data.shape[1:])

    # compute style and content representations of the current image
    (G_style, F_content) = reprs
    (G, F) = _compute_reprs(net_in, net, layers_style, layers_content)

    # backpropagation
    loss = 0
    # start from a clean diff on the last listed layer
    net.blobs[layers[-1]].diff[:] = 0
    for i, layer in enumerate(reversed(layers)):
        # the layer preceding this one in `layers`, or None for the first one
        next_layer = None if i == len(layers)-1 else layers[-i-2]
        # the `+=` below accumulates in place into this layer's diff
        # (assumes diff[0] is a view of the blob buffer -- TODO confirm)
        grad = net.blobs[layer].diff[0]

        # style contribution
        if layer in layers_style:
            wl = weights["style"][layer]
            (l, g) = _compute_style_grad(F, G, G_style, layer)
            loss += wl * l * ratio
            grad += wl * g.reshape(grad.shape) * ratio

        # content contribution
        if layer in layers_content:
            wl = weights["content"][layer]
            (l, g) = _compute_content_grad(F, F_content, layer)
            loss += wl * l
            grad += wl * g.reshape(grad.shape)

        # compute gradient: backpropagate from this layer to the previous one
        net.backward(start=layer, end=next_layer)
        if next_layer is None:
            # no earlier listed layer: read the gradient at the data blob
            grad = net.blobs["data"].diff[0]
        else:
            grad = net.blobs[next_layer].diff[0]

    # format gradient for minimize() function
    grad = grad.flatten().astype(np.float64)

    return loss, grad

class StyleTransfer(object):
    """
        Style transfer class.

        Loads a pretrained Caffe model and optimizes the network input so
        that its layer activations match the content representation of one
        image and the Gram-matrix (style) representation of another.
    """
    def __init__(self, model_name, use_pbar=True):
        """
            Initialize the model used for style transfer.

            :param str model_name:
                Model to use: 'vgg19', 'vgg16', 'googlenet' or 'caffenet'.
                Its files are looked up in ``models/<model_name>/`` next to
                this script.

            :param bool use_pbar:
                Use progressbar flag.

            :raises AssertionError:
                If ``model_name`` is not one of the supported models.
        """
        style_path = os.path.abspath(os.path.split(__file__)[0])
        base_path = os.path.join(style_path, "models", model_name)

        # Pick the network definition, the pretrained weights, the mean file
        # and the layer weights of the requested model. Per the original
        # author the mean file is the ImageNet image mean.
        # vgg19
        if model_name == 'vgg19':
            model_file = os.path.join(base_path, 'VGG_ILSVRC_19_layers_deploy.prototxt')
            pretrained_file = os.path.join(base_path, 'VGG_ILSVRC_19_layers.caffemodel')
            mean_file = os.path.join(base_path, 'ilsvrc_2012_mean.npy')
            weights = VGG19_WEIGHTS
        # vgg16
        elif model_name == 'vgg16':
            model_file = os.path.join(base_path, 'VGG_ILSVRC_16_layers_deploy.prototxt')
            pretrained_file = os.path.join(base_path, 'VGG_ILSVRC_16_layers.caffemodel')
            mean_file = os.path.join(base_path, 'ilsvrc_2012_mean.npy')
            weights = VGG16_WEIGHTS
        # googlenet
        elif model_name == 'googlenet':
            model_file = os.path.join(base_path, 'deploy.prototxt')
            pretrained_file = os.path.join(base_path, 'bvlc_googlenet.caffemodel')
            mean_file = os.path.join(base_path, 'ilsvrc_2012_mean.npy')
            weights = GOOGLENET_WEIGHTS
        # caffenet
        elif model_name == 'caffenet':
            model_file = os.path.join(base_path, 'deploy.prototxt')
            pretrained_file = os.path.join(base_path, 'bvlc_reference_caffenet.caffemodel')
            mean_file = os.path.join(base_path, 'ilsvrc_2012_mean.npy')
            weights = CAFFENET_WEIGHTS
        else:
            # Raised explicitly instead of via ``assert`` so the check also
            # fires under ``python -O``; the exception type is unchanged.
            raise AssertionError('Model not available')

        # Load the model and remember its layer weights.
        self.load_model(model_file, pretrained_file, mean_file)
        self.weights = weights
        # Collect, in the order of the net's blobs, the layers that take
        # part in the style or the content term.
        self.layers = [layer for layer in self.net.blobs
                       if layer in self.weights['style']
                       or layer in self.weights['content']]
        self.use_pbar = use_pbar

        # Per-run state; transfer_style() (re)initializes it on every call.
        self.pbar = None
        self.grad_iter = 0
        self._callback = None

        # Per-iteration callback handed to scipy.optimize.minimize().
        def callback(xk):
            # Only touch the progress bar when one exists for this run.
            # transfer_style() creates none in verbose mode, in which case
            # the bar bookkeeping must be skipped.
            if self.pbar is not None:
                self.grad_iter += 1
                try:
                    self.pbar.update(self.grad_iter)
                except Exception:
                    # Best effort: a failing update just marks the bar as
                    # finished. KeyboardInterrupt and SystemExit are
                    # deliberately not swallowed.
                    self.pbar.finished = True
            if self._callback is not None:
                net_in = xk.reshape(self.net.blobs['data'].data.shape[1:])
                self._callback(self.transformer.deprocess('data', net_in))
        self.callback = callback

    def load_model(self, model_file, pretrained_file, mean_file):
        """
            Loads specified model from caffe install (see caffe docs).

            Sets ``self.net`` and ``self.transformer``.

            :param str model_file:
                Path to model protobuf.

            :param str pretrained_file:
                Path to pretrained caffe model.

            :param str mean_file:
                Path to mean file.
        """
        # Load the network with stderr (fd 2) temporarily pointed at the
        # null device, which hides the output caffe prints while loading.
        null_fd = os.open(os.devnull, os.O_RDWR)
        saved_stderr = os.dup(2)
        try:
            os.dup2(null_fd, 2)
            net = caffe.Net(str(model_file), str(pretrained_file), caffe.TEST)
        finally:
            # Always restore stderr and release both descriptors, including
            # when loading the model fails.
            os.dup2(saved_stderr, 2)
            os.close(saved_stderr)
            os.close(null_fd)

        # Configure how images are converted to and from the "data" blob.
        transformer = caffe.io.Transformer({'data': net.blobs['data'].data.shape})
        # Average the mean array over axis 1 twice, leaving one value per
        # entry of axis 0 (presumably one mean per channel).
        transformer.set_mean('data', np.load(mean_file).mean(1).mean(1))
        transformer.set_channel_swap('data', (2, 1, 0))
        transformer.set_transpose('data', (2, 0, 1))
        transformer.set_raw_scale('data', 255)

        self.net = net
        self.transformer = transformer

    def get_generated(self):
        """
            Returns the generated image (net input, after optimization).

            :return:
                The content of the "data" blob passed through the
                transformer's ``deprocess``.
        """
        data = self.net.blobs["data"].data
        img_out = self.transformer.deprocess('data', data)
        return img_out

    def _rescale_net(self, img):
        """
            Rescales the network to fit a particular image.

            :param numpy.ndarray img:
                Image indexed as (height, width, channels).
        """
        # get new dimensions and rescale net + transformer
        new_dims = (1, img.shape[2]) + img.shape[:2]
        self.net.blobs["data"].reshape(*new_dims)
        self.transformer.inputs["data"] = new_dims

    def _make_noise_input(self, init):
        """
            Creates an initial input (generated) image.

            :param init:
                Value convertible with ``int()``; used as the exponent
                applied to the radial frequency grid.

            :return:
                The noise image preprocessed for the "data" blob.
        """
        # specify dimensions and create grid in Fourier domain
        dims = tuple(self.net.blobs["data"].data.shape[2:]) + \
               (self.net.blobs["data"].data.shape[1], )  # (height, width, channels)
        grid = np.mgrid[0:dims[0], 0:dims[1]]

        # create frequency representation for pink noise
        Sf = (grid[0] - (dims[0]-1)/2.0) ** 2 + \
             (grid[1] - (dims[1]-1)/2.0) ** 2
        # replace an exact zero at the centre so a negative exponent below
        # does not divide by zero
        Sf[np.where(Sf == 0)] = 1
        Sf = np.sqrt(Sf)
        Sf = np.dstack((Sf**int(init),)*dims[2])

        # apply ifft to create pink noise and normalize to [0, 1]
        ifft_kernel = np.cos(2*np.pi*np.random.randn(*dims)) + \
                      1j*np.sin(2*np.pi*np.random.randn(*dims))
        img_noise = np.abs(ifftn(Sf * ifft_kernel))
        img_noise -= img_noise.min()
        img_noise /= img_noise.max()

        # preprocess the pink noise image
        x0 = self.transformer.preprocess("data", img_noise)

        return x0

    def _create_pbar(self, max_iter):
        """
            Creates a progress bar and resets the iteration counter.

            :param int max_iter:
                Value at which the bar is complete.
        """
        self.grad_iter = 0
        self.pbar = pb.ProgressBar()
        self.pbar.widgets = ["Optimizing: ", pb.Percentage(),
                             " ", pb.Bar(marker=pb.AnimatedMarker()),
                             " ", pb.ETA()]
        self.pbar.maxval = max_iter

    def transfer_style(self, img_style, img_content, length=512, ratio=1e5,
                       n_iter=512, init="-1", verbose=False, callback=None):
        """
            Transfers the style of the artwork to the input image.

            :param numpy.ndarray img_style:
                A style image with the desired target style.

            :param numpy.ndarray img_content:
                A content image in floating point, RGB format.

            :param length:
                Target length of the longer image side after rescaling.

            :param float ratio:
                Style-to-content ratio.

            :param int n_iter:
                Maximum number of L-BFGS-B iterations.

            :param init:
                Initial image: a numpy array is used as the image itself,
                "content" starts from the content image, "mixed" from
                0.95 * content + 0.05 * style, and any other value is
                passed to ``_make_noise_input``.

            :param bool verbose:
                Let the optimizer print its progress; disables the
                progress bar.

            :param function callback:
                A callback function, which takes images at iterations.

            :return:
                Number of iterations performed by the optimizer.
        """

        # smaller of the "data" blob's two spatial dimensions
        orig_dim = min(self.net.blobs["data"].shape[2:])

        # Rescale both images so that the longer side reaches `length`
        # and the shorter side is at least `orig_dim`.
        # NOTE(review): newer scikit-image releases need an explicit
        # channel argument for colour images in rescale() -- confirm
        # against the installed version.
        scale = max(length / float(max(img_style.shape[:2])),
                    orig_dim / float(min(img_style.shape[:2])))
        img_style = rescale(img_style, STYLE_SCALE*scale)
        scale = max(length / float(max(img_content.shape[:2])),
                    orig_dim / float(min(img_content.shape[:2])))
        img_content = rescale(img_content, scale)

        # compute the style representation (Gram matrices, unscaled)
        self._rescale_net(img_style)    # fit the net to the style image
        layers = self.weights["style"].keys()   # names of the style layers
        net_in = self.transformer.preprocess("data", img_style)
        G_style = _compute_reprs(net_in, self.net, layers, [],
                                 gram_scale=1)[0]

        # compute the content representation
        self._rescale_net(img_content)  # fit the net to the content image
        layers = self.weights["content"].keys() # names of the content layers
        net_in = self.transformer.preprocess("data", img_content)
        F_content = _compute_reprs(net_in, self.net, [], layers)[1]

        # Initialize the network input:
        #  - a numpy array is treated as an image and used directly;
        #  - "content" starts from the content image;
        #  - "mixed" blends the content and style images (0.95 / 0.05);
        #  - anything else starts from generated noise.
        if isinstance(init, np.ndarray):
            img0 = self.transformer.preprocess("data", init)
        elif init == "content":
            img0 = self.transformer.preprocess("data", img_content)
        elif init == "mixed":
            img0 = 0.95*self.transformer.preprocess("data", img_content) + \
                   0.05*self.transformer.preprocess("data", img_style)
        else:
            img0 = self._make_noise_input(init)

        # compute data bounds: one (min, max) pair per element, grouped in
        # three equal runs that use mean entries 0, 1 and 2 respectively
        data_min = -self.transformer.mean["data"][:,0,0]
        data_max = data_min + self.transformer.raw_scale["data"]
        data_bounds = [(data_min[0], data_max[0])] * int(img0.size / 3) + \
                      [(data_min[1], data_max[1])] * int(img0.size / 3) + \
                      [(data_min[2], data_max[2])] * int(img0.size / 3)

        # optimizer configuration
        grad_method = "L-BFGS-B"
        reprs = (G_style, F_content)
        minfn_args = {
            "args": (self.net, self.weights, self.layers, reprs, ratio),
            "method": grad_method, "jac": True, "bounds": data_bounds,
            "options": {"maxcor": 8, "maxiter": n_iter, "disp": verbose}
        }

        # run the optimization
        self._callback = callback
        minfn_args["callback"] = self.callback
        show_pbar = self.use_pbar and not verbose
        if show_pbar:
            self._create_pbar(n_iter)
            self.pbar.start()
        else:
            # make sure the iteration callback does not touch a stale or
            # missing progress bar
            self.pbar = None
        res = minimize(style_optfn, img0.flatten(), **minfn_args).nit
        if show_pbar:
            self.pbar.finish()

        return res

def main(args):
    """
        Entry point: run style transfer for the parsed command-line options
        and save the generated image.

        :param argparse.Namespace args:
            Options produced by the module-level ``parser``. When
            ``args.output`` is None the image is written to
            ``outputs/<content>-<style>-<model>-<init>-<ratio>-<iters>.jpg``.
    """
    # set level of logger
    # NOTE(review): verbose selects INFO and non-verbose DEBUG, which looks
    # inverted. Both let the INFO messages below through -- confirm intent.
    level = logging.INFO if args.verbose else logging.DEBUG
    logger.setLevel(level)
    logger.info('Starting style transfer.')

    # select CPU or GPU mode; CPU (gpu_id == -1) is the default
    if args.gpu_id == -1:
        caffe.set_mode_cpu()
        logger.info('Caffe setted on CPU.')
    else:
        caffe.set_device(args.gpu_id)
        caffe.set_mode_gpu()
        logger.info('Caffe setted on GPU {}'.format(args.gpu_id))

    # load the images
    style_img = caffe.io.load_image(args.style_img)
    content_img = caffe.io.load_image(args.content_img)
    logger.info('Successfully loaded images.')

    # artistic style class
    use_pbar = not args.verbose
    st = StyleTransfer(args.model.lower(), use_pbar=use_pbar)
    # Use the configured `logger`: the bare `logging` module logs through
    # the root logger, whose default WARNING level drops INFO messages.
    logger.info("Successfully loaded model {0}.".format(args.model))

    # run the style transfer
    start = timeit.default_timer()
    # builtin float() replaces np.float, an alias removed from newer NumPy
    n_iters = st.transfer_style(style_img, content_img, length=args.length,
                                init=args.init, ratio=float(args.ratio),
                                n_iter=args.num_iters, verbose=args.verbose)
    end = timeit.default_timer()
    logger.info("Ran {0} iterations in {1:.0f}s.".format(n_iters, end-start))
    img_out = st.get_generated()

    # build the output path
    if args.output is not None:
        out_path = args.output
    else:
        out_path_fmt = (os.path.splitext(os.path.split(args.content_img)[1])[0],
                        os.path.splitext(os.path.split(args.style_img)[1])[0],
                        args.model, args.init, args.ratio, args.num_iters)
        out_path = "outputs/{0}-{1}-{2}-{3}-{4}-{5}.jpg".format(*out_path_fmt)

    # Create the target directory if needed, so a missing "outputs/" folder
    # does not lose the result of the whole optimization run.
    out_dir = os.path.dirname(out_path)
    if out_dir and not os.path.isdir(out_dir):
        os.makedirs(out_dir)

    # save the generated image
    imsave(out_path, img_as_ubyte(img_out))
    logger.info("Output saved to {0}.".format(out_path))

if __name__ == '__main__':
    # Script entry point: parse the command-line options, then run main().
    args = parser.parse_args()
    main(args)

补充说明

还有几点补充说明的:

caffe路径

一定要先编译好pycaffe,并把下面代码中的 caffe_root 改成你本机caffe的根目录。

# 导入caffe
caffe_root = '/home/xhb/caffe/caffe' # 自行修改caffe的根目录
pycaffe_root = os.path.join(caffe_root, 'python')
sys.path.append(pycaffe_root)
import caffe

模型文件

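因为会用到在ImageNet下预训练好的模型文件,统一保存在脚本所在目录下的models目录中。每个模型各占一个以模型名命名的子目录(例如 models/vgg16/),其中放置对应的 prototxt、caffemodel 以及均值文件 ilsvrc_2012_mean.npy。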
神经风格迁移(Neural Style Transfer)程序实现(Caffe)
我会把百度云链接放在最后,自行下载即可。

图片

随便找一些测试图像即可,但是注意要放到能找到的路径里。
网上随便找的一些图片:
内容图片
神经风格迁移(Neural Style Transfer)程序实现(Caffe)
风格图片
神经风格迁移(Neural Style Transfer)程序实现(Caffe)
最后生成的艺术风格图片
神经风格迁移(Neural Style Transfer)程序实现(Caffe)

运行脚本

python style_transfer.py -s 风格图片路径 -c 内容图片路径

还有其他参数可以配置,一般用默认值就足够了。

后记

仅作学习交流用,如有事请私信。如果有的博文评论不了,请不要把评论发在不相干的地方,请直接私信。重要的事情说两遍!(o´ω`o)

完整工程:
链接:https://pan.baidu.com/s/1O11yEuAn4vRdBUMXW8djkQ 密码:sto6
由于caffemodel文件较大,所以里面没有把caffemodel放进去,需要自行下载。
预训练权重文件:
googlenet:http://dl.caffe.berkeleyvision.org/bvlc_googlenet.caffemodel
caffenet(即 BVLC Reference CaffeNet,AlexNet 的变体,对应 -m caffenet):http://dl.caffe.berkeleyvision.org/bvlc_reference_caffenet.caffemodel
vgg16:http://www.robots.ox.ac.uk/~vgg/software/very_deep/caffe/VGG_ILSVRC_16_layers.caffemodel
vgg19:http://www.robots.ox.ac.uk/~vgg/software/very_deep/caffe/VGG_ILSVRC_19_layers.caffemodel