欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

使用tensorflow2.0 完成MTCNN网络 第一步。

程序员文章站 2022-05-04 13:13:08
1.MTCNN 的优点及必须要了解基础点。MTCNN 的 “MT”是指多任务学习(Multi-Task),在同一个任务中同时学习“识别人脸”、“边框回归”,“人脸关键点识别”。多尺度问题一直是困扰检测准确性的一个难点。MTCNN使用图像金字塔来解决目标多尺度问题。(图像金字塔百度上介绍非常多,我这里不过多叙述)。P-NET的网络模型时用单尺度(12X12)的图片训练出来的,想要识别各种尺度的人脸更准确,需要把待识别的人脸尺度先按照一定的比例,多次等比例缩放(缩一次识别一次,最后缩到接近12x12)...

1.MTCNN 的优点及必须要了解基础点。

MTCNN 的 “MT”是指多任务学习(Multi-Task),在同一个任务中同时学习“识别人脸”、“边框回归”,“人脸关键点识别”。
多尺度问题一直是困扰检测准确性的一个难点。MTCNN使用图像金字塔来解决目标多尺度问题。(图像金字塔百度上介绍非常多,我这里不过多叙述)。

  1. P-NET的网络模型时用单尺度(12X12)的图片训练出来的,想要识别各种尺度的人脸更准确,需要把待识别的人脸尺度先按照一定的比例,多次等比例缩放(缩一次识别一次,最后缩到接近12x12)
  2. 缺点是非常慢,生成图片金字塔慢,每种尺度的图片都需要喂入模型中,相当于执行了多次模型推断流程。
  3. MTCNN算法可以接受任意尺度的图片。第一阶段的P-NET是一个全卷积网络,卷积,池化、非线性激活都是可以接受任意尺度矩阵的运算,**但全连接运算是需要规定输入。**则输入的图片尺度需要固定,如果没有全连接层,图片尺度可以是任意的,(当然有例外:有即包含全连接层也能接受任意尺度的图片结构【Pyramid Pooling 空间金字塔池化】可以百度)
  4. 设置适合的最小人脸尺度和缩放因子可以优化计算效率,官方经验是0.709。minsize 是指你认为图片中需要识别人脸的最小尺度,factor是指每次对边缩放的倍数。P-NET预测阶段会多次缩放原图得到图片金字塔,目的是为了让缩放后的图片中的人脸与P-NET训练时候的图片尺度(12px * 12px)接近,先把原图等比例缩放 “【12 / minsize】” 。即 (原图大小 x【12 / minsize】)缩放一次 ,再按factor 用上一次的缩放结果不断缩放,直到最短边小于或等于12,推断出 minsize 越大,生成的“金字塔”层数越少,resize和pnet的计算量越小。
  5. 在输入模型前对图片每个像素做(x - 127.5)/ 128 的操作。 此操作可以使图片像素归一化,加快收敛熟读,由于图片每个像素点是 [0-255] 的数,且都是非负数,加入此操作,可以把 [0-255] 映射为(-1,1)。有正有负的输入,收敛速度更快,训练需要此操作,预测时也需要此操作。
  6. 边框回归我们会在代码中体现,这里不多做叙述。

2.下面我们开始进入代码模式。这里的数据集我会上传、可在底部下载。

  1. 进行利用脚本获取P_net训练集。(12px * 12px)大小的图片。neg、pos、part、、

gen_data_pent.py

import sys
import numpy as np
import cv2
import os
import numpy.random as npr

stdsize = 12

anno_file = "label.txt"
# im_dir = "samples"
pos_save_dir = str(stdsize) + "/positive"
part_save_dir = str(stdsize) + "/part"
neg_save_dir = str(stdsize) + '/negative'
save_dir = "12"

def IoU(pr_box, boxes):
    """Compute IoU between detect box and gt boxes

    Parameters:
    ----------
    box: numpy array , shape (5, ): x1, y1, x2, y2, score
        input box
    boxes: numpy array, shape (n, 4): x1, y1, x2, y2
        input ground truth boxes

    Returns:
    -------
    ovr: numpy.array, shape (n, )
        IoU
    """
    # print("随机锚框:",pr_box)

    box_area = (pr_box[2] - pr_box[0] + 1) * (pr_box[3] - pr_box[1] + 1)
    # print("随机面积box_area:",box_area)
    # print("(boxes[:, 2] - boxes[:, 0] + 1):",(boxes[:, 2] - boxes[:, 0] + 1))
    #XML真实区域 X2-X1 +1 = W   Y2-Y1 = H   W*H
    area = (boxes[:, 2] - boxes[:, 0] + 1) * (boxes[:, 3] - boxes[:, 1] + 1)
    # print("真实面积area:",area)
    # print("probx[0]",pr_box[0])
    # boxes[:, 0]代表取boxes这个nx4矩阵所有行的第一列数据
    xx1 = np.maximum(pr_box[0], boxes[:, 0])
    # print("xx1",xx1)
    yy1 = np.maximum(pr_box[1], boxes[:, 1])
    # print("yy1",yy1)
    xx2 = np.minimum(pr_box[2], boxes[:, 2])
    # print("xx2",xx2)
    yy2 = np.minimum(pr_box[3], boxes[:, 3])
    # print("yy2",yy2)
    # compute the width and height of the bounding box
    # print("xx2-xx1",(xx2-xx1))
    w = np.maximum(0, xx2 - xx1 + 1)
    h = np.maximum(0, yy2 - yy1 + 1)
    # inter_area = (xx1 - xx2 + 1) * (yy1 - yy2 + 1)
    # w = np.max(xx1,yy1)

    inter = w * h
    # print("inter",inter_area)
    ovr = inter / (box_area + area - inter)
    print("IOU:",ovr)
    return ovr

