欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Spark机器学习不想跟你说话并向你扔了一个kaggle小例子

程序员文章站 2022-03-09 18:07:50
...

在前文中分别就Spark机器学习中的各个模块进行逐个描述,本文将Kaggle中Flights and Airports Data数据集作为研究对象,使用Spark对其进行简单的pipline建模、指标评估和交叉验证调参,构建一个较为完整的Spark分析实例。

先开启一系列Hadoop、Spark服务与Spark-shell窗口:

Spark机器学习不想跟你说话并向你扔了一个kaggle小例子

数据集:Flights and Airports Data

下载地址:https://www.kaggle.com/tylerx/flights-and-airports-data

软件版本:Ubuntu 19.10、Jdk 1.8.0_241、Hadoop 3.2.1、Spark 2.4.5

使用Spark进行机器学习

导入Spark SQL和Spark ML库

我们将使用Pipleline来准备数据,使用CrossValidator来训练模型的参数,并使用BinaryClassificationEvaluator来训练我们的训练模型,以训练LogisticRegression模型。

// 数据集操作库
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
// 特征与模型库
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler,StringIndexer,VectorIndexer,MinMaxScaler}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder,CrossValidator}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

加载源数据

flight.csv文件数据中的数据包括每个航班的特定特征,指示航班晚点或早点到达的时间。先将其保存至本地磁盘或hdfs中。

Spark机器学习不想跟你说话并向你扔了一个kaggle小例子

为分类模型(决策树学习模型)准备数据

我选择了一部分列作为特征,并创建了一个名为label的布尔标签字段,其值为1或0。具体来说,晚到达的航班为1,早于飞行时间的航班为0 。

val csv = spark.read.format("csv").option("header","true").option("inferSchema","true").load("file:///home/phenix/kaggle/input/flights.csv")
val data = csv.withColumn("label",(col("ArrDelay") > 15).cast("Int"))
data.show()

Spark机器学习不想跟你说话并向你扔了一个kaggle小例子

划分数据

我将使用70%的数据进行训练,并保留30%的数据进行测试。在测试数据中,label列被重命名为trueLabel,因此我以后可以使用它来将预测的标签与已知的实际值进行比较。

val splits = data.randomSplit(Array(0.7,0.3),seed = 100)
val train = splits(0)
val test = splits(1).withColumnRenamed("label","trueLabel")


val train_rows = train.count()
val test_rows = test.count()
println(f"Training Rows: $train_rows, Testing Rows: $test_rows")

Spark机器学习不想跟你说话并向你扔了一个kaggle小例子

定义管道

管道由一系列转换器和估计器阶段组成,这些阶段通常准备用于建模的DataFrame,然后训练预测模型。在这种情况下,您将创建一个包含七个阶段的管道:

  • StringIndexer,可将字符串值转换为用于分类功能的索引

  • VectorAssembler,将分类特征组合到单个矢量中

  • VectorIndexer,用于为分类特征的向量创建索引

  • VectorAssembler,用于创建连续数字特征的矢量

  • MinMaxScaler,可标准化连续数字特征

  • VectorAssembler,可创建具有分类特征和连续特征的向量

val strIdx = new StringIndexer().setInputCol("Carrier").setOutputCol("CarrierIdx")
val catVect = new VectorAssembler().setInputCols(Array("CarrierIdx", "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID")).setOutputCol("catFeatures")
val catIdx = new VectorIndexer().setInputCol(catVect.getOutputCol).setOutputCol("idxCatFeatures")
val numVect = new VectorAssembler().setInputCols(Array("DepDelay")).setOutputCol("numFeatures")
val minMax = new MinMaxScaler().setInputCol(numVect.getOutputCol).setOutputCol("normFeatures")
val featVect = new VectorAssembler().setInputCols(Array("idxCatFeatures", "normFeatures")).setOutputCol("features")

运行管道以训练模型

在训练数据上作为估计器运行管道以训练模型。

val lr = new LogisticRegression().setLabelCol("label").setFeaturesCol("features").setMaxIter(10).setRegParam(0.3)


