
Spark ML Pipeline Cross-Validation with KMeans Clustering

1.1 Model Training

1.1.1 Input Parameters

{
    "modelName": "KMeans聚类",
    "numIterations": "",
    "numClasses": "",
    "runs": "",
    "numFolds": "5",
    "maxIters": [
        10,
        20,
        50,
        100
    ],
    "ks": [
        5,
        6,
        7,
        8,
        9,
        10,
        11
    ],
    "seeds": [
        10,
        20,
        30
    ]
}
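
The training code reads these values through Utils.conf2Class, whose source is not shown here. As a rough sketch of the shape it presumably returns (the class name, field types, and Java-bean getters below are assumptions chosen to match the calls getModelName, getNumFolds, getMaxIters, getKs and getSeeds in the training code):

import java.util.Arrays

// Hypothetical parameter holder mirroring the JSON above; the real Utils.conf2Class
// may differ. Getters follow the Java-bean style used by the training code.
case class KMeansTrainParams(
    modelName: String,
    numFolds: Int,                    // arrives as the string "5"; assumed converted to Int by the parser
    maxIters: java.util.List[Int],
    ks: java.util.List[Int],
    seeds: java.util.List[Int]) {
  def getModelName: String = modelName
  def getNumFolds: Int = numFolds
  def getMaxIters: java.util.List[Int] = maxIters
  def getKs: java.util.List[Int] = ks
  def getSeeds: java.util.List[Int] = seeds
}

object KMeansTrainParamsExample {
  // Instance corresponding to the configuration shown above.
  val example = KMeansTrainParams(
    modelName = "KMeans聚类",
    numFolds = 5,
    maxIters = Arrays.asList(10, 20, 50, 100),
    ks = Arrays.asList(5, 6, 7, 8, 9, 10, 11),
    seeds = Arrays.asList(10, 20, 30))
}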

1.1.2 Training Code

import com.cetc.common.conf.MachineLearnModel
import com.cetc.miner.compute.utils.{ModelUtils, Utils}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{StandardScaler, VectorAssembler}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

class KMeansBestTrain {

  val logger: org.apache.log4j.Logger = org.apache.log4j.Logger.getLogger(classOf[KMeansBestTrain])
  /**
    * KMeans clustering model training
    * @param df
    * @param id
    * @param name
    * @param conf
    * @param sparkSession
    * @return
    */
  def execute(df: DataFrame, id: String, name: String, conf: String, sparkSession: SparkSession): java.util.List[Object] = {
    df.cache()
    logger.info("Number of training rows=========" + df.count())
    val params = Utils.conf2Class(conf)
    //VectorAssembler is a transformer (input columns must not be strings) that merges multiple
    //columns into a single vector column, e.g. age, income, ... into one feature vector for training
    val assembler = new VectorAssembler().setInputCols(df.columns).setOutputCol("features")
    //Standardization (feature scaling)
    val standardScaler = new StandardScaler()
      .setInputCol(assembler.getOutputCol)
      .setOutputCol("scaledFeatures")
      .setWithStd(true)//scale the data to unit standard deviation
      .setWithMean(false)//center the data with the mean before scaling
    // Configure the k-means estimator.
    // Note: it clusters on the raw assembled "features" column; to cluster on the
    // standardized features, use standardScaler.getOutputCol here instead.
    val kmeans = new KMeans()
        .setFeaturesCol(assembler.getOutputCol)
        .setPredictionCol("prediction")
    //Assemble the ML pipeline: assembler -> scaler -> k-means
    val pipeline = new Pipeline().setStages(Array(assembler, standardScaler, kmeans))
    //Create a clustering evaluator (silhouette metric)
    val clusteringEvaluator = new ClusteringEvaluator()
      .setFeaturesCol("features")
      .setPredictionCol("prediction")
      .setMetricName("silhouette")

    //Build the hyperparameter grid; maxIters.size * ks.size * seeds.size parameter combinations in total
    import scala.collection.JavaConversions.asScalaBuffer
    val paramMap = new ParamGridBuilder()
      .addGrid(kmeans.getParam("maxIter"), asScalaBuffer(params.getMaxIters))
      .addGrid(kmeans.getParam("k"), asScalaBuffer(params.getKs))
      .addGrid(kmeans.getParam("seed"), asScalaBuffer(params.getSeeds))
      .build
    //The cross-validator splits the training set into numFolds folds; for each parameter
    //combination it trains on (numFolds-1) folds and evaluates on the remaining fold,
    //then keeps the best-performing model
    val crossValidator = new CrossValidator()
      .setEstimator(pipeline)
      .setEstimatorParamMaps(paramMap)//the hyperparameter combinations to try
      .setNumFolds(params.getNumFolds)//number of folds to split the training set into
      .setEvaluator(clusteringEvaluator)//evaluator used to score each candidate model

    //Train: fits one model per parameter combination per fold
    val model = crossValidator.fit(df)
    // Best model selected by cross-validation
    val bestModel = model.bestModel.asInstanceOf[PipelineModel]
    val kmeansModel = bestModel.stages(2).asInstanceOf[KMeansModel]
    println("Model class========" + kmeansModel.getClass)
    //Wrap the trained model into a persistable object
    val modelObject: MachineLearnModel = ModelUtils.saveModel(kmeansModel, params.getModelName, 5, conf, 2, 0.0)
    //Save the model to the database
    ModelUtils.model2mysql(modelObject)
    val list = new java.util.ArrayList[Object]()
    list.add(modelObject)
    list
  }
}
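
A minimal driver sketch, assuming a local run on an illustrative CSV of purely numeric columns (the path, app name, and job ids below are made up, not part of the original code):

import org.apache.spark.sql.SparkSession

object KMeansBestTrainDriver {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kmeans-cv-train")
      .master("local[*]")
      .getOrCreate()

    // All columns must be numeric: KMeansBestTrain assembles df.columns directly.
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("/path/to/numeric_features.csv")

    // Configuration string matching section 1.1.1.
    val conf =
      """{"modelName":"KMeans聚类","numFolds":"5",
        |"maxIters":[10,20,50,100],"ks":[5,6,7,8,9,10,11],"seeds":[10,20,30]}""".stripMargin

    val result = new KMeansBestTrain().execute(df, "job-1", "kmeans-train", conf, spark)
    println("Saved model object: " + result.get(0))
    spark.stop()
  }
}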

1.2 Model Evaluation

1.2.1 Input Parameters

{"labelColumn":""}

1.2.2 Evaluation Code

import java.util
import com.cetc.common.conf.MachineLearnModel
import com.cetc.miner.compute.utils.{ModelUtils, Utils}
import org.apache.spark.ml.clustering.KMeansModel
import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.sql.{DataFrame, SparkSession}

class KMeansAssess {

  val logger: org.apache.log4j.Logger = org.apache.log4j.Logger.getLogger(classOf[KMeansAssess])
  /**
    * KMeans clustering model evaluation
    * @param df
    * @param model
    * @param id
    * @param name
    * @param conf
    * @param sparkSession
    * @return
    */
  def execute(df: DataFrame, model: MachineLearnModel, id: String, name: String, conf: String, sparkSession: SparkSession): java.util.List[Double] = {

    logger.info("Number of test rows=========" + df.count())
    val params = Utils.conf2Class(conf)
    val userProfile = Utils.trans2UnsupervisedLearning(df)
    val kmeansModel = ModelUtils.loadModel[KMeansModel](model)
    //Create a clustering evaluator (same silhouette metric as in training)
    val clusteringEvaluator = new ClusteringEvaluator()
      .setFeaturesCol("features")
      .setPredictionCol("prediction")
      .setMetricName("silhouette")
    val testDF = kmeansModel.transform(userProfile)
    testDF.show()
    val silhouette = clusteringEvaluator.evaluate(testDF)
    logger.info("Evaluation result, silhouette (squared Euclidean distance)==============" + silhouette)
    ModelUtils.updateModel2mysql(model.getName, silhouette)
    val list = new util.ArrayList[Double]()
    list.add(silhouette)
    list
  }
}
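
For reference, the silhouette score reported above can be reproduced with plain Spark ML, independent of the project-specific ModelUtils/Utils helpers; a minimal sketch with made-up two-dimensional data:

import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.evaluation.ClusteringEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession

object SilhouetteCheck {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("silhouette-check").master("local[*]").getOrCreate()
    import spark.implicits._

    // Illustrative data with two obvious clusters.
    val df = Seq((0.0, 0.1), (0.1, 0.0), (0.2, 0.2), (9.0, 9.1), (9.2, 8.9), (8.8, 9.0))
      .toDF("x", "y")
    val features = new VectorAssembler()
      .setInputCols(Array("x", "y"))
      .setOutputCol("features")
      .transform(df)

    // Fit a k-means model and score its predictions.
    val model = new KMeans()
      .setK(2)
      .setSeed(10)
      .setFeaturesCol("features")
      .setPredictionCol("prediction")
      .fit(features)
    val predictions = model.transform(features)

    // Same metric the evaluation code reports: silhouette with squared Euclidean distance.
    val silhouette = new ClusteringEvaluator()
      .setFeaturesCol("features")
      .setPredictionCol("prediction")
      .setMetricName("silhouette")
      .evaluate(predictions)
    println(s"silhouette = $silhouette")
    spark.stop()
  }
}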