多线程分块处理同一文件,并写入同一文件
程序员文章站
2022-03-10 19:54:32
其实,在测试中,该方法并没有在多大程度上节约时间。可能存在以下原因:本地服务器中有大任务在运行,导致等待时间延长;大任务的运行占用了所有的 CPU,这也使得线程间切换的等待时间延长,使并发看起来并不是并发;线程间切换时间的延长增加了处理时间,反而不如单线程快,因为单线程几乎不存在切换开销。大家可以再次优化,节约时间。(代码摘要)import os, sys / import time / import re / import threading / # lock - 保护写文件 / mu = threading.Lock() ...
其实,在测试中,该方法并没有多大程度上节约时间。可能存在以下原因:
- 本地服务器中有大任务在运行,导致等待时间延长
- 大任务的运行占用了所有的cpu,这也使得线程间切换等待时间延长,使并发看起来并不是并发
- 线程间切换时间的延长增加了总的处理时间,反而不如单线程快,因为单线程几乎不存在切换开销
- 大家可以在此基础上进一步优化,以节约时间。
import os, sys
import time
import re
import threading
# Module-level lock that serialises writes to the shared output file.
mu = threading.Lock()
class Processer(threading.Thread):
    """Worker thread that processes one byte range of a tab-separated file.

    Each worker reads the lines that *start* inside ``[start_pos, end_pos]``,
    extracts a record from every line and appends it to ``Cosmic_test.txt``.

    Args:
        file_name: Path of the input file.
        start_pos: Offset of the first byte of this worker's range.
        end_pos: Offset of the last byte (inclusive) of this worker's range.
    """

    # Separators used inside the genomic position field ("chrom:start-end").
    # Compiled once instead of once per line.
    _POSITION_SEPARATORS = re.compile(r'[:-]')

    def __init__(self, file_name, start_pos, end_pos):
        super().__init__()
        self.file_name = file_name
        self.start_pos = start_pos
        self.end_pos = end_pos

    def _extract_info(self, line):
        """Convert one input line into a tab-separated output record.

        Column 25 (0-based) is split on ``:`` / ``-`` into chromosome, start
        and end; column 26 is used as the strand and column 17 as the id.

        Returns:
            The record ``seqnames, start, end, width, strand, cosid`` joined
            by tabs, or ``None`` when the line is blank or cannot be parsed.
            Unparseable non-blank lines are echoed to stdout.
        """
        line = line.strip()
        if not line:
            return None
        info = line.split("\t")
        try:
            # The column lookup lives inside the ``try`` so that short lines
            # are reported and skipped instead of killing the thread.
            rlts = self._POSITION_SEPARATORS.split(info[25])
            seqnames = 'chr' + rlts[0]
            start = int(rlts[1]) - 1
            end = rlts[2]
            # NOTE(review): ``start`` has already been shifted by -1 here, so
            # this is one larger than ``end - original_start + 1``; kept as in
            # the original -- confirm the intended definition of width.
            width = int(end) - start + 1
            strand = info[26]
            cosid = info[17]
        except (IndexError, ValueError):
            print(line)
            return None
        return "\t".join((seqnames, str(start), end, str(width), strand, cosid))

    def _write_line(self, line):
        """Append ``line`` plus a newline to the shared output file.

        The module-level lock ``mu`` is held for the duration of the write;
        using ``with`` guarantees it is released even if the write fails.
        """
        with mu:
            with open("Cosmic_test.txt", "a+") as fout:
                fout.write(line + "\n")

    def run(self):
        """Thread body, triggered by ``start()``: process this worker's range."""
        # NOTE(review): the file is opened in text mode and positioned with
        # arbitrary offsets; this relies on ``tell()``/``seek()`` offsets
        # behaving like byte offsets -- confirm for non-ASCII input.
        with open(self.file_name, 'r') as fp:
            # If the range does not begin at the start of a line, skip the
            # partial line: it belongs to the worker of the previous range.
            if self.start_pos != 0:
                fp.seek(self.start_pos - 1)
                if fp.read(1) != '\n':
                    fp.readline()
                    self.start_pos = fp.tell()
            fp.seek(self.start_pos)
            # Handle every line that starts inside the range.
            while self.start_pos <= self.end_pos:
                line = fp.readline()
                if not line:
                    # End of file reached before end_pos; avoid spinning.
                    break
                cosrlt = self._extract_info(line)
                if cosrlt is not None:
                    self._write_line(cosrlt)
                self.start_pos = fp.tell()
class Partition(object):
    """Split a file into contiguous byte ranges, one per worker thread.

    Creating an instance also appends the column header line to
    ``Cosmic_test.txt``.

    Args:
        file_name: Path of the file to split.
        thread_num: Number of ranges to produce.
    """

    def __init__(self, file_name, thread_num):
        self.file_name = file_name
        self.block_num = thread_num
        # Write the header of the output table.
        with open("Cosmic_test.txt", "a+") as fout:
            fout.write("seqnames\tstart\tend\twidth\tstrand\tcosid\n")

    def part(self):
        """Return the list of ``(start_pos, end_pos)`` ranges for the file.

        Both offsets are inclusive. The ranges are plain size-based cuts and
        are not aligned to line boundaries. The last range always extends to
        the final byte of the file. An empty file yields an empty list when
        more than one range is requested.

        Raises:
            OSError: If the file size cannot be determined.
            ZeroDivisionError: If the requested number of ranges is 0.
        """
        # Ask the OS for the size instead of seeking an open text-mode file;
        # nothing is left open if this fails.
        file_size = os.path.getsize(self.file_name)
        block_size = file_size // self.block_num
        last_index = self.block_num - 1

        pos_lst = []
        start_pos = 0
        for i in range(self.block_num):
            if i == last_index:
                # The final range absorbs the remainder of the division.
                pos_lst.append((start_pos, file_size - 1))
                break
            if start_pos >= file_size:
                break
            end_pos = min(start_pos + block_size - 1, file_size - 1)
            pos_lst.append((start_pos, end_pos))
            start_pos = end_pos + 1
        return pos_lst
if __name__ == '__main__':
    # ``opts`` was referenced but never defined in this file, so the script
    # failed with NameError. Build it from the command line here.
    # NOTE(review): the option name is inferred from ``opts.cosmic`` --
    # confirm against the original command-line interface.
    import argparse

    parser = argparse.ArgumentParser(
        description="Process a tab-separated file in parallel byte ranges "
                    "and append the extracted records to Cosmic_test.txt.")
    parser.add_argument("-c", "--cosmic", required=True,
                        help="path of the input file")
    opts = parser.parse_args()

    start_time = time.time()
    thread_num = 20
    pt = Partition(opts.cosmic, thread_num)
    pos = pt.part()
    # part() may return fewer ranges than thread_num (an empty file yields
    # none), so create one worker per range instead of indexing pos[i].
    thd = [Processer(opts.cosmic, start_pos, end_pos)
           for start_pos, end_pos in pos]
    for worker in thd:
        worker.start()
    for worker in thd:
        worker.join()
    end_time = time.time()
    print("Cost time is {}".format(end_time - start_time))
本文地址:https://blog.csdn.net/nixiang_888/article/details/107636421