This section walks through data preprocessing, model building, parameter tuning, prediction performance, and data visualization, with code and result screenshots.

Step 1: Import the required packages

from datetime import date
from pyspark.sql import SQLContext
import time
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.functions as fun
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType, StringType

Step 2: Create the SparkSession

spark = SparkSession.builder.appName('train').getOrCreate()
sc = spark.sparkContext

Step 3: Read the data

df_off = pd.read_csv('file:/home/joker/PycharmProjects/MLFW/data/ccf_offline_stage1_train.csv')
df_test = pd.read_csv('file:/home/joker/PycharmProjects/MLFW/data/ccf_offline_stage1_test_revised.csv')
print('Data loading finished')

Step 4: Process the data

Data analysis showed that the Discount_rate field mixes two formats: a "spend X, save Y" threshold format (e.g. '150:20', i.e. an effective rate of 1 - 20/150 ≈ 0.867) and a plain discount rate (e.g. '0.95'). Redemption may also depend on the weekday the coupon was received, so the data is processed first.
- Write the data-processing helper functions:
# Helper functions
def trans_discount(row):
    # Convert Discount_rate to an effective rate: NaN -> 1.0 (no coupon),
    # 'x:y' -> 1 - y/x (spend x, save y); otherwise it is already a rate
    if pd.isnull(row):
        return 1.0
    elif ':' in str(row):
        line = str(row).split(':')
        return 1.0 - float(line[1]) / float(line[0])
    else:
        return float(row)

def trans_df(df):
    # Derive discount features and fill missing distances with -1
    df['discount_type'] = df['Discount_rate'].apply(
        lambda row: np.nan if pd.isnull(row) else (1 if ':' in str(row) else 0))
    df['discount_rate'] = df['Discount_rate'].apply(trans_discount)
    df['discount_man'] = df['Discount_rate'].apply(lambda row: int(row.split(':')[0]) if ':' in str(row) else 0)
    df['discount_reduce'] = df['Discount_rate'].apply(lambda row: int(row.split(':')[1]) if ':' in str(row) else 0)
    df['distance'] = df['Distance'].fillna(-1).astype(int)
    return df

def label(row):
    # -1: no coupon received; 1: coupon redeemed within 15 days; 0: otherwise
    if pd.isnull(row['Date_received']):
        return -1
    if pd.notnull(row['Date']):
        td = pd.to_datetime(row['Date'], format='%Y%m%d') - pd.to_datetime(row['Date_received'], format='%Y%m%d')
        if td <= pd.Timedelta(15, 'D'):
            return 1
    return 0

print('Functions defined')
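A quick sanity check of these helpers on hand-made values (the inputs below are illustrative, not taken from the dataset):

print(trans_discount('150:20'))  # 0.8667: spend 150, save 20
print(trans_discount('0.95'))    # 0.95: already a plain rate
print(label(pd.Series({'Date_received': '20160101', 'Date': '20160110'})))  # 1: redeemed within 15 days
print(label(pd.Series({'Date_received': '20160101', 'Date': np.nan})))      # 0: received but never redeemed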
- Run the data processing:
time_start = time.time()
# Data processing
df_off = trans_df(df_off)
df_test = trans_df(df_test)
# received_date = df_off['Date_received'].unique()
# received_date = sorted(received_date[pd.notnull(received_date)])
# buying_date = df_off['Date'].unique()
# buying_date = sorted(buying_date[pd.notnull(buying_date)])
# buying_date = sorted(df_off[df_off['Date'].notnull()]['Date'])
# coupon_by_date = df_off[df_off['Date_received'].notnull()]['Date']
# coupon_by_date.columns = ['Date_received', 'count']
# buy_by_date = df_off[(df_off['Date'].notnull()) & (df_off['Date_received'].notnull())][['Date_received', 'Date']] \
#     .groupby(['Date_received'], as_index=False).count()
# buy_by_date.columns = ['Date_received', 'count']
# Weekday the coupon was received: 1 = Monday ... 7 = Sunday
df_off['weekday'] = df_off['Date_received'].astype(str) \
    .apply(lambda row: np.nan if row == 'nan' else date(int(row[0:4]), int(row[4:6]), int(row[6:8])).weekday() + 1)
df_test['weekday'] = df_test['Date_received'].astype(str) \
    .apply(lambda row: np.nan if row == 'nan' else date(int(row[0:4]), int(row[4:6]), int(row[6:8])).weekday() + 1)
# Weekend flag
df_off['weekday_type'] = df_off['weekday'].apply(lambda x: 1 if x in [6, 7] else 0)
df_test['weekday_type'] = df_test['weekday'].apply(lambda x: 1 if x in [6, 7] else 0)
weekday_columns = ['weekday' + str(i) for i in range(1, 8)]
df_off['label'] = df_off.apply(label, axis=1)
# The test set has no purchase Date column, so no label can be computed for it
time_end = time.time()
print('time cost', time_end - time_start, 's')
print('Data processing finished')
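As a side note, the character slicing above can be replaced by a vectorized conversion; a minimal sketch of an equivalent weekday computation (assuming pandas nullable integers, i.e. pandas >= 0.24):

wd = pd.to_datetime(df_off['Date_received'].astype('Int64').astype(str),
                    format='%Y%m%d', errors='coerce')
df_off['weekday'] = wd.dt.dayofweek + 1  # Monday = 1 ... Sunday = 7; missing dates stay NaN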
Step 5: Split the data

df_off.drop(['Distance', 'Discount_rate'], axis=1, inplace=True)
df_test.drop(['Distance', 'Discount_rate'], axis=1, inplace=True)
df = df_off[df_off['label'] != -1].copy()  # keep only rows where a coupon was received
# df_t = df_test[df_test['label'] != -1].copy()
train = df[(df['Date_received'] < 20160530)].copy()
# note: the train and validation windows overlap between 20160516 and 20160529
valid = df[(df['Date_received'] >= 20160516) & (df['Date_received'] <= 20160615)].copy()
train['weekday'] = train['weekday'].astype(int)
train['Coupon_id'] = train['Coupon_id'].astype(int)
original_feature = ['discount_rate', 'discount_type', 'discount_man', 'discount_reduce',
                    'distance', 'weekday', 'weekday_type']
print('Data split finished')

Step 6: Build the model

sql_context = SQLContext(sc)
df_train = sql_context.createDataFrame(train)
sp_test = sql_context.createDataFrame(valid)
# %%
assemblerInputs = original_feature
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features', impurity='gini', maxDepth=10, maxBins=14)
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC')
paramGrid = ParamGridBuilder().addGrid(dt.impurity, ['gini', 'entropy']) \
    .addGrid(dt.maxDepth, [16, 25]) \
    .addGrid(dt.maxBins, [5, 10]).build()
tvs = TrainValidationSplit(estimator=dt, evaluator=evaluator, estimatorParamMaps=paramGrid, trainRatio=0.8, seed=19)
tvs_pipeline = Pipeline(stages=[assembler, tvs])
tvs_model = tvs_pipeline.fit(df_train)
proficiency = evaluator.evaluate(tvs_model.transform(sp_test))
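The model chosen by TrainValidationSplit can be inspected directly; a small sketch (using the names defined above) that reports the shape of the selected tree:

best = tvs_model.stages[1].bestModel  # stage 0 is the assembler, stage 1 the tuned tree
print(best.depth, best.numNodes)      # depth and node count of the best tree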
The initial AUC was about 0.50; after parameter tuning it reached about 0.65. Because machine performance was limited and the data volume is large (a single run takes about fifteen minutes), no deeper optimization was carried out. The figure below visualizes the effect of tree depth during parameter tuning.

Full code:

# %%
from datetime import date
from pyspark.sql import SQLContext
import time
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
import pyspark.sql.functions as fun
import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit, CrossValidator
from pyspark.sql.functions import udf
from pyspark.sql.types import DateType, StringType
spark = SparkSession.builder.appName('train').getOrCreate()
sc = spark.sparkContext
# Read the data
df_off = pd.read_csv('file:/home/joker/PycharmProjects/MLFW/data/ccf_offline_stage1_train.csv')
# df_on = pd.read_csv('file:/home/joker/PycharmProjects/MLFW/data/ccf_online_stage1_train.csv')
df_test = pd.read_csv('file:/home/joker/PycharmProjects/MLFW/data/ccf_offline_stage1_test_revised.csv')
print('Data loading finished')
# %%
# Helper functions
def trans_discount(row):
if pd.isnull(row):
return 1.0
elif ':' in str(row):
line = str(row).split(':')
return 1.0 - float(line[1]) / float(line[0])
else:
return float(row)
def trans_df(df):
df['discount_type'] = df['Discount_rate'].apply(
lambda row: np.nan if pd.isnull(row) else (1 if ':' in str(row) else 0))
df['discount_rate'] = df['Discount_rate'].apply(trans_discount)
df['discount_man'] = df['Discount_rate'].apply(lambda row: int(row.split(':')[0]) if ':' in str(row) else 0)
df['discount_reduce'] = df['Discount_rate'].apply(lambda row: int(row.split(':')[1]) if ':' in str(row) else 0)
df['distance'] = df['Distance'].fillna(-1).astype(int)
return df
def label(row):
if pd.isnull(row['Date_received']):
return -1
if pd.notnull(row['Date']):
td = pd.to_datetime(row['Date'], format='%Y%m%d') - pd.to_datetime(row['Date_received'], format='%Y%m%d')
if td <= pd.Timedelta(15, 'D'):
return 1
return 0
print('Functions defined')
time_start = time.time()
# Data processing
df_off = trans_df(df_off)
df_test = trans_df(df_test)
# received_date = df_off['Date_received'].unique()
# received_date = sorted(received_date[pd.notnull(received_date)])
# buying_date = df_off['Date'].unique()
# buying_date = sorted(buying_date[pd.notnull(buying_date)])
# buying_date = sorted(df_off[df_off['Date'].notnull()]['Date'])
# coupon_by_date = df_off[df_off['Date_received'].notnull()]['Date']
# coupon_by_date.columns = ['Date_received', 'count']
# buy_by_date = df_off[(df_off['Date'].notnull()) & (df_off['Date_received'].notnull())][['Date_received', 'Date']] \
# .groupby(['Date_received'], as_index=False).count()
# buy_by_date.columns = ['Date_received', 'count']
df_off['weekday'] = df_off['Date_received'].astype(str) \
.apply(lambda row: np.nan if row == 'nan' else date(int(row[0:4]), int(row[4:6]), int(row[6:8])).weekday() + 1)
df_test['weekday'] = df_test['Date_received'].astype(str) \
.apply(lambda row: np.nan if row == 'nan' else date(int(row[0:4]), int(row[4:6]), int(row[6:8])).weekday() + 1)
df_off['weekday_type'] = df_off['weekday'].apply(lambda x: 1 if x in [6, 7] else 0)
df_test['weekday_type'] = df_test['weekday'].apply(lambda x: 1 if x in [6, 7] else 0)
weekday_columns = ['weekday' + str(i) for i in range(1, 8)]
# df_temp = pd.get_dummies(df_off['weekday'].replace('nan', np.nan))
# df_temp.columns = weekday_columns
df_off['label'] = df_off.apply(label, axis=1)
# The test set has no purchase Date column, so no label can be computed for it
time_end = time.time()
print('time cost', time_end - time_start, 's')
print('Data processing finished')
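# Added sanity check (not in the original run): inspect the label distribution before splitting
print(df_off['label'].value_counts())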
# %%
# Split the data
df_off.drop(['Distance', 'Discount_rate'], axis=1, inplace=True)
df_test.drop(['Distance', 'Discount_rate'], axis=1, inplace=True)
df = df_off[df_off['label'] != -1].copy()
# df_t = df_test[df_test['label'] != -1].copy()
train = df[(df['Date_received'] < 20160530)].copy()
# note: the train and validation windows overlap between 20160516 and 20160529
valid = df[(df['Date_received'] >= 20160516) & (df['Date_received'] <= 20160615)].copy()
train['weekday'] = train['weekday'].astype(int)
train['Coupon_id'] = train['Coupon_id'].astype(int)
original_feature = ['discount_rate', 'discount_type', 'discount_man', 'discount_reduce',
                    'distance', 'weekday', 'weekday_type']
print('Data split finished')
# %%
time_start = time.time()
sql_context = SQLContext(sc)
df_train = sql_context.createDataFrame(train)
sp_test = sql_context.createDataFrame(valid)
time_end = time.time()
print('time cost', time_end - time_start, 's')
# %%
df_train.select('Date', 'label').show()
# %%
assemblerInputs = original_feature
time_start = time.time()
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol='features')
time_end = time.time()
print('time cost', time_end - time_start, 's')
df_assembler = assembler.transform(df_train)
# %%
dt = DecisionTreeClassifier(labelCol='label', featuresCol='features', impurity='gini', maxDepth=10, maxBins=14)
# dt_model = dt.fit(df_assembler)
#
# dt_DT = dt_model.transform(df_assembler)
#%%
pipeline = Pipeline(stages=[assembler, dt])
pipelineModel = pipeline.fit(df_train)
predicted = pipelineModel.transform(sp_test)
#%%
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='label', metricName='areaUnderROC')
auc = evaluator.evaluate(predicted)
print(auc)
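# Added check (not in the original run): confusion counts on the validation set;
# the evaluator's metricName could likewise be set to 'areaUnderPR'
predicted.groupBy('label', 'prediction').count().show()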
#%%
paramGrid = ParamGridBuilder().addGrid(dt.impurity, ['gini', 'entropy'])\
.addGrid(dt.maxDepth, [16, 25])\
.addGrid(dt.maxBins, [5, 10]).build()
max_pro = 0.651  # best validation AUC seen so far
max_seed = 19    # seed that produced it
for i in range(100, 200):  # search over TrainValidationSplit seeds
tvs = TrainValidationSplit(estimator=dt, evaluator=evaluator, estimatorParamMaps=paramGrid, trainRatio=0.8, seed=i)
tvs_pipeline = Pipeline(stages=[assembler, tvs])
tvs_model = tvs_pipeline.fit(df_train)
proficiency = evaluator.evaluate(tvs_model.transform(sp_test))
print(tvs_model.stages[1].bestModel.toDebugString[:500])
if max_pro < proficiency:
print(proficiency)
max_pro = proficiency
max_seed = i
print(max_seed, ' ', max_pro)
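# %%
# Added sketch (not in the original run): one way to produce the depth-vs-AUC plot
# mentioned above - train one tree per candidate depth and plot the validation AUC.
# Assumes matplotlib is available; the depth grid is illustrative.
import matplotlib.pyplot as plt
depths = [5, 10, 15, 20, 25]
aucs = []
for d in depths:
    m = Pipeline(stages=[assembler, DecisionTreeClassifier(labelCol='label', featuresCol='features', maxDepth=d)]).fit(df_train)
    aucs.append(evaluator.evaluate(m.transform(sp_test)))
plt.plot(depths, aucs, marker='o')
plt.xlabel('maxDepth')
plt.ylabel('AUC')
plt.show()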
#%%
# from pyspark.ml.classification import RandomForestClassifier
#
# time_start = time.time()
# rf = RandomForestClassifier(labelCol='label', featuresCol='features', numTrees=100)
# rf_pipeline = Pipeline(stages=[assembler, rf])
# rf_model = rf_pipeline.fit(df_train)
# print(evaluator.evaluate(rf_model.transform(sp_test)))
# time_end = time.time()
# print('time cost', time_end - time_start, 's')
# %%
# from pyspark.ml.tuning import ParamGridBuilder
# paramGrid = ParamGridBuilder().addGrid(rf.impurity, ['gini', 'entropy'])\
# .addGrid(rf.maxDepth, [5, 10, 15])\
# .addGrid(rf.maxBins, [3, 6, 9])\
# .addGrid(rf.numTrees, [3, 5, 10]).build()
#
# rfcv = CrossValidator(estimator=rf, evaluator=evaluator, estimatorParamMaps=paramGrid, numFolds=5)
# rfcv_pipeline = Pipeline(stages=[assembler, rfcv])
# rfcv_model = rfcv_pipeline.fit(df_train)
# predicted = rfcv_model.transform(sp_test)
# print(evaluator.evaluate(predicted))
#
#
# %%
# Train a model according to the passed-in parameters (draft, kept commented out)
# def evaluationTrainModel(train_data, validation_data, impurity, maxDepth, maxBins):
#     # record the start time
#     start_time = time.time()
#     param = ParamGridBuilder().addGrid(dt.impurity, impurity) \
#         .addGrid(dt.maxDepth, maxDepth) \
#         .addGrid(dt.maxBins, maxBins).build()
#     tvs = TrainValidationSplit(estimator=dt, evaluator=evaluator, estimatorParamMaps=param, trainRatio=0.8)
#     tvs_pipeline = Pipeline(stages=[assembler, tvs])
#     tvs_model = tvs_pipeline.fit(train_data)
#     predicted = tvs_model.transform(validation_data)
#     duration = time.time() - start_time
#     auc_t = evaluator.evaluate(predicted)
#     return (tvs_model, duration, auc_t, impurity, maxDepth, maxBins)
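Finally, the test set read at the beginning is preprocessed but never scored in the listing above. A minimal sketch of how the tuned pipeline could score it, assuming df_test keeps the processed feature columns (this step is not part of the original run):

# %%
sp_submit = sql_context.createDataFrame(df_test)
submit = tvs_model.transform(sp_submit)  # the pipeline assembles features and applies the best tree
submit.select('User_id', 'Coupon_id', 'Date_received', 'probability').show(5)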