# 生成一系列文件夹用于存储三类样本
def mkr(dr):
    if not os.path.exists(dr):
        os.mkdir(dr)

mkr(save_dir)
mkr(pos_save_dir)
mkr(part_save_dir)
mkr(neg_save_dir)

# 生成一系列txt文档用于存储Positive,Negative,Part三类数据的信息
f1 = open(os.path.join(save_dir, 'pos_' + str(stdsize) + '.txt'), 'w')
f2 = open(os.path.join(save_dir, 'neg_' + str(stdsize) + '.txt'), 'w')
f3 = open(os.path.join(save_dir, 'part_' + str(stdsize) + '.txt'), 'w')

# 读取label.txt
with open(anno_file, 'r') as f:
    annotations = f.readlines()
    del annotations[0]
num = len(annotations)
print("%d pics in total" % num)
p_idx = 0 # positive
n_idx = 0 # negative
d_idx = 0 # dont care
idx = 0
box_idx = 0

for annotation in annotations:
    # print("annotation",annotation)
    annotation = annotation.strip().split(' ')

    im_path = annotation[0]

    bbox = list(map(float,annotation[1:]))

    boxes = np.array(bbox, dtype=np.float32).reshape(-1, 4)
    boxes[:, 2] += boxes[:, 0] - 1
    boxes[:, 3] += boxes[:, 1] - 1
    # print("boxes",boxes)
    img = cv2.imread(im_path)
    # print(img.shape)
    idx += 1
    if idx % 100 == 0:
        print(idx, "images done")

    height, width, channel = img.shape
    print(img.shape)
    neg_num = 0
    while neg_num < 50:
        # 生成随机数,对每张数据集中的图像进行切割,生成一系列小的图像
        size = npr.randint(stdsize, min(width, height) / 2)

        nx = npr.randint(0, width - size)

        ny = npr.randint(0, height - size)
        crop_box = np.array([nx, ny, nx + size, ny + size])
        # print(crop_box)
        # print("boxes",boxes)
        # 计算小的图像与标注产生的检测框之间的IoU
        Iou = IoU(crop_box, boxes)
        # print(Iou)
        cropped_im = img[ny : ny + size, nx : nx + size, :]
        resized_im = cv2.resize(cropped_im, (stdsize, stdsize), interpolation=cv2.INTER_LINEAR)

        if np.max(Iou) < 0.3:
            # Iou with all gts must below 0.3
            save_file = os.path.join(neg_save_dir, "%s.jpg"%n_idx)
            f2.write(str(stdsize)+"/negative/%s"%n_idx + ' 0\n')
            cv2.imwrite(save_file, resized_im)
            n_idx += 1
            neg_num += 1


    for box in boxes:
        print(box)
        # box (x_left, y_top, x_right, y_bottom)
        x1, y1, x2, y2 = box
        w = x2 - x1 + 1
        h = y2 - y1 + 1

        # max(w, h) < 40:参数40表示忽略的最小的脸的大小
        # in case the ground truth boxes of small faces are not accurate
        if max(w, h) < 20 or x1 < 0 or y1 < 0:
            continue
        # 生成与gt有重叠的反面例子
        for i in range(5):
            size = npr.randint(stdsize, min(width, height) / 2)
            # delta_x and delta_y are offsets of (x1, y1)
            delta_x = npr.randint(max(-size, -x1), w)
            delta_y = npr.randint(max(-size, -y1), h)
            nx1 = int(max(0, x1 + delta_x))

            ny1 = int(max(0, y1 + delta_y))
            if nx1 + size > width or ny1 + size > height:
                continue
            crop_box = np.array([nx1, ny1, nx1 + size, ny1 + size])
            Iou = IoU(crop_box, boxes)
            # cropped_im = img[ny: ny + size, nx: nx + size, :]
            cropped_im = img[ny1 : ny1 + size, nx1 : nx1 + size, :]
            resized_im = cv2.resize(cropped_im, (stdsize, stdsize), interpolation=cv2.INTER_LINEAR)

            if np.max(Iou) < 0.3:
                # Iou with all gts must below 0.3
                save_file = os.path.join(neg_save_dir, "%s.jpg" % n_idx)
                f2.write(str(stdsize)+"/negative/%s" % n_idx + ' 0\n')
                cv2.imwrite(save_file, resized_im)
                n_idx += 1



        # generate positive examples and part faces
        for i in range(20):

            size = npr.randint(int(min(w, h) * 0.8), np.ceil(1.25 * max(w, h)))

            # delta here is the offset of box center
            delta_x = npr.randint(-w * 0.2, w * 0.2)

            delta_y = npr.randint(-h * 0.2, h * 0.2)

            nx1 = max(x1 + w / 2 + delta_x - size / 2, 0)

            ny1 = max(y1 + h / 2 + delta_y - size / 2, 0)
            nx2 = nx1 + size

            ny2 = ny1 + size

            if nx2 > width or ny2 > height:
                continue
            crop_box = np.array([nx1, ny1, nx2, ny2])

            offset_x1 = (x1 - nx1) / float(size)
            offset_y1 = (y1 - ny1) / float(size)
            offset_x2 = (x2 - nx2) / float(size)
            offset_y2 = (y2 - ny2) / float(size)

            cropped_im = img[int(ny1):int(ny2), int(nx1):int(nx2), :]

            resized_im = cv2.resize(cropped_im, (stdsize, stdsize), interpolation=cv2.INTER_LINEAR)

            box_ = box.reshape(1, -1)

            if IoU(crop_box, box_) >= 0.65:
                save_file = os.path.join(pos_save_dir, "%s.jpg"%p_idx)
                f1.write(str(stdsize)+"/positive/%s"%p_idx + ' 1 %.2f %.2f %.2f %.2f\n'%(offset_x1, offset_y1, offset_x2, offset_y2))
                cv2.imwrite(save_file, resized_im)
                p_idx += 1
            elif IoU(crop_box, box_) >= 0.4:
                save_file = os.path.join(part_save_dir, "%s.jpg"%d_idx)
                f3.write(str(stdsize)+"/part/%s"%d_idx + ' -1 %.2f %.2f %.2f %.2f\n'%(offset_x1, offset_y1, offset_x2, offset_y2))
                cv2.imwrite(save_file, resized_im)
                d_idx += 1
        box_idx += 1
        print("%s images done, pos: %s part: %s neg: %s"%(idx, p_idx, d_idx, n_idx))

