欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

TensorFlow keras实现unet网络并进行图像分割入门实例(非常适合新手!)

程序员文章站 2022-04-08 08:52:27
...

最近在学习图像分割方面内容,做了个实例在这儿给大家分享。(不喜请一定喷,多多评论,互相学习)

这个实例用到了kaggle的一个数据集,是来自Carvana Image Masking Challenge这个比赛,下面提供官网下载地址和百度网盘下载地址

官网:https://www.kaggle.com/c/carvana-image-masking-challenge/data
官网需要注册才能下载
网盘:https://pan.baidu.com/s/1EHSt0ANwSI8x67j_h8ZYjw
提取码:f2c4
(若涉及侵权请联系我删除)

原理

首先来看看原理:其实原理也没什么好讲的,百度一大堆,咱们还是直接进入正题吧[手动狗头]
TensorFlow keras实现unet网络并进行图像分割入门实例(非常适合新手!)

unet实现

创建一个文件用来写模型(model.py)

from tensorflow_core.python.keras import Input, Model
from tensorflow_core.python.keras.layers import Conv2D, MaxPooling2D, Dropout, UpSampling2D, concatenate
from tensorflow_core.python.keras.optimizer_v2.adam import Adam


def unet(pretrained_weights=None, input_size=(512 , 512, 3)):
    inputs = Input(input_size)
    conv1 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='he_normal')(inputs)
    conv1 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv1)
    pool1 = MaxPooling2D(pool_size=(2, 2))(conv1)
    conv2 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='he_normal')(pool1)
    conv2 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv2)
    pool2 = MaxPooling2D(pool_size=(2, 2))(conv2)
    conv3 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='he_normal')(pool2)
    conv3 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv3)
    pool3 = MaxPooling2D(pool_size=(2, 2))(conv3)
    conv4 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='he_normal')(pool3)
    conv4 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv4)
    drop4 = Dropout(0.5)(conv4)     # 将部分隐藏层神经元丢弃,防止过于细化而引起的过拟合情况
    pool4 = MaxPooling2D(pool_size=(2, 2))(drop4)

    conv5 = Conv2D(1024, 3, activation='relu', padding='same', kernel_initializer='he_normal')(pool4)
    conv5 = Conv2D(1024, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv5)
    drop5 = Dropout(0.5)(conv5)

    up6 = Conv2D(512, 2, activation='relu', padding='same', kernel_initializer='he_normal')(
        UpSampling2D(size=(2, 2))(drop5))
    merge6 = concatenate([drop4, up6], axis=3)  # axis=1代表列合并, axis=2代表行合并,axis=3代表层合并
    conv6 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='he_normal')(merge6)
    conv6 = Conv2D(512, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv6)

    up7 = Conv2D(256, 2, activation='relu', padding='same', kernel_initializer='he_normal')(
        UpSampling2D(size=(2, 2))(conv6))
    merge7 = concatenate([conv3, up7], axis=3)
    conv7 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='he_normal')(merge7)
    conv7 = Conv2D(256, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv7)

    up8 = Conv2D(128, 2, activation='relu', padding='same', kernel_initializer='he_normal')(
        UpSampling2D(size=(2, 2))(conv7))
    merge8 = concatenate([conv2, up8], axis=3)
    conv8 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='he_normal')(merge8)
    conv8 = Conv2D(128, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv8)

    up9 = Conv2D(64, 2, activation='relu', padding='same', kernel_initializer='he_normal')(
        UpSampling2D(size=(2, 2))(conv8))
    merge9 = concatenate([conv1, up9], axis=3)
    conv9 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='he_normal')(merge9)
    conv9 = Conv2D(64, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv9)
    conv9 = Conv2D(2, 3, activation='relu', padding='same', kernel_initializer='he_normal')(conv9)
    conv10 = Conv2D(3, 1, activation='sigmoid')(conv9)

    print(inputs.shape, conv10.shape)
    model = Model(inputs, conv10)

    model.compile(optimizer=Adam(lr=1e-6), loss='binary_crossentropy', metrics=['accuracy'])

    # model.summary()

    if (pretrained_weights):
        model.load_weights(pretrained_weights)

    return model

模型实现后就是如何读取数据集的问题了

数据集处理

由于我们的数据集比较大,所以不能一次性读入,我们需要实现一个generator用在训练器需要的时候产生读取硬盘的数据并和标签一起传给训练器
这里我们创建一个myGenerator.py的文件,然后在文件中放入下面的几个函数
首先是读取照片的函数

import cv2

def cv_imread(filePath):
    cv_img = cv2.imdecode(np.fromfile(filePath, dtype=np.uint8), -1)
    return cv_img

