2020.7.24 一个130行代码的清洗数据的小工具,多种表分类聚合去重补全-zkjs_wang
程序员文章站
2022-05-30 13:14:26
公司最近要处理的数据越来越大,表格N+++多,看着前辈各种得心应手的我不禁自卑起来,于是为自己写了这个自清洗脚本,130行代码解放双手,终于不用一个表一个表洗数据啦~import numpy as npimport pandas as pdimport ospd.set_option('float_format', lambda x: '%.0f' % x)import warningswarnings.filterwarnings("ignore")#文件类型分类模块def file...
公司最近要处理的数据越来越大,
表格N+++多,看着前辈各种得心应手的我不禁自卑起来,于是为自己写了这个自清洗脚本,130行代码解放双手,终于不用一个表一个表洗数据啦~
import numpy as np
import pandas as pd
import os
pd.set_option('float_format', lambda x: '%.0f' % x)
import warnings
warnings.filterwarnings("ignore")
#文件类型分类模块
def file_groupby_csvName(path_excl):
# 文件分类模块
p_path = pd.read_excel(path_excl)
p_path = p_path[p_path['isTure'] == '是']
fPath_jymx_list = []
fPath_zhxx_list = []
fPath_ryxx_list = []
fPath_qzcs_list = []
fPath_error_list = []
for f_path in p_path.path:
print("正在处理:'%s' 下文件" % (f_path))
for root, dirs, files in os.walk(f_path):
for name in files:
if '交易明细' in name and name[-4:] == '.csv':
fPath_jymx_list.append(os.path.join(root, name))
if '账户信息' in name and name[-4:] == '.csv':
fPath_zhxx_list.append(os.path.join(root, name))
if '人员信息' in name and name[-4:] == '.csv':
fPath_ryxx_list.append(os.path.join(root, name))
if '强制' in name and name[-4:] == '.csv':
fPath_qzcs_list.append(os.path.join(root, name))
elif '交易明细' not in name and '账户信息' not in name and '人员信息' not in name and '强制' not in name and name[-4:] == '.csv':
fPath_error_list.append(os.path.join(root, name))
print("' %s'分类完毕' " % (f_path))
concat_list_a = [fPath_jymx_list, fPath_zhxx_list, fPath_ryxx_list, fPath_qzcs_list, fPath_error_list]
concat_dataframe_csv(concat_list_a)
# 数据表合成模块,按行方向 axis=0
def concat_dataframe_csv(concat_list):
for no in range(len(concat_list)):
if len(concat_list[no]) == 0:
print("跳过路径:'%s'下无文件" % (concat_list[no]))
continue
else:
print("正在处理路径:'%s'下文件" % (concat_list[no][0]))
count = 0
data_top = pd.read_csv(concat_list[no][0], encoding='gb18030')
data_top = insert_t(data_top)
print("---------正在防科学计数法与concat合并---------")
for fPath in concat_list[no]:
count += 1
data = pd.read_csv(fPath, encoding='gb18030')
data = insert_t(data)
data_top = pd.concat([data_top, data], axis=0)
if count % 5 == 0:
print("---------在节点%d已完成%d个文件合并---------" % (no, count))
print("---------防科学计数法已完成---------")
name_list = ['jymx.csv', 'zhxx.csv', 'ryxx.csv', 'qzcs.csv', 'error.csv']
data_top = data_top.drop_duplicates(data_top.columns, keep='first')
data_top.to_csv('输出\\'+name_list[no], index=False)
data_file_name=name_list[no]
print('%s已导出,共完成%d个文件合并' % (data_file_name,count))
print("%s汇总完成" % (name_list[no]))
drop_NaN(data_top,data_file_name)
# 防科学计数法模块 加\t
def insert_t(data_name):
col_clean=[]
data=data_name
for i in data.columns :#列名去空
col_clean.append(i.strip())
data.columns=col_clean
for i in data.columns:
if i=='交易卡号' or i=='开户人证件号码' or i=='交易账号' or i=='交易对手账卡号':#[卡号加\t防科学计数法]
data[i]=data[i].astype("str")
data[i]=np.where(data[i].str.contains("\t"),data[i].str.replace("\t",'\t'),data[i].apply(lambda x:str(x)+"\t"))
data[i]=np.where(data[i].str.contains('_'),data[i].apply(lambda x:x[:x.find("_")]+'\t'),data[i])
return data
data_jymx=1
data_zhxx=2
data_count_open=0
# 去空清洗模块
def drop_NaN(data,to_csv_name):
global data_jymx
global data_zhxx
global data_count_open
for i in data.columns:
if i != '交易卡号' and i != '开户人证件号码' and i != '交易账号' and i != '交易对手账卡号': # [卡号加\t防科学计数法]
print("---------正在处理'%s'列换行/表符---------" % (i))
data[i] = data[i].astype('str')
data[i] = np.where(data[i].str.contains("\t"), data[i].str.replace("\t", ''), data[i])
print('---------正在删除重复值---------')
old_long = len(data.index)
data = data.drop_duplicates(data.columns, keep='first')
new_long = len(data.index)
print('---------共删除了 %d 条重复值,保留%d条合格数据---------' % (old_long - new_long, new_long))
data.to_csv('清洗数据/' + to_csv_name, index=False)
if to_csv_name=='jymx.csv':
data_jymx = data
data_count_open+=1
if to_csv_name=='zhxx.csv':
data_zhxx =data
data_count_open+=1
if data_count_open==2:
bq_data(data_jymx,data_zhxx)
print('*******************************************\n去空去重完成\n*******************************************')
# 信息补全模块
def bq_data(data_jymx,data_zhxx):
print("---------正在准备补全交易明细数据---------")
data = data_zhxx
print("---------读取账户信息成功---------")
data = data[data.交易卡号 != 'nan']
print("---------账户信息去空成功---------")
dict_name = dict(zip(data.交易卡号, data.账户开户名称))
print("---------卡号-开户名称 字典生成 成功---------")
dict_zjhm = dict(zip(data.交易卡号, data.开户人证件号码))
print("---------卡号-开户人证件号 字典生成 成功---------")
data_jymx_clean = data_jymx
print("---------读取jymx_clean.csv数据成功---------")
# data_jymx_clean.交易卡号.apply(lambda x:str(x)+'\t')
data_jymx_clean['交易户名'] = data_jymx_clean['交易卡号']
data_jymx_clean['交易证件号码'] = data_jymx_clean['交易卡号']
print("---------正在补全 开户名称---------")
data_jymx_clean.replace({'交易户名': dict_name}, inplace=True)
print("---------补全 开户名称 成功---------")
print("---------正在补全 交易证件号码---------")
data_jymx_clean.replace({'交易证件号码': dict_zjhm}, inplace=True)
print("---------补全 交易证件号码 成功---------")
print("---------do【 导出清洗数据/jymx_clean_bq.csv】 ing---------")
data_jymx_clean.to_csv("清洗数据/jymx_clean_bq.csv", index=False)
print('---------【完成】---------')
#主函数模块
if __name__ == '__main__':
path_f=("输入\点我输入文件路径.xlsx")
file_groupby_csvName(path_f)
本文地址:https://blog.csdn.net/Captain_DUDU/article/details/107551073