f1.close()
f2.close()
f3.close()

执行完后,会出现下图中的 名称为“12”的文件夹、里面所包含的东西。我不多叙述。上一篇博客里有写。
使用tensorflow2.0 完成MTCNN网络 第一步。

  1. 我们要把 三类txt 文本进行合并。以便制作训练集。

writ-labe.py

import sys
import os

save_dir = "12"
if not os.path.exists(save_dir):
    os.mkdir(save_dir)
f1 = open(os.path.join(save_dir, 'pos_%s.txt'%(save_dir)), 'r')
f2 = open(os.path.join(save_dir, 'neg_%s.txt'%(save_dir)), 'r')
f3 = open(os.path.join(save_dir, 'part_%s.txt'%(save_dir)), 'r')

pos = f1.readlines()
neg = f2.readlines()
part = f3.readlines()
f = open(os.path.join(save_dir, 'label-train%s.txt'%(save_dir)), 'w')

for i in range(int(len(pos))):
    p = pos[i].find(" ") + 1
    pos[i] = pos[i][:p-1] + ".jpg " + pos[i][p:-1] + "\n"
    f.write(pos[i])

for i in range(int(len(neg))):
    p = neg[i].find(" ") + 1
    neg[i] = neg[i][:p-1] + ".jpg " + neg[i][p:-1] + " -1 -1 -1 -1\n"
    f.write(neg[i])

for i in range(int(len(part))):
    p = part[i].find(" ") + 1
    part[i] = part[i][:p-1] + ".jpg " + part[i][p:-1] + "\n"
    f.write(part[i])

f1.close()
f2.close()
f3.close()

看到这个label-train.txt 文件。这就是我们需要的训练集了。
使用tensorflow2.0 完成MTCNN网络 第一步。

  1. 但是tensorflow 2.0去直接训练 txt 格式,读取速度慢,导致训练速度停滞。为了提高读取速度,我将该txt格式转换成 tfrecord 格式。

gen_tfrecord.py

import os
import random
import sys

import tensorflow as tf
import cv2
from PIL import Image

def _int64_feature(value):
    """Wrapper for insert int64 feature into Example proto."""
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))


def _float_feature(value):
    """Wrapper for insert float features into Example proto."""
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(float_list=tf.train.FloatList(value=value))


def _bytes_feature(value):
    """Wrapper for insert bytes features into Example proto."""
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=value))



def _process_image_withoutcoder(filename):
    """
    利用cv2将filename指向的图片tostring
    """

    image = cv2.imread(filename)

    # transform data into string format
    image_data = image.tostring()
    assert len(image.shape) == 3
    height = image.shape[0]
    width = image.shape[1]
    assert image.shape[2] == 3
    # return string data and initial height and width of the image
    return image_data, height, width