然后是加载训练数据的函数(这个就是generator
我这里用了csv来指定训练集和标签的一对一关系 csv里面存的是图片的地址,大概就像这样
TensorFlow keras实现unet网络并进行图像分割入门实例(非常适合新手!)
由于训练器很贪婪,需要不断的读入数据,所以我们的generator也要不断产生数据,不然就会报下标越界的错,所以我们用while True来满足它!

训练器不会一次只处理一张图,一般一次都是一批(这样有利于收敛)所以我们的generator也要一次性返回一批数据。这个数据我们从刚刚读取的所有数据中随机选。至于选多少张就看传入函数的参数batch_size的大小了。

现在选起来的还只是图片的路径,训练器需要的是已经读取的数据,所以我们需要遍历刚刚选的那个数组,利用上文写的那个cv_imread函数进行读取,最后打包到image_data_arraylabel_data_array 并用yield返回

下面括号内的文字用来解释yield,懂的同学直接pass。
(yield实际和return的作用差不多,都是返回数据,前者是调用的人需要的时候返回一点然后进入循环等待下次需要再返回一点,后者是只能返回一次)(换句话说就是,嵌套在for循环中return之后就不再执行下去了;而嵌套在for循环里面的yield的返回之后,只要调用的人还需要数据,yield就能不断让程序循环下去并返回数据)(如果不是循环的话,yield和return差不多一样)

import csv
import json
import os
import numpy as np
import cv2
from matplotlib import pyplot

def load_train(csvDir, width, height, batch_size):
    fx = 0.0
    fy = 0.0
    # 处理列表得到数组
    images_path = []
    labels_path = []
    # 利用csv.reader读取csv文件,然后将返回的值转化为列表
    # 然后就可以得到x(训练集)、y(标签)的地址
    csvFile = open(csvDir, "r")
    reader = csv.reader(csvFile)
    content = list(reader)
    for item in content:
        images_path.append(item[0])
        labels_path.append(item[1])
    # 进入循环读取照片
    while True:
    	# 下面定义两个数组来装每个批次(batch_size)的数据
        image_data_array = []
        label_data_array = []
        # 随机选一组数据
        index_group = np.random.randint(0, len(images_path), batch_size)
        # print("batch_size:", str(index_group))
        for index in index_group:
            image = images_path[index]
            label = labels_path[index]

            image_data = cv_imread(image)
            # 这里需要resize一下图片的长宽,让长宽与模型接收的长宽一致
            image_data = cv2.resize(image_data, (width, height), fx=fx, fy=fy, interpolation=cv2.INTER_CUBIC)
            image_data = image_data.astype(np.float32)
            image_data = np.multiply(image_data, 1.0 / 255.0)
            image_data_array.append(image_data)

            label_data = cv_imread(label)
            # label_data = cv2.cvtColor(label_data, cv2.COLOR_GRAY2BGR) # 颜色转化
            label_data = cv2.resize(label_data, (width, height), fx=fx, fy=fy, interpolation=cv2.INTER_CUBIC)
            label_data = label_data.astype(np.float32)
            label_data = np.multiply(label_data, 1.0 / 255.0)
            label_data_array.append(label_data)

        image_data_r = np.array(image_data_array)
        label_data_r = np.array(label_data_array)

        yield image_data_r, label_data_r

和上面的思路一样,我们还需要写一个加载测试集的函数(当然这一步可以不要,但是这样你就看不到val_lossval_accuracy两个数据了)

加载测试集的函数只需要一次性把所有测试数据返回就行了,不用使用yield 来一批一批的返回

这里就不赘述了,直接放代码,哪里看不懂评论区见!

def load_test(csvDir, width, height, batch_size):
    fx = 0.0
    fy = 0.0
    # 处理列表得到数组
    images_path = []
    labels_path = []
    csvFile = open(csvDir, "r")
    reader = csv.reader(csvFile)
    content = list(reader)
    for item in content:
        images_path.append(item[0])
        labels_path.append(item[1])
    # 进入循环读取照片

    # for image, label in zip(images_path, labels_path):
    image_data_array = []
    label_data_array = []
    index_group = np.random.randint(0, len(images_path), batch_size)
    # print("batch_size:", str(index_group))
    for index in index_group:
        image = images_path[index]
        label = labels_path[index]

        image_data = cv_imread(image)
        image_data = cv2.resize(image_data, (width, height), fx=fx, fy=fy, interpolation=cv2.INTER_CUBIC)
        image_data = image_data.astype(np.float32)
        image_data = np.multiply(image_data, 1.0 / 255.0)
        image_data_array.append(image_data)

        label_data = cv_imread(label)
        # label_data = cv2.cvtColor(label_data, cv2.COLOR_GRAY2BGR) # 颜色转化
        label_data = cv2.resize(label_data, (width, height), fx=fx, fy=fy, interpolation=cv2.INTER_CUBIC)
        label_data = label_data.astype(np.float32)
        label_data = np.multiply(label_data, 1.0 / 255.0)
        label_data_array.append(label_data)

    image_data_r = np.array(image_data_array)
    label_data_r = np.array(label_data_array)

    return image_data_r, label_data_r

OK,数据的读入已经解决了,接下来就是写主函数跑数据了,激动吗!!
TensorFlow keras实现unet网络并进行图像分割入门实例(非常适合新手!)
TensorFlow keras实现unet网络并进行图像分割入门实例(非常适合新手!)

主函数

from model import *
import matplotlib.pyplot as plt

from myGenerator import load_train, load_test
model = unet()
# 这里调用load_train(csvDir, width, height, batch_size)产生数据
# 如果内存小batch_size就设为1吧
history = model.fit_generator(load_train(r"E:\train_data\carChallenge\mycsv.csv", 512, 512, 4), workers=1,
                              steps_per_epoch=2, epochs=4,
                              validation_data=load_test(r"E:\train_data\carChallenge\test_data.csv", 512, 512, 100)
                              )


model.save('modelWithWeight.h5')
model.save_weights('fine_tune_model_weight')

# print(history.history)

# 展示一下精确度的随训练的变化图
plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('model accuracy')
plt.ylabel('accuracy')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

# 展示一下loss随训练的变化图
plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('model loss')
plt.ylabel('loss')
plt.xlabel('epoch')
plt.legend(['train', 'test'], loc='upper left')
plt.show()

预测

TensorFlow keras实现unet网络并进行图像分割入门实例(非常适合新手!)
TensorFlow keras实现unet网络并进行图像分割入门实例(非常适合新手!)
总体来说效果还不错
那我们来预测一下,下面我已经整理了预测相关的方法,直接用就OK

import random
from tensorflow_core.python.keras.models import load_model
import numpy as np
import cv2

# # 加载模型h5文件
from MatteMatting import MatteMatting

class ArrayEmpty(Exception):
    def __str__(self):
        return "预测列表为空,请使用Predict.add()往列表添加地址"


class Predict():
    def __init__(self, model_path, show_summary=False):
        self.item_list = []
        self.model = load_model(model_path)
        if show_summary:
            self.model.summary()

    def add(self, path):
        """
        :param path: 预测图片列表地址
        """
        self.item_list.append(path)

    def predict_all(self, model_in_size, original_size):
        """
        预测一组数据,并返回值
        :param model_out_size: 模型的输入尺寸(width,height)
        :param original_size: 图片原始尺寸。程序会自动将尺寸还原为这个尺寸(width,height)
        :return:迭代器返回生成结果
        """
        if len(self.item_list):
            for item in self.item_list:
                dc = self.predict_one(item, model_in_size, original_size)
                yield dc
        else:
            raise ArrayEmpty()

    def predict_one(self, path, model_in_size, original_size):
        """
        预测一个数据,并返回值
        :param path: 需要预测的数据
        :param model_out_size: 模型的输入尺寸(width,height)
        :param original_size: 图片原始尺寸。程序会自动将尺寸还原为这个尺寸(width,height)
        :return:
        """
        src = [path]
        get = self.__read_file(model_in_size, src=src)
        predict = self.model.predict(get)
        ii = 0
        dc = cv2.resize(predict[ii, :, :, :], original_size)  # 后面这个参数是形状恢复为原来的形状
        return dc

    @staticmethod
    def __read_file(size_tuple, src=[]):
        """
        规范化图片大小和像素值
        :param size_tuple: 图片大小,要求为元组(width,height)
        :param src:连接列表
        :return:返回预测图片列表
        """
        pre_x = []
        for s in src:
            print(s)
            input = cv2.imread(s)
            input = cv2.resize(input, size_tuple)
            input = cv2.cvtColor(input, cv2.COLOR_BGR2RGB)
            pre_x.append(input)  # input一张图片
        pre_x = np.array(pre_x) / 255.0
        return pre_x

主函数

if __name__ == '__main__':
    import os
    
    data_path_array = []
    # 需要预测的图片的文件夹
    resDir = r'E:\train_data\carChallenge\train_hq'
    # 将需要预测的图片地址存入数组
    for root, dirs, files in os.walk(resDir):
        for file in files:
            data_path_array.append(os.path.join(root, file))
    print(data_path_array)
	# 这里使用我整理的那个类,实例化的时候先把训练得到的模型权重文件放进去
    pd = Predict("h5/modelWithWeight.h5")
    # 然后调用我整理的那个类类里面的add方法把需要预测的地址添加进去
    for item in data_path_array:
        pd.add(item)
    # 调用里面的predict_all方法,返回的是一个生成器,需要我们用next来读取
    dd = pd.predict_all((512, 512), (1918, 1280))
    # 保存到指定位置
    for item in data_path_array:
        dc = next(dd)
        dc = (dc * 255).astype(np.uint8)  # 把dtype从float32转化为uint8
        item = cv2.imread(item)
        # dc.dtype='uint8'
        mm = MatteMatting(item, dc, input_type='cv2')
        mm.save_image(r"E:\train_data\carChallenge\data_out\{}.png".format(str(random.randint(1, 10000000))), mask_flip=True)
        

结果展示

输入
TensorFlow keras实现unet网络并进行图像分割入门实例(非常适合新手!)
输出
TensorFlow keras实现unet网络并进行图像分割入门实例(非常适合新手!)
接下我做的是把车抠下来,python抠图教程可以看我另一篇文章
https://blog.csdn.net/qq_29391809/article/details/106036745