val pipeline = new Pipeline().setStages(Array(strIdx, catVect, catIdx, numVect, minMax, featVect, lr))
val pipelineModel = pipeline.fit(train)

生成标签预测

使用流水线中的所有阶段和训练模型来转换测试数据,以生成预测标签

val prediction = pipelineModel.transform(test)
val predicted = prediction.select("features", "prediction", "trueLabel")
predicted.show(numRows = 100, truncate = false)

Spark机器学习不想跟你说话并向你扔了一个kaggle小例子

查看结果,一些trueLabel 1预测为0。让我们评估模型。

评估分类模型

我们将计算混淆矩阵ROC下面积以评估模型。

计算混淆矩阵

分类器通常通过创建混淆矩阵进行评估,该矩阵表示以下数目:

  • TP真阳性

  • FP真阴性

  • TN假阳性

  • FN假阴性 

从这些核心指标中,可以计算出其他评估指标,例如精度召回率F1值

val tp = predicted.filter("prediction == 1.0 AND truelabel == 1").count()
val fp = predicted.filter("prediction == 1.0 AND truelabel == 0").count()
val tn = predicted.filter("prediction == 0.0 AND truelabel == 0").count()
val fn = predicted.filter("prediction == 0.0 AND truelabel == 1").count()
val pr = tp*1.0 / (tp + fp)
val re = tp*1.0 / (tp + fn)


val metrics = Seq(("TP",tp*1.0),("FP",fp*1.0),("TN",tn*1.0),("FN",fn*1.0),("Precision",pr),("Recall",re),("F1",2*pr*re/(re+pr))).toDF("metric","value")
metrics.show()

Spark机器学习不想跟你说话并向你扔了一个kaggle小例子

看起来我们的Precision很好,但是Recall却很低,因此我们的F1并不是那么好。

复查ROC曲线下面积

评估分类模型性能的另一种方法是测量模型的ROC曲线下的面积。spark.ml库包含一个BinaryClassificationEvaluator类,我们可以使用它来进行计算。ROC曲线显示了针对不同阈值绘制的真假率和假阳性率。

val evaluator = new BinaryClassificationEvaluator().setLabelCol("trueLabel").setRawPredictionCol("rawPrediction").setMetricName("areaUnderROC")
val aur = evaluator.evaluate(prediction)
println(f"AUR = $aur")

Spark机器学习不想跟你说话并向你扔了一个kaggle小例子

因此,AUR表明我们的模型还可以。让我们深入了解。

查看原始预测和概率

该预测基于描述逻辑函数中标记点的原始预测分数。然后,根据表示每个可能标签值(在这种情况下为0和1)的置信度的概率矢量,将此原始预测转换为0或1的预测标签。选择具有最高置信度的值作为预测。

prediction.select("rawPrediction", "probability", "prediction", "trueLabel").show(100, truncate=false)

请注意,结果包括其中0的概率(概率向量中的第一个值)仅略高于1的概率(概率向量中的第二个值)的行。默认判别阈值(决定概率被预测为1还是0的边界)设置为0.5;因此,无论接近阈值多少,始终使用概率最高的预测。从上面的结果我们可以看到,对于那些我们预测为0的truelabel为1,其许多概率1略小于阈值0.5。

调整参数

为了找到性能最好的参数,我们可以使用CrossValidator类来评估在ParameterGrid中定义的参数的每个组合,这些数据组合是分为训练和验证数据集的多个数据折叠的。请注意,这可能需要很长时间才能运行,因为每个参数组合都会被尝试多次。

更改分类阈值

AUC分数似乎表明模型相当合理,但是性能指标似乎表明它预测假阴性标签的数量很高(即当真实标签为1时它预测为0),从而导致较低的召回率 。我们可以通过降低阈值来改善这一点。相反,有时我们可能想通过提高阈值来解决大量的“误报”问题。在这种情况下,我将让CrossValidator从0.45、0.4和0.35中找到最佳阈值,从0.3和0.1中找到正则化参数,并从10和5中找到最大迭代次数。