def _convert_to_example_simple(image_example, image_buffer):
    """
        covert to tfrecord file
    Parameter
    ------------
        image_example: dict, an image example
        image_buffer: string, JPEG encoding of RGB image
    Return
    -----------
        Example proto
    """
    class_label = image_example['label']

    bbox = image_example['bbox']
    roi = [bbox['xmin'], bbox['ymin'], bbox['xmax'], bbox['ymax']]
    # landmark = [bbox['xlefteye'],bbox['ylefteye'],bbox['xrighteye'],bbox['yrighteye'],bbox['xnose'],bbox['ynose'],
    #             bbox['xleftmouth'],bbox['yleftmouth'],bbox['xrightmouth'],bbox['yrightmouth']]

    example = tf.train.Example(features=tf.train.Features(feature={
        'image/encoded': _bytes_feature(image_buffer),
        'image/label': _int64_feature(class_label),
        'image/roi': _float_feature(roi),
        # 'image/landmark': _float_feature(landmark)
    }))
    return example


# 从图片和注释文件里加载数据并将其添加到TFRecord里
# 参数(变量):filename:存有数据的字典;tfrecord_writer:用来写入TFRecord的writer

def _add_to_tfrecord(filename, image_example, tfrecord_writer):
    # print('---', filename)

    # imaga_data:转化为字符串的图片
    # height:图片原始高度
    # width:图片原始宽度
    # image_example:包含图片信息的字典
    # print(filename)
    image_data, height, width = _process_image_withoutcoder(filename)
    example = _convert_to_example_simple(image_example, image_data)
    tfrecord_writer.write(example.SerializeToString())  # 将imaga_data转化到image_example中并写入tfrecord


def _get_output_filename(output_dir,net):
    # 定义一下输出的文件名

    # return '%s/%s_%s_%s.tfrecord' % (output_dir, name, net, st)
    # st = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    # time.strftime() 函数接收以时间元组,并返回以可读字符串表示的当地时间,格式由参数format决定:time.strftime(format[, t]),用来输出当前时间
    # 返回的是'../../DATA/imglists/PNet/train_PNet_landmark.tfrecord'
    return '%s/train_%s_landmark.tfrecord' % (output_dir,net)


def run(dataset_dir,net,output_dir,shuffling=False):
    """
    运行转换操作
    Args:
      dataset_dir: 数据集所在的数据集目录
      output_dir: 输出目录
    """

    # tfrecord name
    tf_filename = _get_output_filename(output_dir,net)  # '../../DATA/imglists/PNet/train_PNet_landmark.tfrecord'

    if tf.io.gfile.exists(tf_filename):  # tf.io.gfile模块提供了文件操作的API,包括文件的读取、写入、删除、复制等等
        print('Dataset files already exist. Exiting without re-creating them.')  # 判断是否存在同名文件
        return

    # 获得数据集,并打乱顺序
    dataset = get_dataset(dataset_dir)
    print(dataset)
    # filenames = dataset['filename']
    if shuffling:
        tf_filename = tf_filename + '_shuffle'
        # random.seed(12345454)
        random.shuffle(dataset)  # 打乱dataset数据集的顺序

    # Process dataset files.
    # write the data to tfrecord
    print('lala')
    with tf.io.TFRecordWriter(tf_filename) as tfrecord_writer:
        for i, image_example in enumerate(dataset):  # 读取dataset的索引和内容
            if (i + 1) % 1 == 0:
                sys.stdout.write('\r>> %d/%d images has been converted' % (
                i + 1, len(dataset)))  # 输出“x00/ len(dataset) images has been converted”
            sys.stdout.flush()  # 以一定间隔时间刷新输出
            filename = image_example['filename']  # 赋值
            _add_to_tfrecord(filename, image_example, tfrecord_writer)
    # 最后,编写标签文件
    # labels_to_class_names = dict(zip(range(len(_CLASS_NAMES)), _CLASS_NAMES))
    # dataset_utils.write_label_file(labels_to_class_names, dataset_dir)
    print('\nFinished converting the MTCNN dataset!')


