又升级啦,玩低配多进程啦~
程序员文章站
2024-03-19 18:44:40
...
觉得自己像老鼠掉进了米缸。今天观察发现,HDF5 文件的检查非常慢,还经常内存溢出,于是请自动化的谷溪同学指点了一下。
竟然开始了多进程操作...
总之就是很开心。谷溪同学还额外告诉我一个笼统的经验法则:爬虫一般用多线程,彼此关联不大的批处理(batch)任务用多进程。
def open_hdf5(path_file):
    """Return the paths of the ``.h5`` files located directly in a directory.

    Args:
        path_file: Directory to scan. Sub-directories are not descended into.

    Returns:
        A list of ``os.path.join(path_file, name)`` paths, ordered by file
        name, one for every regular file whose name ends with ``.h5``.
        The list is empty when nothing matches.

    Raises:
        OSError: If ``path_file`` cannot be listed.
    """
    abs_files = []
    for name in sorted(os.listdir(path_file)):
        candidate = os.path.join(path_file, name)
        # Test the joined path: a bare entry name would be resolved against
        # the current working directory rather than against path_file.
        if name.endswith('.h5') and os.path.isfile(candidate):
            abs_files.append(candidate)
    return abs_files
#
#
# symbol = list[file][:-3] #for day_line get_price usage
# with pd.HDFStore(abs_file,mode='r') as hdf: # the context manager closes the opened file automatically.
#
def _first_float(values):
    """Return the first element of *values* as a float, or None if there is none.

    *values* may be None, a plain scalar, or an object exposing ``iloc`` and
    ``len`` (for example a pandas Series).
    """
    if values is None:
        return None
    if hasattr(values, 'iloc'):
        if len(values) == 0:
            return None
        values = values.iloc[0]
    return float(values)


def _close_spread_problem(symbol, min_day_close, day_close):
    """Return the Logic 3 message suffix for a close mismatch, or None if fine."""
    # Symbols in these groups are only checked against the relative (5%)
    # threshold; the absolute 0.05 threshold is skipped for them.
    # NOTE(review): presumably these prefixes are indexes and funds - confirm.
    relative_only = (
        (symbol[-3:] == 'SHG' and symbol[0:2] == '00')
        or (symbol[-3:] == 'SHE' and symbol[0:3] == '399')
        or symbol[0:1] == '5'
        or symbol[0:2] in ('15', '16')
    )
    spread = abs(min_day_close - day_close)
    # A zero day close cannot be used as a denominator; skip the ratio test.
    if day_close != 0 and spread / day_close > 0.05:
        return "close price spread > 5 percentage "
    if not relative_only and spread > 0.05:
        return "close price spread > 0.05 "
    return None


def check_hdf5(h5_file):
    """Run per-day sanity checks on one minute-bar HDF5 file and print findings.

    The symbol is taken from the file path (six digits, a dot, four
    non-digits). Every key in the store is treated as one trading day and
    checked in order; the first failing check prints a ``check <symbol> on
    <key>`` line and moves on to the next key:

    1. ``open``, ``close`` or ``volume`` contains a null value.
    2. Exactly one of the summed ``volume`` / summed ``turnover`` is zero.
    3. The ``close`` of the bar whose ``time`` equals 150000 differs from the
       day-line ``close`` returned by ``get_price`` by more than 5 percent,
       or (for symbols outside the relative-only groups) by more than 0.05.

    Args:
        h5_file: Path of the HDF5 file; must contain the symbol.

    Returns:
        None. Results are reported through ``print`` only.

    Raises:
        AttributeError: If the path holds no symbol or a key holds no date.
    """
    symbol = str(re.search(r"(\d{6}\.\D{4})", h5_file).group(0))  # get the symbol
    # The context manager closes the store even when a check raises.
    with pd.HDFStore(h5_file, mode='r') as hdf:
        for key in hdf.keys():  # one key per day of minute bars
            each_day = hdf.get(key)
            date_for_day = str(re.search(r"(\d{4}\d{1,2}\d{1,2})", key).group(0))  # get the date

            # Logic 1: open / close / volume must not contain NaN.
            null_column = next(
                (name for name in ("open", "close", "volume")
                 if each_day[name].isnull().any()),
                None,
            )
            if null_column is not None:
                print("check %s on %s. %s contains null " % (symbol, key[1:], null_column))
                continue

            # Logic 2: summed volume and summed turnover must both be zero
            # or both be non-zero.
            sum_min_volume = int(each_day['volume'].sum())
            sum_min_turnover = int(each_day['turnover'].sum())
            if (sum_min_volume != 0) != (sum_min_turnover != 0):
                print("check %s on %s. sum volume or sum turnover is wrong " % (symbol, key[1:]))
                continue

            # Logic 3: the 15:00 bar close must agree with the day-line close.
            # The day line is fetched only now so that days already rejected
            # by the local checks do not cost a get_price call.
            day_line = get_price(symbol, start_date=date_for_day,
                                 end_date=date_for_day, adjust_type='none')
            day_close = _first_float(None if day_line is None else day_line['close'])
            min_day_close = _first_float(each_day.query('time == 150000')["close"])
            if day_close is None or min_day_close is None:
                # Report instead of raising: one bad day would otherwise
                # abort the check of the whole file.
                print("check %s on %s. 15:00 bar or day-line close is missing " % (symbol, key[1:]))
                continue

            problem = _close_spread_problem(symbol, min_day_close, day_close)
            if problem is not None:
                print("check %s on %s. %s" % (symbol, key[1:], problem))
if __name__ == '__main__':
    # Check every .h5 file found in the current working directory, one file
    # per worker task, and report the total wall-clock time.
    rootdir = './'
    hdf5_addr_list = open_hdf5(rootdir)
    start = datetime.datetime.now()
    if hdf5_addr_list:
        # Files are checked independently; never start more workers than
        # there are files (Pool rejects a size of zero).
        with Pool(min(8, len(hdf5_addr_list))) as p:
            p.map(check_hdf5, hdf5_addr_list)
    end = datetime.datetime.now()
    print("Elapsed Time: {}".format(end - start))
推荐阅读