val paramGrid = new ParamGridBuilder().addGrid(lr.regParam, Seq(0.3, 0.1)).addGrid(lr.maxIter, Seq(10, 5)).addGrid(lr.threshold, Seq(0.4, 0.3)).build()
val cv = new CrossValidator().setEstimator(pipeline).setEvaluator(new BinaryClassificationEvaluator()).setEstimatorParamMaps(paramGrid).setNumFolds(2)
val model = cv.fit(train)
val newPrediction = model.transform(test)
val newPredicted = newPrediction.select("features", "prediction", "trueLabel")
newPredicted.show()

Spark机器学习不想跟你说话并向你扔了一个kaggle小例子

请注意,以前预测为0的某些rawPrediction和概率值现在预测为1

// 重新计算混淆矩阵
val tp2 = newPredicted.filter("prediction == 1.0 AND truelabel == 1").count()
val fp2 = newPredicted.filter("prediction == 1.0 AND truelabel == 0").count()
val tn2 = newPredicted.filter("prediction == 0.0 AND truelabel == 0").count()
val fn2 = newPredicted.filter("prediction == 0.0 AND truelabel == 1").count()


val pr2 = tp2*1.0 / (tp2 + fp2)
val re2 = tp2*1.0 / (tp2 + fn2)
val metrics2 = Seq(("TP",tp2*1.0),("FP",fp2*1.0),("TN",tn2*1.0),("FN",fn2*1.0),("Precision",pr2),("Recall",re2),("F1",2*pr2*re2/(re2+pr2))).toDF("metric","value")
metrics2.show()


// Recalculate the Area Under ROC 重新计算ROC曲线下面积
val evaluator2 = new BinaryClassificationEvaluator().setLabelCol("trueLabel").setRawPredictionCol("rawPrediction").setMetricName("areaUnderROC")
val aur2 = evaluator2.evaluate(newPrediction)
println(f"AUR2 = $aur2")

Spark机器学习不想跟你说话并向你扔了一个kaggle小例子

Spark机器学习不想跟你说话并向你扔了一个kaggle小例子

    看起来还不错!新模型将召回率从0.11提高到0.38,F1得分从0.20提高到0.55,而不会影响其他指标。下一步仍有很大的改进空间。例如,我可以尝试使用更低阈值的更多选项,或者使用不同的分类模型,或者像添加新功能一样更好地准备数据。有关Spark的基础文章可参考

    Spark分布式机器学习源码分析:矩阵向量

    Spark分布式机器学习源码分析:基本统计

    Spark分布式机器学习源码分析:线性模型

    Spark分布式机器学习源码分析:朴素贝叶斯

    Spark分布式机器学习源码分析:决策树模型

    Spark分布式机器学习源码分析:集成树模型

    Spark分布式机器学习源码分析:协同过滤

    Spark分布式机器学习源码分析:K-means

    Spark分布式机器学习源码分析:LDA模型

    Spark分布式机器学习源码分析:SVD和PCA

    Spark分布式机器学习源码分析:特征提取与转换

    Spark分布式机器学习源码分析:频繁模式挖掘

    Spark分布式机器学习源码分析:模型评估指标

    参考链接:

    https://www.kaggle.com/tylerx/machine-learning-with-spark

Spark机器学习不想跟你说话并向你扔了一个kaggle小例子

历史推荐

“高频面经”之数据分析篇

“高频面经”之数据结构与算法篇

“高频面经”之大数据研发篇

“高频面经”之机器学习篇

“高频面经”之深度学习篇

爬虫实战:Selenium爬取京东商品

爬虫实战:豆瓣电影top250爬取

爬虫实战:Scrapy框架爬取QQ音乐

Spark机器学习不想跟你说话并向你扔了一个kaggle小例子

数据分析与挖掘

数据结构与算法

机器学习与大数据组件

欢迎关注,感谢“在看”,随缘稀罕~

Spark机器学习不想跟你说话并向你扔了一个kaggle小例子 

Spark机器学习不想跟你说话并向你扔了一个kaggle小例子

一个赞,晚餐加鸡腿