def get_dataset(dir):
    # 获取文件名字,标签和注释
    item = 'label-train%s.txt'%(dir)

    dataset_dir = os.path.join(dir, item)  # dataset_dir = '../../DATA/imglists/PNet/train_PNet_landmark.txt'
    # print(dataset_dir)
    imagelist = open(dataset_dir, 'r')  # 以只读的形式打开train_PNet_landmark.txt,并传入imagelist里面

    dataset = []  # 新建列表
    for line in imagelist.readlines():  # 按行读取imagelist里面的内容
        info = line.strip().split(' ')  # .strip().split()去除每一行首尾空格并且以空格为分隔符读取内容到info里面
        data_example = dict()  # 新建字典
        bbox = dict()
        data_example['filename'] = info[0]  # filename=info[0]
        # print(data_example['filename'])
        data_example['label'] = int(info[1])  # label=info[1],info[1]的值有四种可能,1,0,-1,-2;分别对应着正、负、无关、关键点样本
        bbox['xmin'] = 0  # 初始化bounding box的值
        bbox['ymin'] = 0
        bbox['xmax'] = 0
        bbox['ymax'] = 0
        # bbox['xlefteye'] = 0  # 初始化人脸坐标的值
        # bbox['ylefteye'] = 0
        # bbox['xrighteye'] = 0
        # bbox['yrighteye'] = 0
        # bbox['xnose'] = 0
        # bbox['ynose'] = 0
        # bbox['xleftmouth'] = 0
        # bbox['yleftmouth'] = 0
        # bbox['xrightmouth'] = 0
        # bbox['yrightmouth'] = 0
        if len(info) == 6:  # 当info的长度等于6时,表示此时的info是正样本或者无关样本
            bbox['xmin'] = float(info[2])
            bbox['ymin'] = float(info[3])
            bbox['xmax'] = float(info[4])
            bbox['ymax'] = float(info[5])
        # if len(info) == 12:  # 当info的长度等于12时,表示此时的info是landmark样本
        #     bbox['xlefteye'] = float(info[2])
        #     bbox['ylefteye'] = float(info[3])
        #     bbox['xrighteye'] = float(info[4])
        #     bbox['yrighteye'] = float(info[5])
        #     bbox['xnose'] = float(info[6])
        #     bbox['ynose'] = float(info[7])
        #     bbox['xleftmouth'] = float(info[8])
        #     bbox['yleftmouth'] = float(info[9])
        #     bbox['xrightmouth'] = float(info[10])
        #     bbox['yrightmouth'] = float(info[11])

        data_example['bbox'] = bbox  # 将bounding box值传入字典
        dataset.append(data_example)  # 将data_example字典内容传入列表dataset

    return dataset  # 返回的是dataset,datase是个列表,但里面每个元素都是一个字典,每个字典都含有3个key,分别是filename、label和bounding box


if __name__ == '__main__':
    dir = '12'
    net = 'PNet'
    output_directory = '12'
    run(dir,net,output_directory,shuffling=True)

我画红圈的便是我们生成的tfrecord 格式文件。
使用tensorflow2.0 完成MTCNN网络 第一步。

  1. 我们对训练集进行了编码,那么我在读取该文件时就需要解码。我们编写解码函数。

read_tfrecord.py

import tensorflow as tf
import numpy as np



def image_color_distort(inputs):
    inputs = tf.image.random_contrast(inputs, lower=0.5, upper=1.5)
    inputs = tf.image.random_brightness(inputs, max_delta=0.2)
    inputs = tf.image.random_hue(inputs,max_delta= 0.2)
    inputs = tf.image.random_saturation(inputs,lower = 0.5, upper= 1.5)
    return inputs

def red_tf(imgs,net_size):
    raw_image_dataset = tf.data.TFRecordDataset(imgs).shuffle(1000)

    image_feature_description = {
        'image/encoded': tf.io.FixedLenFeature([], tf.string),
        'image/label': tf.io.FixedLenFeature([], tf.int64),
        'image/roi': tf.io.FixedLenFeature([4], tf.float32),
    }
    def _parse_image_function(example_proto):
      # Parse the input tf.Example proto using the dictionary above.
      return tf.io.parse_single_example(example_proto, image_feature_description)

    parsed_image_dataset = raw_image_dataset.map(_parse_image_function)
    print(parsed_image_dataset)
    image_batch = []
    label_batch = []
    bbox_batch = []

    for image_features in parsed_image_dataset:

        image_raw = tf.io.decode_raw(image_features['image/encoded'],tf.uint8)
        # 将值规划在[-1,1]内
        images = tf.reshape(image_raw, [net_size, net_size, 3])
        image = (tf.cast(images, tf.float32) - 127.5) / 128
        #图像变色
        image = image_color_distort(image)
        image_batch.append(image)

        label = tf.cast(image_features['image/label'], tf.float32)
        label_batch.append(label)

        roi = tf.cast(image_features['image/roi'], tf.float32)
        bbox_batch.append(roi)


    return image_batch,label_batch,bbox_batch

2.1我们开始进入编写P_NET、R_NET、O_NET 网络。

MTCNN_.py

import tensorflow.keras as keras
import tensorflow as tf
import numpy as np
import cv2



#处理的12X12网络
def Pnet():
    input = tf.keras.Input(shape=[None, None, 3])
    x = tf.keras.layers.Conv2D(10, (3, 3), name='conv1',kernel_regularizer=keras.regularizers.l2(0.0005))(input)
    x = tf.keras.layers.PReLU(tf.constant_initializer(0.25),shared_axes=[1, 2], name='PReLU1')(x)
    x = tf.keras.layers.MaxPooling2D((2, 2))(x)
    x = tf.keras.layers.Conv2D(16, (3, 3),name='conv2',kernel_regularizer=keras.regularizers.l2(0.0005))(x)
    x = tf.keras.layers.PReLU(tf.constant_initializer(0.25),shared_axes=[1, 2], name='PReLU2')(x)
    x = tf.keras.layers.Conv2D(32, (3, 3),name='conv3',kernel_regularizer=keras.regularizers.l2(0.0005))(x)
    x = tf.keras.layers.PReLU(tf.constant_initializer(0.25),shared_axes=[1, 2], name='PReLU3')(x)
    classifier = tf.keras.layers.Conv2D(2, (1, 1), activation='softmax',name='conv4-1')(x)
    cls_prob = tf.squeeze(classifier, [1, 2], name='cls_prob')
    bbox_regress = tf.keras.layers.Conv2D(4, (1, 1), name='conv4-2')(x)
    bbox_pred = tf.squeeze(bbox_regress,[1,2],name='bbox_pred')
    model = tf.keras.models.Model([input], [classifier, bbox_regress])
    return model

