欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

python实现马赛克拼图!

程序员文章站 2022-05-09 15:03:22
python实现马赛克拼图 直接上代码! 代码如下: 注!!!*** 这里不是直接运行的!这里你要在终端使用! **命令:python mosaic_v2.py -i "D:\image\pic.jpg" -t "D:\image" 程序原图: 效果图: ......

python实现马赛克拼图

直接上代码!

代码如下:

#!/usr/local/bin/python3
#  --*-- coding:utf8 --*--

import getopt
import sys
import os

import logging
from pil import image
from multiprocessing import process, queue, cpu_count

tile_size = 30  # 素材图片大小
tile_match_res = 10  #配置指数  ,值越大匹配度越高,执行时间越长
enlargement = 4  # 生成的图片是原始图片的多少倍

tile_block_size = int(tile_size / max(min(tile_match_res, tile_size), 1))
worker_count = max(cpu_count() - 1, 1)
eoq_value = none
warn_info = """ *缺少有效参数*
    参数:
        -i [--image]     : 原图片地址
        -t [--tiles_dir] : 素材目录地址
        -o [--outfile]   : 输出文件地址 【可选】
"""


class tileprocessor:
    def __init__(self, tiles_directory):
        self.tiles_directory = tiles_directory

    def __process_tile(self, tile_path):
        try:
            img = image.open(tile_path)
            # tiles must be square, so get the largest square that fits inside the image
            w = img.size[0]
            h = img.size[1]
            min_dimension = min(w, h)
            w_crop = (w - min_dimension) / 2
            h_crop = (h - min_dimension) / 2
            img = img.crop((w_crop, h_crop, w - w_crop, h - h_crop))
            large_tile_img = img.resize((tile_size, tile_size), image.antialias)
            small_tile_img = img.resize((int(tile_size / tile_block_size), int(tile_size / tile_block_size)),
                                        image.antialias)
            return (large_tile_img.convert('rgb'), small_tile_img.convert('rgb'))
        except exception as e:
            logging.warning(e)
            return (none, none)

    def get_tiles(self):
        large_tiles = []
        small_tiles = []

        logging.info('从 \'%s\' 获取图片素材...' % self.tiles_directory)

        # search the tiles directory recursively
        for root, subfolders, files in os.walk(self.tiles_directory):
            for tile_name in files:
                tile_path = os.path.join(root, tile_name)
                large_tile, small_tile = self.__process_tile(tile_path)
                logging.debug(large_tile)
                logging.debug(small_tile)
                if large_tile:
                    large_tiles.append(large_tile)
                    small_tiles.append(small_tile)

        logging.info('读取素材 %s 完成.' % len(large_tiles))

        return (large_tiles, small_tiles)


class targetimage:
    def __init__(self, image_path):
        self.image_path = image_path

    def get_data(self):
        logging.info('处理主图片...')
        img = image.open(self.image_path)
        w = img.size[0] * enlargement
        h = img.size[1] * enlargement
        large_img = img.resize((w, h), image.antialias)
        w_diff = (w % tile_size) / 2
        h_diff = (h % tile_size) / 2

        # if necesary, crop the image slightly so we use a whole number of tiles horizontally and vertically
        if w_diff or h_diff:
            large_img = large_img.crop((w_diff, h_diff, w - w_diff, h - h_diff))

        small_img = large_img.resize((int(w / tile_block_size), int(h / tile_block_size)), image.antialias)

        image_data = (large_img.convert('rgb'), small_img.convert('rgb'))

        logging.info('主图片处理完成.')

        return image_data


class tilefitter:
    def __init__(self, tiles_data):
        self.tiles_data = tiles_data

    def __get_tile_diff(self, t1, t2, bail_out_value):
        diff = 0
        for i in range(len(t1)):
            # diff += (abs(t1[i][0] - t2[i][0]) + abs(t1[i][1] - t2[i][1]) + abs(t1[i][2] - t2[i][2]))
            diff += ((t1[i][0] - t2[i][0]) ** 2 + (t1[i][1] - t2[i][1]) ** 2 + (t1[i][2] - t2[i][2]) ** 2)
            if diff > bail_out_value:
                # we know already that this isnt going to be the best fit, so no point continuing with this tile
                return diff
        return diff

    def get_best_fit_tile(self, img_data):
        best_fit_tile_index = none
        min_diff = sys.maxsize
        tile_index = 0

        # go through each tile in turn looking for the best match for the part of the image represented by 'img_data'
        for tile_data in self.tiles_data:
            diff = self.__get_tile_diff(img_data, tile_data, min_diff)
            # logging.info(diff)
            if diff < min_diff:
                min_diff = diff
                best_fit_tile_index = tile_index
            tile_index += 1

        return best_fit_tile_index


