
Processing one file in chunks with multiple threads and writing to one output file

程序员文章站 2022-03-10 19:54:32
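In testing, this approach did not save much time. Possible reasons:

  • A large job was already running on the local server, which lengthened wait times.
  • That job occupied all of the CPUs, so thread switches took longer and the threads did not really run concurrently.
  • The extra time spent switching between threads added to the total processing time, so the result was slower than a single thread, which has almost no switching overhead.

Readers are welcome to optimize the script further to save time.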
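
One way to check these explanations is to time the same line-parsing work on one thread and on several. The sketch below is not part of the original script: `parse_line`, `run_threads`, and the synthetic input are hypothetical, made up for illustration. In standard CPython the global interpreter lock lets only one thread execute Python bytecode at a time, so CPU-bound parsing like this is not expected to get faster with more threads, even on an idle machine.

```python
import threading
import time

def parse_line(line):
    # Stand-in for CPU-bound per-line work: split the line and convert fields.
    fields = line.split("\t")
    return int(fields[1]) - int(fields[0]) + 1

def run_threads(lines, thread_num):
    # Give each thread every thread_num-th line and time the whole run.
    results = [0] * thread_num

    def work(idx):
        results[idx] = sum(parse_line(l) for l in lines[idx::thread_num])

    threads = [threading.Thread(target=work, args=(i,)) for i in range(thread_num)]
    start = time.perf_counter()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    elapsed = time.perf_counter() - start
    return sum(results), elapsed

# Synthetic input: 200,000 "start<TAB>end" lines.
lines = ["{}\t{}".format(i, i + 100) for i in range(200000)]

total_1, time_1 = run_threads(lines, 1)
total_4, time_4 = run_threads(lines, 4)
print("1 thread : {:.3f} s".format(time_1))
print("4 threads: {:.3f} s".format(time_4))
```

If the two timings come out close, the work is being serialized rather than run in parallel, which would match the observation above. The author's full script follows.
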
import os, sys
import time
import re
import threading

# Lock that serializes writes to the shared output file
mu = threading.Lock()
# Worker thread: processes one byte range of the input file
class Processer(threading.Thread):
   def __init__(self, file_name, start_pos, end_pos):
      super(Processer, self).__init__()
      self.file_name = file_name
      self.start_pos = start_pos
      self.end_pos = end_pos

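   def _extract_info(self, line):
      # Convert one tab-separated input line into an output record.
      # Returns None (after echoing the line) if the line cannot be parsed.
      line = line.strip()
      info = line.split("\t")
      try:
         # info[25] is expected to look like "<chrom>:<start>-<end>".
         # Indexing happens inside the try so short or blank lines
         # do not kill the thread.
         rlts = re.split(r'[:-]', info[25])
         seqnames = 'chr' + rlts[0]
         start = int(rlts[1]) - 1
         end = rlts[2]
         width = int(end) - int(start) + 1
         strand = info[26]
         cosid = info[17]
         return "\t".join([seqnames, str(start), end, str(width), strand, cosid])
      except (IndexError, ValueError):
         # Header, blank, or malformed line
         print(line)
         return None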

   def _write_line(self, line):
      # "with" releases the lock even if the write raises.
      with mu:
         with open("Cosmic_test.txt", "a+") as fout:
            fout.write(line + "\n")
   # Override Thread.run(); start() invokes it in the new thread.
   def run(self):
      # Binary mode, so seek() and tell() work on plain byte offsets.
      with open(self.file_name, 'rb') as fp:
         # If the block does not begin at the start of a line, skip the
         # partial line; the thread that owns the previous block handles it.
         if self.start_pos != 0:
            fp.seek(self.start_pos - 1)
            if fp.read(1) != b'\n':
               fp.readline()
               self.start_pos = fp.tell()
         fp.seek(self.start_pos)

         # Process every line that starts inside this block.
         while self.start_pos <= self.end_pos:
            line = fp.readline().decode()  # assumes UTF-8 input
            cosrlt = self._extract_info(line)
            if cosrlt is not None:
               self._write_line(cosrlt)
            self.start_pos = fp.tell()

# Splits the input file into byte ranges, one per thread
class Partition(object):
   def __init__(self, file_name, thread_num):
      self.file_name = file_name
      self.block_num = thread_num
      # Write the header row to the output file
      with open("Cosmic_test.txt", "a+") as fout:
         fout.write("seqnames\tstart\tend\twidth\tstrand\tcosid\n")
   
   def part(self):
      # Binary mode: the size is measured in bytes, matching the offsets passed to seek().
      fp = open(self.file_name, 'rb')
      fp.seek(0, 2)
      pos_lst = list()
      file_size = fp.tell()
      block_size = file_size // self.block_num
      start_pos = 0
      for i in range(self.block_num):
         if i == self.block_num - 1:
            end_pos = file_size - 1
            pos_lst.append((start_pos, end_pos))
            break
         
         end_pos = start_pos + block_size - 1

         if end_pos >= file_size:
            end_pos = file_size - 1

         if start_pos >= file_size:
            break

         pos_lst.append((start_pos, end_pos))

         start_pos = end_pos + 1
      fp.close()
      return pos_lst

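if __name__ == '__main__':
   # The original post does not show how `opts` was built; reading the
   # input path from a --cosmic option is an assumption.
   import argparse
   parser = argparse.ArgumentParser()
   parser.add_argument("--cosmic", required=True, help="tab-separated input file")
   opts = parser.parse_args()

   start_time = time.time()
   thread_num = 20
   pt = Partition(opts.cosmic, thread_num)
   # part() can return fewer than thread_num blocks for a small file,
   # so create one thread per block actually returned.
   thd = [Processer(opts.cosmic, *p) for p in pt.part()]

   for t in thd:
      t.start()

   for t in thd:
      t.join()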

   end_time = time.time()
   print("Cost time is {}".format(end_time - start_time))
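
One possible optimization, offered as a sketch and not as the author's method: the script above takes the lock and reopens the output file once per line. The variant below has each worker collect its results in a list, and the main thread writes everything once, in block order, so no lock is needed. `split_blocks`, `process_block`, `process_file`, and the `transform` callback are hypothetical names; the sketch also assumes UTF-8 input and that all results fit in memory.

```python
import os
from concurrent.futures import ThreadPoolExecutor

def split_blocks(file_size, block_num):
    # Return inclusive (start, end) byte ranges that together cover the file.
    block_size = max(1, file_size // block_num)
    blocks = []
    start = 0
    while start < file_size:
        end = min(start + block_size, file_size) - 1
        if len(blocks) == block_num - 1:
            end = file_size - 1  # the last block takes the remainder
        blocks.append((start, end))
        start = end + 1
    return blocks

def process_block(path, start, end, transform):
    # Collect a result for every line that starts inside [start, end].
    out = []
    with open(path, "rb") as fp:
        if start != 0:
            fp.seek(start - 1)
            if fp.read(1) != b"\n":
                fp.readline()  # partial line: the previous block owns it
        pos = fp.tell()
        while pos <= end:
            line = fp.readline()
            if not line:
                break
            result = transform(line.decode("utf-8").rstrip("\n"))
            if result is not None:
                out.append(result)
            pos = fp.tell()
    return out

def process_file(path, out_path, thread_num, transform):
    blocks = split_blocks(os.path.getsize(path), thread_num)
    with ThreadPoolExecutor(max_workers=thread_num) as pool:
        # map() yields results in block order, so the output keeps input order.
        parts = list(pool.map(
            lambda b: process_block(path, b[0], b[1], transform), blocks))
    # Single writer: one open() and no lock.
    with open(out_path, "w") as fout:
        for part in parts:
            for row in part:
                fout.write(row + "\n")
```

A call such as `process_file("input.tsv", "Cosmic_test.txt", 20, some_parser)` would stand in for the thread loop above, where `some_parser` is any function that maps a line to an output string or `None`. This removes the per-line file open and lock contention, but CPU-bound parsing is still limited by the global interpreter lock in standard CPython; running the blocks in separate processes would be the next thing to try.
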

Original article: https://blog.csdn.net/nixiang_888/article/details/107636421