#处理的24X24网络
def Rnet():
    """定义RNet网络的架构"""
    input = tf.keras.Input(shape=[24, 24, 3])
    x = tf.keras.layers.Conv2D(28, (3, 3),strides=1,padding='valid',name='conv1')(input)
    x = tf.keras.layers.PReLU(shared_axes=[1, 2],name='prelu1')(x)
    x = tf.keras.layers.MaxPooling2D(pool_size=3,strides=2,padding='same')(x)
    x = tf.keras.layers.Conv2D(48, (3, 3),strides=1,padding='valid',name='conv2')(x)
    x = tf.keras.layers.PReLU(shared_axes=[1, 2],name='prelu2')(x)
    x = tf.keras.layers.MaxPooling2D(pool_size=3,strides=2)(x)
    x = tf.keras.layers.Conv2D(64, (2, 2),strides=1,padding='valid',name='conv3')(x)
    x = tf.keras.layers.PReLU(shared_axes=[1, 2],name='prelu3')(x)
    x = tf.keras.layers.Permute((3, 2, 1))(x)
    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(128, name='conv4')(x)
    x = tf.keras.layers.PReLU(name='prelu4')(x)
    classifier = tf.keras.layers.Dense(2,activation='softmax',name='conv5-1')(x)
    bbox_regress = tf.keras.layers.Dense(4, name='conv5-2')(x)
    model = tf.keras.models.Model([input], [classifier, bbox_regress])
    return model

#处理的48X48网络
def Onet():
    """定义ONet网络的架构"""
    input = tf.keras.layers.Input(shape=[48, 48, 3])
    # 48,48,3 -> 23,23,32
    x = tf.keras.layers.Conv2D(32, (3, 3),strides=1,padding='valid',name='conv1')(input)
    x = tf.keras.layers.PReLU(shared_axes=[1, 2],name='prelu1')(x)
    x = tf.keras.layers.MaxPool2D(pool_size=3,strides=2,padding='same')(x)
    # 23,23,32 -> 10,10,64
    x = tf.keras.layers.Conv2D(64, (3, 3),strides=1,padding='valid',name='conv2')(x)
    x = tf.keras.layers.PReLU(shared_axes=[1, 2],name='prelu2')(x)
    x = tf.keras.layers.MaxPool2D(pool_size=3,strides=2)(x)
    # 8,8,64 -> 4,4,64
    x = tf.keras.layers.Conv2D(64, (3, 3),strides=1,padding='valid',name='conv3')(x)
    x = tf.keras.layers.PReLU(shared_axes=[1, 2],name='prelu3')(x)
    x = tf.keras.layers.MaxPool2D(pool_size=2)(x)
    # 4,4,64 -> 3,3,128
    x = tf.keras.layers.Conv2D(128, (2, 2),strides=1,padding='valid',name='conv4')(x)
    x = tf.keras.layers.PReLU(shared_axes=[1, 2],name='prelu4')(x)
    # 3,3,128 -> 128,12,12
    x = tf.keras.layers.Permute((3, 2, 1))(x)
    # 1152 -> 256
    x = tf.keras.layers.Flatten()(x)
    x = tf.keras.layers.Dense(256, name='conv5')(x)
    x = tf.keras.layers.PReLU(name='prelu5')(x)
    # 鉴别
    # 256 -> 2 256 -> 4 256 -> 10
    classifier = tf.keras.layers.Dense(2,activation='softmax',name='conv6-1')(x)
    bbox_regress = tf.keras.layers.Dense(4, name='conv6-2')(x)
    landmark_regress = tf.keras.layers.Dense(10, name='conv6-3')(x)
    model = tf.keras.models.Model([input], [classifier, bbox_regress,landmark_regress])

    return model