def fit_tiles(work_queue, result_queue, tiles_data):
    # this function gets run by the worker processes, one on each cpu core
    tile_fitter = tilefitter(tiles_data)

    while true:
        try:
            img_data, img_coords = work_queue.get(true)
            if img_data == eoq_value:
                break
            tile_index = tile_fitter.get_best_fit_tile(img_data)
            result_queue.put((img_coords, tile_index))
        except keyboardinterrupt:
            pass

    # let the result handler know that this worker has finished everything
    result_queue.put((eoq_value, eoq_value))


class progresscounter:
    def __init__(self, total):
        self.total = total
        self.counter = 0

    def update(self):
        self.counter += 1
        sys.stdout.write(
            "进度: %s%% %s" % ((100 * self.counter / self.total), "\r"))

        # sys.stdout.write("progress: %s%% %s" % (100 * self.counter / self.total, "\r"))

    sys.stdout.flush()


class mosaicimage:
    def __init__(self, original_img, outfile):
        self.image = image.new(original_img.mode, original_img.size)
        self.x_tile_count = int(original_img.size[0] / tile_size)
        self.y_tile_count = int(original_img.size[1] / tile_size)
        self.total_tiles = self.x_tile_count * self.y_tile_count
        self.outfile = outfile

    def add_tile(self, tile_data, coords):
        img = image.new('rgb', (tile_size, tile_size))
        img.putdata(tile_data)
        self.image.paste(img, coords)

    def save(self, path):
        self.image.save(path)


def build_mosaic(result_queue, all_tile_data_large, original_img_large, outfile):
    mosaic = mosaicimage(original_img_large, outfile)

    active_workers = worker_count
    while true:
        try:
            img_coords, best_fit_tile_index = result_queue.get()

            if img_coords == eoq_value:
                active_workers -= 1
                if not active_workers:
                    break
            else:
                tile_data = all_tile_data_large[best_fit_tile_index]
                mosaic.add_tile(tile_data, img_coords)

        except keyboardinterrupt:
            pass

    mosaic.save(mosaic.outfile)
    logging.info('============ 生成成功 ============')


def compose(original_img, tiles, outfile):
    logging.info('生成图片中,按下 ctrl-c 中断...')
    original_img_large, original_img_small = original_img
    tiles_large, tiles_small = tiles

    mosaic = mosaicimage(original_img_large, outfile)

    all_tile_data_large = list(map(lambda tile: list(tile.getdata()), tiles_large))
    all_tile_data_small = list(map(lambda tile: list(tile.getdata()), tiles_small))

    work_queue = queue(worker_count)
    result_queue = queue()

    try:
        # start the worker processes that will build the mosaic image
        process(target=build_mosaic, args=(result_queue, all_tile_data_large, original_img_large, outfile)).start()

        # start the worker processes that will perform the tile fitting
        for n in range(worker_count):
            process(target=fit_tiles, args=(work_queue, result_queue, all_tile_data_small)).start()

        progress = progresscounter(mosaic.x_tile_count * mosaic.y_tile_count)
        for x in range(mosaic.x_tile_count):
            for y in range(mosaic.y_tile_count):
                large_box = (x * tile_size, y * tile_size, (x + 1) * tile_size, (y + 1) * tile_size)
                small_box = (
                    x * tile_size / tile_block_size, y * tile_size / tile_block_size,
                    (x + 1) * tile_size / tile_block_size,
                    (y + 1) * tile_size / tile_block_size)
                work_queue.put((list(original_img_small.crop(small_box).getdata()), large_box))
                progress.update()

    except keyboardinterrupt:
        logging.info('\nhalting, saving partial image please wait...')

    finally:
        # put these special values onto the queue to let the workers know they can terminate
        for n in range(worker_count):
            work_queue.put((eoq_value, eoq_value))


def mosaic(img_path, tiles_path, outfile):
    tiles_data = tileprocessor(tiles_path).get_tiles()
    image_data = targetimage(img_path).get_data()
    compose(image_data, tiles_data, output)




if __name__ == '__main__':
    logging.basicconfig(filename='mosaic.log',
                        format='%(asctime)s  %(filename)s : %(levelname)s  %(message)s',
                        level=logging.info)
    logging.getlogger().addhandler(logging.streamhandler())

    opts, args = getopt.gnu_getopt(sys.argv[1:], 'i:t:o:ts:tr:e:', ['image=', 'tiles_dir=', 'outfile=',""])

    base_image = none
    tiles_dir = none
    output = none
    for k, v in opts:
        if k in ("-i", "--image"):
            base_image = v
        if k in ("-t", "--tiles_dir"):
            tiles_dir = v
        if k in ("-o", "--outfile"):
            output = v

    # base_image = none
    # tiles_dir = none

    for value in (base_image, tiles_dir):
        if value is none:
            logging.error(warn_info)
            sys.exit()

    if output is none:
        output = './mosaic.jpg'

    mosaic(base_image, tiles_dir, output)

注!!!***

这里不是直接运行的!这里你要在终端使用!

**命令:python mosaic_v2.py -i "d:\image\pic.jpg" -t "d:\image"

python实现马赛克拼图!

程序原图:

python实现马赛克拼图!

效果图:

python实现马赛克拼图!