欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Python3 多线程threading处理xlsx/csv数据

程序员文章站 2022-03-18 16:44:34
Python如果单线程执行代码去处理数万个xlsx,包括读出和写入的操作,整个过程耗时会很长。本文以处理一批15000个csv文件为例,对比Python3单线程和多线程处理效率。任务:每一个csv包括三个波段的内容,将其分解成三个波段,总共产生45000个文件。import os; from pandas import Series; import pandas as pd; import numpy as np; import matplotlib.pyplot as plt; import re; import...

Python如果单线程执行代码去处理数万个xlsx,包括读出和写入的操作,整个过程耗时会很长。本文以处理一批15000个csv文件为例,对比Python3单线程和多线程处理效率。

任务:每一个csv包括三个波段的内容,将其分解成三个波段,总共产生45000个文件。

import os
from pandas import Series
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import re
import cmath
import  math
import shutil
from scipy.optimize import curve_fit
from scipy import signal
import time
import threading

1)单线程处理代码,如下:


def hylidar_split(input_raw,split_dir):
    """Split every raw CSV in ``input_raw`` into three single-channel CSVs.

    Each input file is read with pandas and is expected to contain the
    columns ``time`` and ``channel1`` .. ``channel4``. ``channel1`` is renamed
    to ``Emitted_bb`` (emitted broad band). ``channel2`` .. ``channel4`` are
    renamed to the 4-character channel names found at fixed positions in the
    file name: ``name[-20:-16]``, ``name[-15:-11]`` and ``name[-10:-6]``.

    For each of those three channels one file is written to ``split_dir``
    containing the columns ``time``, ``Emitted_bb`` and that channel. The
    output file name is ``name[:-24] + channel_name + name[-6:]``.

    Args:
        input_raw: Directory with the raw CSV files; every directory entry
            is processed.
        split_dir: Directory that receives the output files. It is not
            created here, so it must already exist.
    """
    for fn in os.listdir(input_raw):
        # os.path.join instead of a hard-coded '\\' so the paths resolve on
        # every operating system, not only on Windows.
        df = pd.read_csv(os.path.join(input_raw, fn))

        # Channel names and the kept head/tail of the name come from fixed
        # offsets counted from the end of the file name.
        channel_names = (fn[-20:-16], fn[-15:-11], fn[-10:-6])
        prefix, suffix = fn[:-24], fn[-6:]

        new_columns = {'channel1': 'Emitted_bb'}
        new_columns.update(zip(('channel2', 'channel3', 'channel4'), channel_names))
        df = df.rename(columns=new_columns)

        # One output file per channel, keeping only the columns it needs.
        for channel in channel_names:
            out_path = os.path.join(split_dir, prefix + channel + suffix)
            df[['time', 'Emitted_bb', channel]].to_csv(out_path, index=False)

# Single-threaded driver: split every raw file and report the elapsed time.
if __name__ == "__main__":
    rawdata_dir = "F:\\leaf data\\WT梧桐\\叶片"
    split_dir = "F:\\leaf data\\WT梧桐\\split_wt"

    # Wall-clock timing of the whole run.
    beginning_time = time.time()
    localtime = time.asctime(time.localtime(time.time()))
    print("beginning at :", localtime)

    hylidar_split(rawdata_dir, split_dir)

    ending_time = time.time()
    time_cost = (ending_time - beginning_time) / 60  # seconds -> minutes
    localtime = time.asctime(time.localtime(time.time()))
    print("end at :", localtime)
    print("process took", time_cost, "minutes!")

输出:

end at : Thu Nov 19 16:23:55 2020
process took 4.528384971618652 minutes!

2)多线程处理代码,如下:

