欢迎您访问程序员文章站!本站旨在为大家提供和分享程序员计算机编程知识!
您现在的位置是: 首页

又升级啦,玩低配多进程啦~

程序员文章站 2024-03-19 18:44:40
...

觉得自己是老鼠掉进米缸。今天观察发现 hdf5 文件检查各种慢,各种内存溢出,于是请自动化谷溪同学指点。

竟然开始了多进程操作...

总之就是很开心。他还额外告诉我一个笼统的处理手法:一般爬虫这类 I/O 密集型任务用多线程,彼此关联不大的 batch 任务用多进程。


def open_hdf5(path_file):
	"""Return the paths of the ``.h5`` files located directly inside ``path_file``.

	Args:
		path_file: Directory to scan. Sub-directories are not searched.

	Returns:
		A list of ``os.path.join(path_file, name)`` paths, in ``os.listdir``
		order, one for every regular file whose name ends with ``.h5``.

	Raises:
		OSError: Propagated from ``os.listdir`` when ``path_file`` cannot be
			listed.
	"""
	h5_paths = []
	for name in os.listdir(path_file):
		candidate = os.path.join(path_file, name)
		# Test the joined path: a bare entry name would be resolved against
		# the current working directory instead of ``path_file``.
		if name.endswith('.h5') and os.path.isfile(candidate):
			h5_paths.append(candidate)
	return h5_paths

	
def check_hdf5(h5_file):
	"""Run data-quality checks on every per-day table of one minute-bar HDF5 file.

	For each key in the store, the first failing check prints one line of the
	form ``check <symbol> on  <key> ...`` and the remaining checks for that
	day are skipped. Checks, in order:

	1. ``open``, ``close`` or ``volume`` contains a null value.
	2. Exactly one of the summed ``volume`` / summed ``turnover`` is zero.
	3. The ``close`` of the ``time == 150000`` bar differs from the daily
	   close returned by ``get_price`` by more than 5 %, or, for symbols that
	   are not percentage-only (see ``_pct_only_symbol``), by more than 0.05
	   in absolute terms.

	Args:
		h5_file: Path of the HDF5 file. It must contain a symbol made of six
			digits, a dot and four non-digit characters.

	Returns:
		None. Findings are written to stdout.

	Raises:
		AttributeError: If ``h5_file`` contains no symbol, or a store key
			contains no date (``re.search`` returns None in both cases).
	"""
	with pd.HDFStore(h5_file, mode='r') as hdf:  # the store is closed automatically on exit
		symbol = str(re.search(r"(\d{6}\.\D{4})", h5_file).group(0))  # symbol embedded in the path
		pct_only = _pct_only_symbol(symbol)

		for key in hdf.keys():  # one table per trading day
			each_day = hdf.get(key)
			date_for_day = str(re.search(r"(\d{4}\d{1,2}\d{1,2})", key).group(0))  # date embedded in the key

			# Logic 1: open / close / volume must not contain nulls.
			null_column = next(
				(col for col in ("open", "close", "volume") if each_day[col].isnull().any()),
				None,
			)
			if null_column is not None:
				print("check %s on  %s. %s contains null " % (symbol, key[1:], null_column))
				continue

			# Logic 2: summed volume and summed turnover must be zero together
			# or non-zero together.
			sum_min_volume = int(each_day['volume'].sum())
			sum_min_turnover = int(each_day['turnover'].sum())
			if (sum_min_volume == 0) != (sum_min_turnover == 0):
				print("check %s on  %s. sum volume or sum turnover is wrong " % (symbol, key[1:]))
				continue

			# Logic 3: last minute bar close versus daily close.
			# The daily bar is fetched only here, so days already reported
			# above never trigger the lookup. get_price is defined outside
			# this block; presumably a data-service query (TODO confirm).
			day_line = get_price(symbol, start_date=date_for_day, end_date=date_for_day, adjust_type='none')
			day_close = None if day_line is None else _single_float(day_line['close'])
			min_day_close = _single_float(each_day.query('time == 150000')["close"])
			if not day_close or min_day_close is None:
				# Missing or ambiguous bar, or a zero daily close: the spread
				# cannot be computed, so report it instead of crashing the worker.
				print("check %s on  %s. daily close or 150000 minute close is unavailable " % (symbol, key[1:]))
				continue

			spread = abs(min_day_close - day_close)
			if spread / day_close > 0.05:
				print("check %s on  %s. close price spread > 5 percentage " % (symbol, key[1:]))
			elif not pct_only and spread > 0.05:
				print("check %s on  %s. close price spread > 0.05 " % (symbol, key[1:]))


def _pct_only_symbol(symbol):
	"""Return True when ``symbol`` is exempt from the absolute close-spread check.

	Covers ``00xxxx`` symbols ending in ``SHG`` and ``399xxx`` symbols ending
	in ``SHE`` (indices), plus codes starting with ``5``, ``15`` or ``16``
	(presumably funds; TODO confirm).
	"""
	return (
		(symbol[-3:] == 'SHG' and symbol[0:2] == '00')
		or (symbol[-3:] == 'SHE' and symbol[0:3] == '399')
		or symbol[0:1] == '5'
		or symbol[0:2] in ('15', '16')
	)


def _single_float(value):
	"""Return ``value`` as a float, or None unless it holds exactly one element."""
	if getattr(value, "size", 1) != 1:
		return None
	if hasattr(value, "iloc"):
		# float(Series) is deprecated in recent pandas; take the element explicitly.
		value = value.iloc[0]
	return float(value)
	
	
if __name__ == '__main__':
	# Script entry point: collect the .h5 files under the current directory,
	# check them in a pool of 8 worker processes and print the wall-clock
	# time spent in the pool.
	search_root = './'
	h5_paths = open_hdf5(search_root)

	began = datetime.datetime.now()
	with Pool(8) as workers:
		workers.map(check_hdf5, h5_paths)
	elapsed = datetime.datetime.now() - began

	print("Eclapse Time: {}".format(elapsed))


又升级啦,玩低配多进程啦~


相关标签: 多进程