Spark机器学习不想跟你说话并向你扔了一个kaggle小例子
在前文中分别就Spark机器学习中的各个模块进行逐个描述,本文将Kaggle中Flights and Airports Data数据集作为研究对象,使用Spark对其进行简单的pipline建模、指标评估和交叉验证调参,构建一个较为完整的Spark分析实例。
先开启一系列Hadoop、Spark服务与Spark-shell窗口:
数据集:Flights and Airports Data
下载地址:https://www.kaggle.com/tylerx/flights-and-airports-data
软件版本:Ubuntu 19.10、Jdk 1.8.0_241、Hadoop 3.2.1、Spark 2.4.5
使用Spark进行机器学习
导入Spark SQL和Spark ML库
我们将使用Pipleline来准备数据,使用CrossValidator来训练模型的参数,并使用BinaryClassificationEvaluator来训练我们的训练模型,以训练LogisticRegression模型。
// 数据集操作库
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
// 特征与模型库
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler,StringIndexer,VectorIndexer,MinMaxScaler}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder,CrossValidator}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
加载源数据
flight.csv文件数据中的数据包括每个航班的特定特征,指示航班晚点或早点到达的时间。先将其保存至本地磁盘或hdfs中。
为分类模型(决策树学习模型)准备数据
我选择了一部分列作为特征,并创建了一个名为label的布尔标签字段,其值为1或0。具体来说,晚到达的航班为1,早于飞行时间的航班为0 。
val csv = spark.read.format("csv").option("header","true").option("inferSchema","true").load("file:///home/phenix/kaggle/input/flights.csv")
val data = csv.withColumn("label",(col("ArrDelay") > 15).cast("Int"))
data.show()
划分数据
我将使用70%的数据进行训练,并保留30%的数据进行测试。在测试数据中,label列被重命名为trueLabel,因此我以后可以使用它来将预测的标签与已知的实际值进行比较。
val splits = data.randomSplit(Array(0.7,0.3),seed = 100)
val train = splits(0)
val test = splits(1).withColumnRenamed("label","trueLabel")
val train_rows = train.count()
val test_rows = test.count()
println(f"Training Rows: $train_rows, Testing Rows: $test_rows")
定义管道
管道由一系列转换器和估计器阶段组成,这些阶段通常准备用于建模的DataFrame,然后训练预测模型。在这种情况下,您将创建一个包含七个阶段的管道:
StringIndexer,可将字符串值转换为用于分类功能的索引
VectorAssembler,将分类特征组合到单个矢量中
VectorIndexer,用于为分类特征的向量创建索引
VectorAssembler,用于创建连续数字特征的矢量
MinMaxScaler,可标准化连续数字特征
VectorAssembler,可创建具有分类特征和连续特征的向量
val strIdx = new StringIndexer().setInputCol("Carrier").setOutputCol("CarrierIdx")
val catVect = new VectorAssembler().setInputCols(Array("CarrierIdx", "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID")).setOutputCol("catFeatures")
val catIdx = new VectorIndexer().setInputCol(catVect.getOutputCol).setOutputCol("idxCatFeatures")
val numVect = new VectorAssembler().setInputCols(Array("DepDelay")).setOutputCol("numFeatures")
val minMax = new MinMaxScaler().setInputCol(numVect.getOutputCol).setOutputCol("normFeatures")
val featVect = new VectorAssembler().setInputCols(Array("idxCatFeatures", "normFeatures")).setOutputCol("features")
运行管道以训练模型
在训练数据上作为估计器运行管道以训练模型。
val lr = new LogisticRegression().setLabelCol("label").setFeaturesCol("features").setMaxIter(10).setRegParam(0.3)
val pipeline = new Pipeline().setStages(Array(strIdx, catVect, catIdx, numVect, minMax, featVect, lr))
val pipelineModel = pipeline.fit(train)
生成标签预测
使用流水线中的所有阶段和训练模型来转换测试数据,以生成预测标签。
val prediction = pipelineModel.transform(test)
val predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(numRows = 100, truncate = false)
查看结果,一些trueLabel 1预测为0。让我们评估模型。
评估分类模型
我们将计算混淆矩阵和ROC下面积以评估模型。
计算混淆矩阵
分类器通常通过创建混淆矩阵进行评估,该矩阵表示以下数目:
TP真阳性
FP真阴性
TN假阳性
FN假阴性
从这些核心指标中,可以计算出其他评估指标,例如精度,召回率和F1值。
val tp = predicted.filter("prediction == 1.0 AND truelabel == 1").count()
val fp = predicted.filter("prediction == 1.0 AND truelabel == 0").count()
val tn = predicted.filter("prediction == 0.0 AND truelabel == 0").count()
val fn = predicted.filter("prediction == 0.0 AND truelabel == 1").count()
val pr = tp*1.0 / (tp + fp)
val re = tp*1.0 / (tp + fn)
val metrics = Seq(("TP",tp*1.0),("FP",fp*1.0),("TN",tn*1.0),("FN",fn*1.0),("Precision",pr),("Recall",re),("F1",2*pr*re/(re+pr))).toDF("metric","value")
metrics.show()
看起来我们的Precision很好,但是Recall却很低,因此我们的F1并不是那么好。
复查ROC曲线下面积
评估分类模型性能的另一种方法是测量模型的ROC曲线下的面积。spark.ml库包含一个BinaryClassificationEvaluator类,我们可以使用它来进行计算。ROC曲线显示了针对不同阈值绘制的真假率和假阳性率。
val evaluator = new BinaryClassificationEvaluator().setLabelCol("trueLabel").setRawPredictionCol("rawPrediction").setMetricName("areaUnderROC")
val aur = evaluator.evaluate(prediction)
println(f"AUR = $aur")
因此,AUR表明我们的模型还可以。让我们深入了解。
查看原始预测和概率
该预测基于描述逻辑函数中标记点的原始预测分数。然后,根据表示每个可能标签值(在这种情况下为0和1)的置信度的概率矢量,将此原始预测转换为0或1的预测标签。选择具有最高置信度的值作为预测。
prediction.select("rawPrediction", "probability", "prediction", "trueLabel").show(100, truncate=false)
请注意,结果包括其中0的概率(概率向量中的第一个值)仅略高于1的概率(概率向量中的第二个值)的行。默认判别阈值(决定概率被预测为1还是0的边界)设置为0.5;因此,无论接近阈值多少,始终使用概率最高的预测。从上面的结果我们可以看到,对于那些我们预测为0的truelabel为1,其许多概率1略小于阈值0.5。
调整参数
为了找到性能最好的参数,我们可以使用CrossValidator类来评估在ParameterGrid中定义的参数的每个组合,这些数据组合是分为训练和验证数据集的多个数据折叠的。请注意,这可能需要很长时间才能运行,因为每个参数组合都会被尝试多次。
更改分类阈值
AUC分数似乎表明模型相当合理,但是性能指标似乎表明它预测假阴性标签的数量很高(即当真实标签为1时它预测为0),从而导致较低的召回率 。我们可以通过降低阈值来改善这一点。相反,有时我们可能想通过提高阈值来解决大量的“误报”问题。在这种情况下,我将让CrossValidator从0.45、0.4和0.35中找到最佳阈值,从0.3和0.1中找到正则化参数,并从10和5中找到最大迭代次数。
val paramGrid = new ParamGridBuilder().addGrid(lr.regParam, Seq(0.3, 0.1)).addGrid(lr.maxIter, Seq(10, 5)).addGrid(lr.threshold, Seq(0.4, 0.3)).build()
val cv = new CrossValidator().setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator()).setEstimatorParamMaps(paramGrid).setNumFolds(2)
val model = cv.fit(train)
val newPrediction = model.transform(test)
val newPredicted = newPrediction.select("features", "prediction", "trueLabel")
newPredicted.show()
请注意,以前预测为0的某些rawPrediction和概率值现在预测为1
// 重新计算混淆矩阵
val tp2 = newPredicted.filter("prediction == 1.0 AND truelabel == 1").count()
val fp2 = newPredicted.filter("prediction == 1.0 AND truelabel == 0").count()
val tn2 = newPredicted.filter("prediction == 0.0 AND truelabel == 0").count()
val fn2 = newPredicted.filter("prediction == 0.0 AND truelabel == 1").count()
val pr2 = tp2*1.0 / (tp2 + fp2)
val re2 = tp2*1.0 / (tp2 + fn2)
val metrics2 = Seq(("TP",tp2*1.0),("FP",fp2*1.0),("TN",tn2*1.0),("FN",fn2*1.0),("Precision",pr2),("Recall",re2),("F1",2*pr2*re2/(re2+pr2))).toDF("metric","value")
metrics2.show()
// Recalculate the Area Under ROC 重新计算ROC曲线下面积
val evaluator2 = new BinaryClassificationEvaluator().setLabelCol("trueLabel").setRawPredictionCol("rawPrediction").setMetricName("areaUnderROC")
val aur2 = evaluator2.evaluate(newPrediction)
println(f"AUR2 = $aur2")
看起来还不错!新模型将召回率从0.11提高到0.38,F1得分从0.20提高到0.55,而不会影响其他指标。下一步仍有很大的改进空间。例如,我可以尝试使用更低阈值的更多选项,或者使用不同的分类模型,或者像添加新功能一样更好地准备数据。有关Spark的基础文章可参考前文:
参考链接:
https://www.kaggle.com/tylerx/machine-learning-with-spark
历史推荐
数据分析与挖掘
数据结构与算法
机器学习与大数据组件
欢迎关注,感谢“在看”,随缘稀罕~
一个赞,晚餐加鸡腿
上一篇: sql合并组合列,并过滤掉相同的