#人脸分类损失函数
def cls_ohem(cls_prob, label):

    zeros = tf.zeros_like(label, dtype=tf.float32)
    # 若label中的值小于等于0,则为0,否则为1,就是把label中-1变为0
    label_filter_invalid = tf.where(tf.math.less(label,[0]),zeros,label)

    ## 类别size[2*batch]
    num_cls_prob = tf.size(cls_prob)

    #把cls_porob变成一维
    cls_prob_reshape = tf.reshape(cls_prob,[num_cls_prob,-1])
    label_int = tf.cast(label_filter_invalid,dtype=tf.int32)
    num_row = tf.cast(cls_prob.get_shape()[0],dtype=tf.int32)  #[batch]

    # 对应某一batch而言,batch*2为非人类别概率,
    # batch*2+1为人概率类别,indices为对应 cls_prob_reshpae
    # 应该的真实值,后续用交叉熵计算损失
    row = tf.range(num_row)*2   #[0 2 4 6]
    #就是如果label是pos就看1X2中的第2个,neg或part就看第1个
    indices_ = row + label_int
    # 从cls_prob_reshape中获取 索引为indices_的值,squeeze后变成一维的长度为batch_size的张量。
    label_prob = tf.squeeze(tf.gather(cls_prob_reshape, indices_))
    #OHEM向前时,全部的Roi通过网络
    loss = -tf.math.log(label_prob+1e-10)
    zeros = tf.zeros_like(label_prob, dtype=tf.float32)
    ones = tf.ones_like(label_prob, dtype=tf.float32)

    # 把标签为±1的样本对应的索引设为1,其余设为0 #这一步是用来计算较大的候选RIO 用来OHEM
    valid_inds = tf.where(label < zeros,zeros,ones)
    #获取有效的样本数(即标签为±1  (正样本和负样本的数量)
    num_valid = tf.reduce_sum(valid_inds)

    #num_keep_radio = 0.7  选取70%的数据
    keep_num = tf.cast(num_valid*0.7,dtype=tf.int32)
    # print("keep_num",keep_num)
    # 只选取neg,pos的70%损失
    loss = loss * num_valid

    #OHEM就是对loss从高到底排序
    # 反向时,根据排序选择Batch-size/N 个loss值得最大样本来后向传播model的权重
    loss,_ = tf.math.top_k(loss, k=keep_num)

    return tf.math.reduce_mean(loss)


# 人脸框损失函数
def bbox_ohem(bbox_pred,bbox_target,label):

    zeros_index = tf.zeros_like(label,dtype=tf.float32)
    ones_index = tf.ones_like(label,dtype=tf.float32)

    # 等于±1的有效为1,不等于1的无效为0,即筛选出pos和part的索引-OHEM策略
    valid_inds = tf.where(tf.math.equal(tf.math.abs(label),1),ones_index,zeros_index)

    #计算平方差损失
    square_error = tf.math.square(bbox_pred - bbox_target)  #16-1-16-14
    square_error = tf.math.reduce_sum(square_error,axis=1)  #16*16*4


    # 保留数据的个数
    num_valid = tf.math.reduce_sum(valid_inds)
    keep_num = tf.cast(num_valid,dtype=tf.int32)


    #OHEM策略,保留部分pos,part的损失
    square_error = square_error * num_valid

    # 选出最大的进行反向传播
    _,k_index = tf.math.top_k(square_error,k=keep_num)
    # 将部分pos样本和part样本的平方和提取出来
    square_error = tf.gather(square_error, k_index)

    return tf.reduce_mean(square_error)


#人脸五官损失函数
def landmark_ohem(landmark_pred,landmark_target,label):
    #keep label =-2  then do landmark detection
    ones = tf.ones_like(label,dtype=tf.float32)
    zeros = tf.zeros_like(label,dtype=tf.float32)

    # 只保留landmark数据
    valid_inds = tf.where(tf.equal(label,-2),ones,zeros)

    # 计算平方差损失
    square_error = tf.square(landmark_pred-landmark_target)
    square_error = tf.reduce_sum(square_error,axis=1)

    # 保留数据个数
    num_valid = tf.math.reduce_sum(valid_inds) # 0
    keep_num = tf.cast(num_valid, dtype=tf.int32) # 0

    # 保留landmark部分数据损失
    square_error = square_error*valid_inds
    square_error, k_index = tf.nn.top_k(square_error, k=keep_num)
    # square_error = tf.gather(square_error, k_index)

    return tf.math.reduce_mean(square_error) # 当square_error为空时会出现nan bug


#准确率
def cal_accuracy(cls_prob,label):

    # 预测最大概率的类别,0代表无人,1代表有人
    pred = tf.argmax(cls_prob,axis=1)
    label_int = tf.cast(label,tf.int64)

    #返回pos和neg示例的索引 :按元素返回(x> = y)的真值
    cond = tf.where(tf.greater_equal(label_int,0))
    picked = tf.squeeze(cond)
    #true_label选出picked(pos和neg)坐标
    label_picked = tf.gather(label_int,picked)
    #pre_label选出picked(pos和neg)坐标
    pred_picked = tf.gather(pred,picked)

    # accuracy_op = tf.math.reduce_sum(tf.cast(tf.equal(label_picked,pred_picked),dtype=tf.float32))
    # accuracy = tf.math.reduce_mean(tf.cast(tf.math.equal(label_picked, pred_picked), tf.float32))
    return label_picked,pred_picked
    # return accuracy


  1. 我们开始进行训练。

train_pnet.py

import tensorflow as tf
import tensorflow.keras as keras
from tensorflow.keras import metrics
from red_tf import *
from MTCNN_ import Pnet,cls_ohem,bbox_ohem
from tqdm import tqdm
import os









