Spark ML机器学习算法svm,als,线性回归,逻辑回归简单试验
程序员文章站
2024-02-15 15:07:28
...
很久很久以前看的ML提供的几个算法,简单运行了下官网的代码,说实话价值不大,还是应当到官网去学习,我唯一做的就是添加了几句中文注释,如果是学习官网示例遇到障碍,不了解数据集的格式等可以在这学习一下
线性回归
数据集格式
试验过程
导入训练集数据,将其解析为带label的RDD,然后使用LinearRegressionWithSGD 建立一个简单的线性模型预测label的值,最后计算了均方差来评估预测值与实际值的吻合度
object LinearRegressionModelDemo {
def main(args: Array[String]): Unit = {
// Load and parse the data
val conf = new SparkConf().setAppName("LeanerRegressionModelDemo").setMaster("local")
val sc = new SparkContext(conf)
val root = SVMWithSGDDemo.getClass.getResource("/")
val data = sc.textFile(root + "ridge-data/lpsa.data")
val parsedData = data.map { line =>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}.cache()
// 建模
//parsedData:RDD (label, array of features)
val numIterations = 100//迭代次数
val stepSize = 0.00000001//梯度下降的每次迭代的步长
val model = LinearRegressionWithSGD.train(parsedData, numIterations, stepSize)
// 评估训练样本模型并计算训练误差
val valuesAndPreds = parsedData.map { point =>
val prediction = model.predict(point.features)
(point.label, prediction)
}
val MSE = valuesAndPreds.map{case(v, p) => math.pow((v - p), 2)}.mean()
println("training Mean Squared Error = " + MSE)
// 保存,加载模型
model.save(sc, "lineaner")//保存到项目根目录,生成一个新目录,里面有数据和元数据两个目录,数据保存为parguet和crc格式
val sameModel = LinearRegressionModel.load(sc, "lineaner")
}
}
输出评估结果
逻辑回归
数据集格式
试验过程
加载多类数据集,拆分为训练集和测试集,使用LogisticRegressionWithLBFGS拟合逻辑回归模型。然后根据测试数据集对模型进行评估
object LogisticRegressionDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("LogisticRegressionDemo").setMaster("local")
val sc = new SparkContext(conf)
val root = this.getClass.getResource("/")
// 加载数据集 :LIBSVM 格式.
val data = MLUtils.loadLibSVMFile(sc, root+"sample_libsvm_data.txt")
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()//训练集
val test = splits(1)//测试集root+
// 建模
val model = new LogisticRegressionWithLBFGS()
.setNumClasses(10)//可能的结果数量
.run(training)
// 对测试集进行预测
val predictionAndLabels = test.map { case LabeledPoint(label, features) =>
val prediction = model.predict(features)
(prediction, label)
}
// 评估指标
val metrics = new MulticlassMetrics(predictionAndLabels)
val precision = metrics.precision
println("Precision = " + precision)
// 保存,加载模型
model.save(sc, "myModelPath")
val sameModel = LogisticRegressionModel.load(sc, "myModelPath")
}
}
输出评估结果
SVM
数据集格式和逻辑回归相同
试验过程
加载数据集,使用SVMWithSGD中train方法训练算法,并使用得到的模型进行预测计算训练误差
object SVMWithSGDDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SVMWithSGDDemo").setMaster("local")
val sc = new SparkContext(conf)
val root = SVMWithSGDDemo.getClass.getResource("/")
//样本数据,分类标签lable只能是1.0和0.0两种,feature为double类型,输入类型是RDD[LabelPoint]
val data = MLUtils.loadLibSVMFile(sc, root + "sample_libsvm_data.txt")
// 拆分数据集
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0).cache()
val test = splits(1)
// 建模
val numIterations = 100//迭代次数,默认为100
val model = SVMWithSGD.train(training, numIterations)
//stepSize: 迭代步长,默认为1.0,miniBatchFraction: 每次迭代参与计算的样本比例,默认为1.0,initialWeights:初始权重,默认为0向量
//清除默认阈值
model.clearThreshold()
// 对测试集进行预测
val scoreAndLabels = test.map { point =>
val score = model.predict(point.features)
(score, point.label)
}
// 二分类评估
val metrics = new BinaryClassificationMetrics(scoreAndLabels)
val auROC = metrics.areaUnderROC()
println("Area under ROC = " + auROC)
model.save(sc, "SVM")
val sameModel = SVMModel.load(sc, "SVM")
}
}
输出评估结果
als
数据集
1,1,5.0
1,2,1.0
1,3,5.0
1,4,1.0
2,1,5.0
2,2,1.0
2,3,5.0
2,4,1.0
3,1,1.0
3,2,5.0
3,3,1.0
3,4,5.0
4,1,1.0
4,2,5.0
4,3,1.0
4,4,5.0
试验过程
object ALSDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("ALSDemo").setMaster("local")
val sc = new SparkContext(conf)
val root = ALSDemo.getClass.getResource("/")
val data = sc.textFile(root + "als/test.data")
//加载,解析数据
val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
Rating(user.toInt, item.toInt, rate.toDouble)
})//用模式匹配处理可以剔除不合法数据
//建模
val rank = 10
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations, 0.01)
//rank 每次迭代都能降低评级矩阵的重建误差,少数迭代后ALS 模型便已经收敛为一个比较好的模型,10次左右
//numIterations ALS模型中隐含特征个数,即"用户-特征"和"产品-特征"矩阵的列数;一般来说,它也是低阶近似矩阵的阶,直接影响模型训练和保存的内存开销,尤其是在用户和物品都很多的时候,常作为训练效果与系统开销之间的调节参数。通常其合理取值为10~200
//lamdba=0.01 正则化因子:控制模型的正则化过程,控制模型的过拟合情况,值越高正则化越严厉。该参数的赋值与实际数据的大小,特征、稀疏程度有关,应该通过非样本的测试数据进行交叉验证来调整,值越大越不容易产生过拟合,但值太大会减低分解的准确度
//alpha = 1.0控制偏好观察的基线置信度的ALS的隐式反馈变体的参数,控制矩阵分解时,被观察到的"用户-产品"交互相对没被观察到的交互的权重
val usersProducts = ratings.map { case Rating(user, product, rate) =>
(user, product)
}
//predict返回RDD[Rating]类型
val ans=model.predict(usersProducts)
// ans.saveAsTextFile("alses")//默认保存在根目录
// Rating(4,4,4.996799734728247)
// Rating(4,1,1.0011685369619117)
// Rating(4,3,1.0011685369619117)
// Rating(4,2,4.996799734728247)
// Rating(1,4,1.001174551680804)
// Rating(1,1,4.99676964634083)
// Rating(1,3,4.99676964634083)
// Rating(1,2,1.001174551680804)
// Rating(3,4,4.996799734728247)
// Rating(3,1,1.0011685369619117)
// Rating(3,3,1.0011685369619117)
// Rating(3,2,4.996799734728247)
// Rating(2,4,1.001174551680804)
// Rating(2,1,4.99676964634083)
// Rating(2,3,4.99676964634083)
// Rating(2,2,1.001174551680804)
val predictions =
model.predict(usersProducts).map { case Rating(user, product, rate) =>
((user, product), rate)
}
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
((user, product), rate)
}.join(predictions)
//计算方差
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
val err = (r1 - r2)
err * err
}.mean()
println("Mean Squared Error = " + MSE)
model.save(sc, "alses")
val sameModel = MatrixFactorizationModel.load(sc, "alses")
}
}