def hylidar_split(fns_raw,rawdata_dir,split_dir):
    """Split the listed raw CSV files into three single-channel CSVs each.

    Each file ``rawdata_dir/<name>`` is read with pandas and is expected to
    contain the columns ``time`` and ``channel1`` .. ``channel4``.
    ``channel1`` is renamed to ``Emitted_bb`` (emitted broad band).
    ``channel2`` .. ``channel4`` are renamed to the 4-character channel names
    found at fixed positions in the file name: ``name[-20:-16]``,
    ``name[-15:-11]`` and ``name[-10:-6]``.

    For each of those three channels one file is written to ``split_dir``
    containing the columns ``time``, ``Emitted_bb`` and that channel. The
    output file name is ``name[:-24] + channel_name + name[-6:]``.

    Args:
        fns_raw: File names (not full paths) to process.
        rawdata_dir: Directory that contains the files named in ``fns_raw``.
        split_dir: Directory that receives the output files. It is not
            created here, so it must already exist.
    """
    for fn in fns_raw:
        # os.path.join instead of a hard-coded '\\' so the paths resolve on
        # every operating system, not only on Windows.
        df = pd.read_csv(os.path.join(rawdata_dir, fn))

        # Channel names and the kept head/tail of the name come from fixed
        # offsets counted from the end of the file name.
        channel_names = (fn[-20:-16], fn[-15:-11], fn[-10:-6])
        prefix, suffix = fn[:-24], fn[-6:]

        new_columns = {'channel1': 'Emitted_bb'}
        new_columns.update(zip(('channel2', 'channel3', 'channel4'), channel_names))
        df = df.rename(columns=new_columns)

        # One output file per channel, keeping only the columns it needs.
        for channel in channel_names:
            out_path = os.path.join(split_dir, prefix + channel + suffix)
            df[['time', 'Emitted_bb', channel]].to_csv(out_path, index=False)

# Multi-threaded driver: share the raw files among worker threads, wait for
# all of them, and report the elapsed time.
if __name__ == "__main__":

    rawdata_dir = "F:\\leaf data\\WT梧桐\\叶片"
    split_dir = "F:\\leaf data\\WT梧桐\\split_wt"

    # Wall-clock timing of the whole run.
    beginning_time = time.time()
    localtime = time.asctime(time.localtime(time.time()))
    print("beginning at :", localtime)

    n_threads = 16
    thread_list = []
    fns_raw = os.listdir(rawdata_dir)

    # Deal the files out with stride slices so that every file is assigned to
    # exactly one thread. Fixed chunks of int(len / n_threads) files would
    # leave the last len % n_threads files unprocessed, and would process
    # nothing at all when there are fewer files than threads.
    for offset in range(n_threads):
        current_fns = fns_raw[offset::n_threads]
        if not current_fns:
            continue  # fewer files than threads: nothing left for this one
        thread = threading.Thread(
            target=hylidar_split,
            args=(current_fns, rawdata_dir, split_dir),
        )
        thread_list.append(thread)
        thread.start()

    # Wait for every worker before stopping the clock.
    for thread in thread_list:
        thread.join()

    ending_time = time.time()
    time_cost = (ending_time - beginning_time) / 60  # seconds -> minutes
    localtime = time.asctime(time.localtime(time.time()))
    print("end at :", localtime)
    print("process took", time_cost, "minutes!")

输出:

end at : Thu Nov 19 22:10:37 2020
process took 3.288083263238271 minutes!

相比单线程减少了约1.24分钟(4.53分钟 → 3.29分钟)。
从这个例子来看,耗时并没有明显减少。
另外也试过Python3最新的线程池
from concurrent.futures import ThreadPoolExecutor
以及多进程处理
import multiprocessing
时间均为3分多一点。
不知是逻辑不对、理解不到位还是其他原因,我把计算机的16个线程全部用上,也才减少了一分钟左右。(一个可能的原因:该任务以磁盘读写为主,瓶颈可能在I/O而不在CPU,因此增加线程或进程数的收益有限;此推测有待进一步验证。)

本文地址:https://blog.csdn.net/qq_37970770/article/details/109816913