data_path = "12/train_PNet_landmark.tfrecord_shuffle"

# 加载pokemon数据集的工具!
def load_pokemon(mode='train'):
    """ 加载pokemon数据集的工具!
    :param root:    数据集存储的目录
    :param mode:    mode:当前加载的数据是train,val,还是test
    :return:
    """
    # # 创建数字编码表,范围0-4;
    # name2label = {}  # "sq...":0   类别名:类标签;  字典 可以看一下目录,一共有5个文件夹,5个类别:0-4范围;
    # for name in sorted(os.listdir(os.path.join(root))):     # 列出所有目录;
    #     if not os.path.isdir(os.path.join(root, name)):
    #         continue
    #     # 给每个类别编码一个数字
    #     name2label[name] = len(name2label.keys())

    # 读取Label信息;保存索引文件images.csv
    # [file1,file2,], 对应的标签[3,1] 2个一一对应的list对象。
    # 根据目录,把每个照片的路径提取出来,以及每个照片路径所对应的类别都存储起来,存储到CSV文件中。
    size = 12
    images,labels,boxes = red_tf(data_path,size)

    # 图片切割成,训练70%,验证15%,测试15%。
    if mode == 'train':                                                     # 70% 训练集
        images = images[:int(0.7 * len(images))]
        labels = labels[:int(0.7 * len(labels))]
        boxes  = boxes[:int(0.7 * len(boxes))]
    elif mode == 'val':                                                     # 15% = 70%->85%  验证集
        images = images[int(0.7 * len(images)):int(0.85 * len(images))]
        labels = labels[int(0.7 * len(labels)):int(0.85 * len(labels))]
        boxes = boxes[int(0.7 * len(boxes)):int(0.85 * len(boxes))]
    else:                                                                   # 15% = 70%->85%  测试集
        images = images[int(0.85 * len(images)):]
        labels = labels[int(0.85 * len(labels)):]
        boxes = boxes[int(0.85 * len(boxes)):]
    ima = tf.data.Dataset.from_tensor_slices(images)
    lab = tf.data.Dataset.from_tensor_slices(labels)
    roi = tf.data.Dataset.from_tensor_slices(boxes)
    # ima,lab,roi = preprocess(ima,lab,roi)
    train_data = tf.data.Dataset.zip((ima, lab, roi)).shuffle(1000).batch(32)
    train_data = list(train_data.as_numpy_iterator())
    return train_data

import numpy as np
def train(eopch):
    model = Pnet()
    model.load_weights("pnet.h5")

    optimizer = keras.optimizers.Adam(learning_rate=1e-3)
    off = 1000
    acc_meter = metrics.Accuracy()
    for epoch in tqdm(range(eopch)):

        for i,(img,lab,boxes) in enumerate(load_pokemon("train")):


            #img = image_color_distort(img)
            # 开一个gradient tape, 计算梯度
            with tf.GradientTape() as tape:
                cls_prob, bbox_pred = model(img)
                cls_prob = tf.squeeze(cls_prob,[1,2])
                cls_loss = cls_ohem(cls_prob, lab)

                bbox_pred = tf.squeeze(bbox_pred,[1,2])
                bbox_loss = bbox_ohem(bbox_pred, boxes,lab)
                # landmark_loss = landmark_loss_fn(landmark_pred, landmark_batch, label_batch)
                # accuracy = cal_accuracy(cls_prob, label_batch)


                total_loss_value = cls_loss + 0.5 * bbox_loss
                grads = tape.gradient(total_loss_value, model.trainable_variables)
                optimizer.apply_gradients(zip(grads, model.trainable_variables))
            if i % 200 == 0:
                print('Training loss (for one batch) at step %s: %s' % (i, float(total_loss_value)))
                print('Seen so far: %s samples' % ((i + 1) * 6))


        for i, (v_img, v_lab1, boxes) in enumerate(load_pokemon("val")):
            v_img = image_color_distort(v_img)
            with tf.GradientTape() as tape:
                cls_prob, bbox_pred = model(v_img)
                cls_loss = cls_ohem(cls_prob, v_lab1)
                bbox_loss = bbox_ohem(bbox_pred, boxes,v_lab1)
                # landmark_loss = landmark_loss_fn(landmark_pred, landmark_batch, label_batch)
                # accuracy = cal_accuracy(cls_prob, label_batch)


                total_loss_value = cls_loss + 0.5 * bbox_loss
                grads = tape.gradient(total_loss_value, model.trainable_variables)
                optimizer.apply_gradients(zip(grads, model.trainable_variables))
            if i % 200 == 0:
                print('val___ loss (for one batch) at step %s: %s' % (i, float(total_loss_value)))
                print('Seen so far: %s samples' % ((i + 1) * 6))
    model.save_weights('./Weights/pnet_wight/pnet_30.ckpt')

train(30)

训练完成.P_net。

下一篇我们开始制作R_net训练集。

亚洲人脸数据集下载
密码:ctvw

本文地址:https://blog.csdn.net/weixin_41668848/article/details/107333162