欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Django开发☞文件上传

程序员文章站 2022-03-24 23:11:40
...

 

    Python+Django文件开发工具类及文件操作工具类

 

# coding:utf-8
import base64
import os

import time
import datetime
import shutil
import zipfile

from django.conf import settings
from django.http import HttpResponse

from common.utils.utils_log import log

# ===========================================================================================================
# 文件上传工具类 基础环境:python 3.6
# ===========================================================================================================


class FileUploadUtil(object):
    """
    文件上传下载工具类
    """
    file_obj = None             # 待上传文件对象
    file_name = None            # 传入的文件名【不带扩展名】
    file_extension = None       # 传入的文件类型【文件扩展名】

    new_file_name = None        # 新的文件名,存入时文件名【新文件名,带扩展名】
    real_path = None            # 文件实际全路径【目录】
    file_path = None            # 文件存放路径【相对路径】
    absolute_path = None        # 文件绝对路径【C:/tmp/test.zip】
    max_size = None             # 文件上传大小限制

    def __init__(self, src_file_obj, dst_file_path=settings.UPLOAD_DEFAULT_FOLDER, max_size=settings.UPLOAD_DEFAULT_SIZE):
        """
        初始化文件上传对象
        :param src_file_obj: 待上传文件流
        :param dst_file_path: 待上传文件存放路径(默认上传到media/tmp目录)
        :param max_size: 默认文件上传大小
        """
        self.file_obj = src_file_obj
        self.set_file_info(src_file_obj.name)
        self.set_new_name()
        self.set_real_path(dst_file_path)
        self.max_size = max_size
        self.file_path = "%s%s" % (dst_file_path, self.new_file_name)
        self.absolute_path = "%s%s" % (self.real_path, self.new_file_name)

    def set_file_info(self, src_file_name):
        """
        从传入的文件名中拆分文件名称和文件格式
        :param src_file_name: 待操作的文件全路径名称
        :return:
        """
        file_tmp_name, file_extension = FileOperateUtil.extract_file_name(src_file_name)
        self.file_name = file_tmp_name
        self.file_extension = file_extension

    def set_new_name(self):
        """
        根据文件类型,加上当前时间的年月日时分秒生成新的文件名
        :return:
        """
        current_timestamp = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
        self.new_file_name = "%s%s" % (current_timestamp, self.file_extension)

    def set_real_path(self, dst_file_path):
        """
        设置文件实际文件路径(拼接上:settings.MEDIA_ROOT)
        :param dst_file_path: 文件存放文件夹路径
        :return: 文件绝对路径
        """
        self.real_path = "%s/%s" % (settings.MEDIA_ROOT, dst_file_path)

    def upload_file(self, is_new_folder=False):
        """
        上传文件:
            上传的文件数据,可直接从FileUploadUtil对象中获取
        :param is_new_folder: 若目录不存在时,是否自动新建该目录
        :return: 返回boolean 成功返回true,失败返回false
        """
        try:
            # 第一步判断文件目录是否存在,不存在会抛出异常
            FileOperateUtil.validate_folder_exists(self.real_path, is_new_folder)
            # 判断文件大小
            FileOperateUtil.validate_file_size(self.file_obj, self.max_size)
            # 上传文件到指定目录
            FileOperateUtil.copy_file_chunks(self.file_obj, self.real_path+self.new_file_name)
        except Exception as e:
            log.debug("文件上传发生异常,%s" % e)
            return False
        return True

    @staticmethod
    def download_file(src_file_path, down_file_name=None):
        """
        下载文件,返回带有文件流的response响应
        :param src_file_path: 待下载文件路径
        :param down_file_name: 下载时指定文件名
        :return: 带有文件流的response响应
        """
        real_path = "%s/%s" % (settings.MEDIA_ROOT, src_file_path)
        # 判断该文件目录是否存在
        if os.path.exists(real_path):
            # 判断文件是否是文件
            if os.path.isfile(real_path):
                # 获取文件本身自己的名称
                filename = os.path.basename(real_path)
                # 若未指定下载文件名,则已文件本身名称为准
                if down_file_name is None:
                    down_file_name = filename
                # 读取文件流,并构造请求响应
                response = HttpResponse(FileOperateUtil.file_iterator(real_path))
                # 设置响应流
                response['Content_type'] = "application/octet-stream"
                response['Content-Disposition'] = 'attachment; filename={0}'.format(down_file_name.encode('utf-8').decode('utf-8'))
            else:
                response = HttpResponse("下载失败,不是一个文件!", content_type="text/plain;charset=utf-8")
        else:
            response = HttpResponse("文件不存在,下载失败!", content_type="text/plain;charset=utf-8")
        return response

    @staticmethod
    def upload_base64_file(base64_obj, dst_file_path=settings.UPLOAD_DEFAULT_FOLDER, file_ext=".jpg"):
        """
        上传base64文件流到服务器指定目录
        :param dst_file_path: 文件存放路径 "tmp/"
        :param base64_obj: base64流对象
        :param file_ext:   新文件扩展名:默认.jpg
        :return: 文件上传存入的文件目录
        """
        ret = {'status': False, 'msg': ''}
        try:
            # 将base64转成文件流对象
            file_str = base64_obj.split(',')[1].encode('utf-8')
            file_data = base64.b64encode(file_str)
            # 当前时间戳:年月日时分秒
            cur_time = int(round(time.time() * 1000))
            # 拼接新的文件名
            file_name = "%s%s" % (cur_time, file_ext)
            # 拼接文件存放目录
            file_path = "%s/%s" % (settings.MEDIA_ROOT, dst_file_path)
            save_path = "%s%s%s" % (dst_file_path, file_name, file_ext)
            # 判断文件目录
            FileOperateUtil.validate_folder_exists(file_path, is_create=True)
            try:
                # 拼接文件最终存放全路径
                real_path = "%s%s" % (file_path, file_name)
                # 上传文件操作
                destination = open(real_path, 'wb+')
                # 写入文件
                destination.write(file_data)
                ret = {'status': 200, 'msg': '上传成功', 'file_path': save_path}
            except Exception as e:
                log.debug("上传base64文件,写文件发生异常,%s" % e)
                raise Exception("上传base64文件,写文件发生异常")
            finally:
                destination.close()
        except Exception as e:
            log.debug("上传base64文件发生异常,%s" % e)
            ret = {'status': 500, 'msg': e}
        return ret

    @staticmethod
    def download_batch_file(src_file_folder, zip_folder=settings.ZIP_DEFAULT_FOLDER, down_file_name=None, is_delete=True):
        """
        将文件夹压缩打包下载
        :param src_file_folder: 要下载的文件目录
        :param zip_folder: 打包存放目录
        :param down_file_name: 下载时指定文件名
        :param is_delete: 下载完成后是否删除打包的文件,默认:删除
        :return: 带有文件流的response响应
        """
        # 判断待打包目录是否存在,不存在则抛出异常
        FileOperateUtil.validate_folder_exists(src_file_folder)
        # 判断打包存放目录是佛存在,不存在则新建
        FileOperateUtil.validate_folder_exists(zip_folder, is_create=True)
        # 设置响应头
        if os.path.exists(src_file_folder):
            cur_time = int(round(time.time() * 1000))
            zip_file_name = "%s" % cur_time
            zip_file_path = "%s/%s" % (settings.MEDIA_ROOT, zip_folder, zip_file_name)
            # 打包该目录
            FileOperateUtil.zip_dir(src_file_folder, zip_file_path)
            # 判断打包生成的文件是否是个文件
            if os.path.isfile(zip_file_path):
                filename = os.path.basename(zip_file_path)
                # 若未指定下载文件名,则已文件本身名称为准
                if down_file_name is None:
                    down_file_name = filename
                # 读取文件流,并构造请求响应
                response = HttpResponse(FileOperateUtil.file_iterator(zip_file_path))
                response['Content_type'] = "application/octet-stream"
                response['Content-Disposition'] = 'attachment; filename={0}'.format(down_file_name.encode('utf-8'))
            else:
                response = HttpResponse("下载失败,不是一个文件!", content_type="text/plain;charset=utf-8")
        else:
            response = HttpResponse("文件不存在,下载失败!", content_type="text/plain;charset=utf-8")
        return response


