大文件分片上传
程序员文章站
2024-02-19 08:32:10
...
大文件分片上传
场景,在工作中很多文件都超过了10M。一般nginx 上传文件大小可能在8M的水平。如果这时候上传大文件,nginx 就会报错。
方案
- 修改nginx 配置。
- 配置nginx 断点续传模块。
- 使用http协议实现断点续传。
nginx 修改过大可能对整体的健壮性造成影响。
nginx 配置断点续传后还是需要做额外的处理,所以最终敲定使用http实现分片上传。
设计
- 单机,相关的信息存储在内存里面。
维度
一共考虑三个维度。
- 对文件的管理。
- 单个文件信息。
- 拆分的文件信息。
文件管理类
文件管理类比较简单,只要是通过文件的唯一标志(md5 + 业务),来映射到文件结构。
提供 add, get_key, clear_file 等功能。
class FileUploadManager(object):
def __init__(self):
self.file_map = {}
def add_file_map(self, file_upload_helper):
key = self.get_key(file_upload_helper.business, file_upload_helper.file_check_sum)
self.file_map[key] = file_upload_helper
@staticmethod
def get_key(business, checksum):
return '%s-%s' % (business, checksum)
def get_file_uploader(self, business, checksum):
key = self.get_key(business, checksum)
return self.file_map.get(key)
def is_file_exit(self, business, checksum):
return self.get_key(business, checksum) in self.file_map
def __str__(self):
info = ''
for file_name, file_helper in self.file_map.iteritems():
info += 'file_name:%s %s' % (file_name, file_helper)
return info
def clear_file(self, file_name):
for k, v in self.file_map.iteritems():
if v.file_name == file_name:
v.clear()
del self.file_map[k]
break
文件类
文件类,需要存储文件的名称,md5 的值,文件的分片的大小,分片类的信息包含其中。提供合并函数以及其他校验功能。
class FileUploadHelper(object):
def __init__(self, business, file_name, file_check_sum, total_chunk_number, account):
self.business = business
self.file_name = file_name
self.file_check_sum = file_check_sum
self.total_chunk_number = int(total_chunk_number)
self.chunk_map = dict()
self.checksum_set = set()
self.file_dir = base_path + self.business + '/' + self.file_name + self.file_check_sum
self.upload_file_path = self.file_dir + '/' + file_name
self.operator = account
self.s3_url = ''
self.create_file_dir()
self.upload_s3_status = UploadStatus.INIT
def create_file_dir(self):
if not os.path.exists(self.file_dir):
os.makedirs(self.file_dir)
def add_chunk(self, chunk_check_sum, chunk_content, chunk_number):
chunk_file = FileChunk(chunk_check_sum, chunk_content, chunk_number, self.file_dir)
self.chunk_map[int(chunk_number)] = chunk_file
self.checksum_set.add(chunk_check_sum)
def merge_chunk(self):
# check len
if len(self.chunk_map) != self.total_chunk_number:
return 'chunk_map:%s len not equal total_chunk_number:%s ' % (len(self.chunk_map), self.total_chunk_number)
return self._merge_chunk()
def _merge_chunk(self):
# 文件已存在 并且md5 的值一致 返回, 否则 删除 重新合并
if os.path.exists(self.upload_file_path):
if self.check_is_same() == '':
return
else:
os.remove(self.upload_file_path)
for k, v in sorted(self.chunk_map.iteritems()):
log.data('iteritems|k%s v:%s', k, v.__dict__)
with open(self.upload_file_path, 'a') as file:
with open(v.chunk_file_path, 'r') as chunk_file:
file.write(chunk_file.read())
return self.check_is_same()
def check_is_same(self):
file_checksum = get_big_file_md5(self.upload_file_path)
if file_checksum.lower() != self.file_check_sum.lower():
return 'file_check_sum not equal! current_file_checksum:%s self.file_check_sum:%s' % (file_checksum, self.file_check_sum)
return ''
def get_chunk_checksum(self):
return [checksum for checksum in self.checksum_set]
def set_status(self, status):
self.upload_s3_status = status
def get_status(self):
return self.upload_s3_status
def is_finish(self):
return self.get_status() == UploadStatus.FINISH
def get_s3_url(self):
# s3_url 生成一次 路径生成依赖日期 所以必须存储
return get_temp_s3_url(os.path.join(settings.S3_BUCKET_KEY, self.get_business_s3_path()))
def get_business_s3_path(self):
if self.s3_url == '':
self.s3_url = os.path.join(self.business, get_day_dir(), self.file_name)
return self.s3_url
def clear(self):
del_dir_tree(self.file_dir)
def __str__(self):
chunk_info = '\n#####################chunk_info#######################\n'
for file_number, file_info in self.chunk_map.iteritems():
chunk_info += 'file_number:%s file_info:%s' % (file_number, file_info)
return str(self.__dict__) + chunk_info
分片类
单个分片的信息存储其中.
class FileChunk(object):
def __init__(self, chunk_check_sum, chunk_content, chunk_number, file_dir):
self.chunk_check_sum = chunk_check_sum
# self.file_name = file_name
self.chunk_number = chunk_number
self.file_dir = file_dir
# 文件名称为序号
self.chunk_file_path = os.path.join(self.file_dir, self.chunk_number)
self._save_chunk(chunk_content)
def _save_chunk(self, chunk_content):
if not os.path.exists(self.file_dir):
# 递归创建
os.makedirs(self.file_dir)
# 文件已存在 并且md5 的值一致
if os.path.exists(self.chunk_file_path) and self.check_is_same():
return
with open(self.chunk_file_path, 'w') as f:
f.write(chunk_content)
return
def check_is_same(self):
file_checksum = get_big_file_md5(self.chunk_file_path)
if file_checksum.lower() != self.chunk_check_sum.lower():
log.error('check_is_same|fail file_checksum:%s self.chunk_check_sum:%s',file_checksum, self.chunk_check_sum)
return False
return True
def __str__(self):
return 'chunk_check_sum:%s chunk_number:%s chunk_file_path:%s' % \
(self.chunk_check_sum, self.chunk_number, self.chunk_file_path)
注意点
- 对于以上的信息存储在内存里,这就只能使用单进程单实例的形式来解决,如果多实例不同的请求打到了不同的机器上去分片则无法做一个合并。
- 前端上传分片的时候可能并发上传。分片的顺序是需要注意的。
上一篇: java判断文件类型
下一篇: 【css】固定宽度文字换行