Python实现大文件排序的方法
程序员文章站
2022-05-10 21:41:15
...
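本文实例讲述了Python实现大文件排序的方法,分享给大家供大家参考。下面的代码包含两段程序:第一段把大文件按行切分成若干块,由多个进程对每块按行首字段去重、排序,再写成按序号命名的小文件;第二段以 file_split 模块的名义导入第一段的 split_sort_file,并借助堆处理这些已排序的小文件,用于比较新旧两个文件。具体实现方法如下: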
import gzip
import os
from multiprocessing import Process, Queue, Pipe, current_process, freeze_support
from datetime import datetime


def sort_worker(input, output):
    """Turn raw text chunks into sorted, de-duplicated key lists.

    Runs until it receives ``None`` from *input*. For every chunk it takes the
    text before the first space of each line as that line's key, drops
    duplicate keys, and puts the keys on *output* as one newline-joined string
    in ascending order.

    The shutdown signal is ``None`` rather than a magic line such as ``STOP``,
    so a data line can never end the worker early.

    Args:
        input: Queue-like object whose ``get()`` returns a ``str`` chunk, or
            ``None`` to stop.
        output: Queue-like object whose ``put()`` receives each sorted chunk.
    """
    while True:
        chunk = input.get()
        if chunk is None:
            return
        # A set is all that is needed: only the distinct keys are kept.
        keys = {line.split(' ')[0] for line in chunk.splitlines()}
        output.put('\n'.join(sorted(keys)))


def write_worker(input, pre):
    """Write every chunk received from *input* to its own file under *pre*.

    Files are named ``0``, ``1``, ``2``, ... in the order the chunks arrive.
    The directory is created if it does not exist yet. The worker returns when
    it receives ``None``.

    Args:
        input: Queue-like object whose ``get()`` returns a ``str`` chunk, or
            ``None`` to stop.
        pre: Path of the output directory.
    """
    # Created directly rather than through a shell, so paths containing
    # spaces or shell metacharacters are handled correctly.
    os.makedirs(pre, exist_ok=True)
    index = 0
    while True:
        content = input.get()
        if content is None:
            return
        write_sorted_bulk(content, os.path.join(pre, str(index)))
        index += 1


def write_sorted_bulk(content, filename):
    """Write *content* to *filename*, replacing any existing file."""
    with open(filename, 'w') as handle:
        handle.write(content)


def iter_line_chunks(stream, buf_size):
    """Yield successive pieces of a text stream, each ending on a line break.

    Reads *buf_size* characters at a time. Text after the last newline of a
    read is carried over to the next chunk, so no line is split between two
    chunks. A line longer than *buf_size* keeps accumulating until its newline
    arrives, and a final line without a trailing newline is yielded as the
    last chunk. Empty chunks are never yielded.

    Args:
        stream: Text-mode file object positioned at the data to read.
        buf_size: Number of characters to request per read.
    """
    pending = ''
    while True:
        data = stream.read(buf_size)
        if not data:
            break
        pending += data
        cut = pending.rfind('\n') + 1  # 0 when no newline has been seen yet
        if cut:
            yield pending[:cut]
            pending = pending[cut:]
    if pending:
        yield pending


def split_sort_file(filename, num_sort=3, buf_size=65536*64*4):
    """Split a large text file into sorted, de-duplicated chunk files.

    The file is read in line-aligned chunks of roughly *buf_size* characters.
    *num_sort* worker processes reduce each chunk to its sorted unique keys
    (see ``sort_worker``), and a single writer process stores every result as
    a numbered file in a directory named after *filename* without its
    extension. Files ending in ``.gz`` are read through ``gzip``.

    Because the output directory is *filename* minus its extension, *filename*
    must have an extension.

    Args:
        filename: Path of the input file.
        num_sort: Number of sorting processes to start.
        buf_size: Approximate chunk size, in characters.

    Returns:
        The number of chunks that were queued for sorting.
    """
    started = datetime.now()
    pre, ext = os.path.splitext(filename)
    opener = gzip.open if ext == '.gz' else open

    # Open the input before starting any process, so a missing or unreadable
    # file fails here instead of leaving workers blocked on an empty queue.
    with opener(filename, 'rt') as source:
        bulk_queue = Queue(10)
        sorted_queue = Queue(10)
        sorters = [
            Process(target=sort_worker, args=(bulk_queue, sorted_queue))
            for _ in range(num_sort)
        ]
        # Exactly one writer: output files are numbered by a counter that
        # lives inside the writer process.
        writers = [Process(target=write_worker, args=(sorted_queue, pre))]
        for proc in sorters + writers:
            proc.start()

        sorted_count = 0
        try:
            for chunk in iter_line_chunks(source, buf_size):
                bulk_queue.put(chunk)
                sorted_count += 1
        finally:
            # Stop the sorters first and wait for them, so everything they
            # produce is queued for the writer before the writer is stopped.
            for _ in sorters:
                bulk_queue.put(None)
            for proc in sorters:
                proc.join()
            for _ in writers:
                sorted_queue.put(None)
            for proc in writers:
                proc.join()

    print('elasped ', datetime.now() - started)
    return sorted_count


# Imports of the article's second listing (the heap-based merge/diff script).
# In the article, this line ends with the `from` keyword of that listing's
# third import; the rest of the statement, `multiprocessing import ...`,
# starts the following line.
from heapq import heappush, heappop
from datetime import datetime
multiprocessing import Process, Queue, Pipe, current_process, freeze_support import os class file_heap: def __init__(self, dir, idx = 0, count = 1): files = os.listdir(dir) self.heap = [] self.files = {} self.bulks = {} self.pre_element = None for i in range(len(files)): file = files[i] if hash(file) % count != idx: continue input = open(os.path.join(dir, file)) self.files[i] = input self.bulks[i] = '' heappush(self.heap, (self.get_next_element_buffered(i), i)) def get_next_element_buffered(self, i): if len(self.bulks[i]) 1024: self.q.put(self.wbuf) self.wbuf = [] def diff_file(file_old, file_new, file_diff, buf = 268435456): print 'buffer size', buf from file_split import split_sort_file os.system('rm -rf '+ os.path.splitext(file_old)[0] ) os.system('rm -rf '+ os.path.splitext(file_new)[0] ) t = datetime.now() split_sort_file(file_old,5,buf) split_sort_file(file_new,5,buf) print 'split elasped ', datetime.now() - t os.system('cat %s/* | wc -l'%os.path.splitext(file_old)[0]) os.system('cat %s/* | wc -l'%os.path.splitext(file_new)[0]) os.system('rm -f '+file_diff) t = datetime.now() zdiff = open(file_diff, 'a') old_q = Queue(1024) new_q = Queue(1024) old_queue = queue_buffer(old_q) new_queue = queue_buffer(new_q) h1 = Process(target=heappoppush2, args=(os.path.splitext(file_old)[0], old_queue, 3)) h2 = Process(target=heappoppush2, args=(os.path.splitext(file_new)[0], new_queue, 3)) h1.start(), h2.start() old = old_queue.get() new = new_queue.get() old_count, new_count = 0, 0 while old is not None or new is not None: if old > new or old is None: zdiff.write(' '+old+'\n') old = old_queue.get() old_count +=1 else: old = old_queue.get() new = new_queue.get() print 'new_count:', new_count print 'old_count:', old_count print 'diff elasped ', datetime.now() - t h1.join(), h2.join()
# NOTE(review): the line above is the article's second listing (class
# file_heap and function diff_file), and it is incomplete as published:
#   * It starts mid-statement: `multiprocessing import ...` completes the
#     `from` keyword that ends the previous line of the article.
#   * `if len(self.bulks[i]) 1024:` has no operator between the call and the
#     literal, and the statements right after it use `self.q` and `self.wbuf`,
#     which file_heap.__init__ never sets. Presumably the text between a `<`
#     and a later `>` was dropped when the page was extracted -- confirm
#     against the original listing.
#   * diff_file() calls `queue_buffer` and `heappoppush2`; neither is defined
#     anywhere in the visible text.
#   * In diff_file()'s loop, `new_count` is initialised and printed but never
#     incremented, and the branch taken when `old is None` concatenates `old`
#     into a string. This looks like the same kind of loss, so the loop should
#     not be trusted as written.
#   * The code is Python 2 (`print` statements), and diff_file() passes file
#     names straight into shell commands via os.system().
# Do not run or repair this listing without the original source.
希望本文所述对大家的Python程序设计有所帮助。
声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn核实处理。
相关文章
相关视频