# ===========================================================================================================
# 文件操作工具类
# ===========================================================================================================


class FileOperateUtil(object):
    """
    文件操作类
    """

    @staticmethod
    def file_iterator(down_file):
        """根据文件路径获取待下载文件流"""
        content = open(down_file, 'rb+').read()
        return content

    @staticmethod
    def extract_file_name(src_file_name):
        """
        从完整路径名称中提取文件扩展名
        :param src_file_name:
        :return: 文件扩展名元组(文件路径,文件扩展名) eg:C:/tmp/test.ext --> ('C:/tmp/', '.txt')
        """
        ext_meta = os.path.splitext(src_file_name)
        return ext_meta

    @staticmethod
    def validate_folder_exists(folder, is_create=False):
        """
        判断文件目录是否存在,如果不存在则创建
        :param folder: 待判断目录
        :param is_create: 是否创建
        :return:
        """
        if not os.path.exists(folder):
            if is_create:
                os.makedirs(folder)
            else:
                raise Exception("该目录不存在,请先创建该目录")
        return True

    @staticmethod
    def validate_file_size(src_file_path, limit_size):
        """
        判断文件大小,不能小于指定文件大小,更不能大于最大文件限制
        :param src_file_path:源文件
        :param limit_size: 限制大小
        :return:
        """
        if isinstance(src_file_path, object):  # 文件对象
            file_size = src_file_path.size
        else:
            file_size = os.path.getsize(src_file_path)
        # 判断文件大小,是否小于最大文件限制
        if file_size >= settings.UPLOAD_MAX_SIZE:
            raise Exception("上传失败,文件过大,超过10M")
        else:
            if file_size >= limit_size:
                raise Exception("上传失败,文件过大")
        return True

    @staticmethod
    def copy_file_chunks(src_file_obj, dst_file_path):
        """
        文件复制到指定目录【因为chunks貌似只有从request拿到的文件流才有,所以此方法只用于文件上传】
        :param src_file_obj: 待复制文件流对象
        :param dst_file_path: 存放目标文件路径,带文件名的完整路径 如:c:/tmp/test.txt
        :return:
        """
        # 创建存放文件流对象
        destination = open(dst_file_path, 'wb+')
        try:
            # 循环源文件块对象
            for chunk in src_file_obj.chunks():
                # 将文件流写入
                destination.write(chunk)
        except Exception as e:
            log.debug("复制文件失败,%s" % e)
            os.remove(dst_file_path)
            raise Exception("复制文件失败,%s" % e)
        finally:
            log.debug("关闭文件destination")
            destination.close()
        return True

    @staticmethod
    def copy_file(src_file_path, dst_file_path):
        """
        文件复制到指定目录【任意文件】
        :param src_file_path: 待复制文件完整路径
        :param dst_file_path: 存放目标文件路径,带文件名的完整路径 如:c:/tmp/test.txt
        :return:
        """
        # 判断源文件是否是文件
        if not os.path.isfile(src_file_path):
            raise Exception("上传失败,不是一个文件!")
        # 创建存放文件流对象
        destination = open(dst_file_path, 'wb+')
        try:
            src_file = open(src_file_path, "rb+")
            # 循环源文件块对象
            for line in src_file:
                # 将文件流写入
                destination.write(line)
        except Exception as e:
            log.debug("复制文件失败,%s" % e)
            os.remove(dst_file_path)
            raise Exception("复制文件失败,%s" % e)
        finally:
            destination.close()
        return True

    @staticmethod
    def zip_dir(tar_dir_name, zip_file_name):
        """
         函数目的: 压缩指定目录为zip文件
         使用DEMO: FileOperateUtil.zip_dir("C:/tmp/", "E:/test.zip")
        :param tar_dir_name: 待压缩的目录
        :param zip_file_name:  压缩后的zip文件路径 eg:C:/tmp/test.zip
        :return: boolean ,True-成功,False-失败
        """
        file_list = []
        ret = False
        try:
            # 判断是否为文件
            if os.path.isfile(tar_dir_name):
                file_list.append(tar_dir_name)
            else:
                # 循环目录,读取文件列表
                for root, dirs, files in os.walk(tar_dir_name):
                    for name in files:
                        file_list.append(os.path.join(root, name))
            # 创建ZIP文件操作对象
            zf = zipfile.ZipFile(zip_file_name, "w", zipfile.zlib.DEFLATED)
            try:
                for tar in file_list:
                    arc_name = tar[len(tar_dir_name):]
                    zf.write(tar, arc_name)
                ret = True
            except Exception as e:
                log.debug("压缩文件发送异常,%s" % e)
                raise Exception("压缩文件发送异常")
            finally:
                zf.close()
        except Exception as e:
            log.debug("---- 压缩文件发生异常,%s --" % e)
            ret = False
        return ret

    @staticmethod
    def unzip_file(zip_file_name, unzip_dir_path):
        """
        解压zip文件到指定目录
        使用DEMO:FileOperateUtil.unzip_file("C:/tmp/test.zip", "c:/tmp/zip/")
        :param zip_file_name: 为zip文件路径,
        :param unzip_dir_path:  为解压文件后的文件目录
        :return : boolean
        """
        # 判断目标文件目录是否存在,不存在就创建
        if not os.path.exists(unzip_dir_path):
            os.mkdir(unzip_dir_path)
        # 创建zip操作对象
        zf_obj = zipfile.ZipFile(zip_file_name)
        try:
            for name in zf_obj.namelist():
                name = name.replace('\\', '/')
                if name.endswith('/'):
                    p = os.path.join(unzip_dir_path, name[:-1])
                    if os.path.exists(p):
                        # 如果文件夹存在,就删除之:避免有新更新无法复制[递归删除]
                        shutil.rmtree(p)
                    os.mkdir(p)
                else:
                    ext_filename = os.path.join(unzip_dir_path, name)
                    ext_dir = os.path.dirname(ext_filename)
                    if not os.path.exists(ext_dir):
                        os.mkdir(ext_dir)
                    outfile = open(ext_filename, 'wb')
                    try:
                        outfile.write(zf_obj.read(name))
                    except Exception as e:
                        log.debug("写文件发生异常 %s" % e)
                        raise Exception("解压文件发生异常")
                    finally:
                        outfile.close()
        except Exception as e:
            log.debug("解压文件发生异常 %s" % e)
            raise Exception(e)
            shutil.rmtree(unzip_dir_path)
        return True

    诸多不完善之处,敬请指正!