《深度实践Spark机器学习》 Chapter 9: Building Spark ML Regression Models
Some useful references (write-up and the dataset):
https://www.jianshu.com/p/da2be3876b00
http://archive.ics.uci.edu/ml/datasets/Bike+Sharing+Dataset
# Show the first 3 rows
head -3 hour.csv
# Count the total number of records
wc -l hour.csv
# Count the number of columns
head -1 hour.csv | awk -F ',' '{print NF}'
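Against the UCI hour.csv these should report 17,380 lines (17,379 records plus the header row) and 17 columns, matching the schema and summary counts shown below.
# Upload the data files to HDFS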
hdfs dfs -put hour.csv /u01/bigdata/data
hdfs dfs -put day.csv /u01/bigdata/data
# Launch spark-shell
spark-shell --num-executors 1 --total-executor-cores 3 --executor-memory 512m
import org.apache.spark.sql.{Row, SparkSession, DataFrame, Dataset}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.{LabeledPoint, IndexToString, StringIndexer, VectorIndexer, VectorAssembler}
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val raw_data = spark.read.format("csv").option("header", true).load("hdfs://XX:8020/u01/bigdata/data/hour.csv")
scala> raw_data.show(10)
+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|instant| dteday|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|casual|registered|cnt|
+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
| 1|2011-01-01| 1| 0| 1| 0| 0| 6| 0| 1|0.24|0.2879|0.81| 0| 3| 13| 16|
| 2|2011-01-01| 1| 0| 1| 1| 0| 6| 0| 1|0.22|0.2727| 0.8| 0| 8| 32| 40|
| 3|2011-01-01| 1| 0| 1| 2| 0| 6| 0| 1|0.22|0.2727| 0.8| 0| 5| 27| 32|
| 4|2011-01-01| 1| 0| 1| 3| 0| 6| 0| 1|0.24|0.2879|0.75| 0| 3| 10| 13|
| 5|2011-01-01| 1| 0| 1| 4| 0| 6| 0| 1|0.24|0.2879|0.75| 0| 0| 1| 1|
| 6|2011-01-01| 1| 0| 1| 5| 0| 6| 0| 2|0.24|0.2576|0.75| 0.0896| 0| 1| 1|
| 7|2011-01-01| 1| 0| 1| 6| 0| 6| 0| 1|0.22|0.2727| 0.8| 0| 2| 0| 2|
| 8|2011-01-01| 1| 0| 1| 7| 0| 6| 0| 1| 0.2|0.2576|0.86| 0| 1| 2| 3|
| 9|2011-01-01| 1| 0| 1| 8| 0| 6| 0| 1|0.24|0.2879|0.75| 0| 1| 7| 8|
| 10|2011-01-01| 1| 0| 1| 9| 0| 6| 0| 1|0.32|0.3485|0.76| 0| 8| 6| 14|
+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
9.3 Exploring Feature Distributions
# Inspect the schema of raw_data
scala> raw_data.printSchema
root
|-- instant: string (nullable = true)
|-- dteday: string (nullable = true)
|-- season: string (nullable = true)
|-- yr: string (nullable = true)
|-- mnth: string (nullable = true)
|-- hr: string (nullable = true)
|-- holiday: string (nullable = true)
|-- weekday: string (nullable = true)
|-- workingday: string (nullable = true)
|-- weathersit: string (nullable = true)
|-- temp: string (nullable = true)
|-- atemp: string (nullable = true)
|-- hum: string (nullable = true)
|-- windspeed: string (nullable = true)
|-- casual: string (nullable = true)
|-- registered: string (nullable = true)
|-- cnt: string (nullable = true)
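Every field comes back as string because only header was set on the reader, not inferSchema; this is why the explicit casts in section 9.4.1 are needed. An alternative sketch (it rereads the same file into a hypothetical raw_data2) lets Spark infer the column types instead:
// Infer column types while reading the CSV
val raw_data2 = spark.read.format("csv").option("header", true).option("inferSchema", true).load("hdfs://XX:8020/u01/bigdata/data/hour.csv")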
# Summary statistics for some key columns
scala> raw_data.describe("dteday", "holiday", "weekday", "temp").show()
+-------+----------+--------------------+-----------------+-------------------+
|summary| dteday| holiday| weekday| temp|
+-------+----------+--------------------+-----------------+-------------------+
| count| 17379| 17379| 17379| 17379|
| mean| null|0.028770355026181024|3.003682605443351| 0.4969871684216586|
| stddev| null| 0.1671652763843717|2.005771456110986|0.19255612124972202|
| min|2011-01-01| 0| 0| 0.02|
| max|2012-12-31| 1| 6| 1|
+-------+----------+--------------------+-----------------+-------------------+
# Plot the feature distributions in Python with pandas and seaborn
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
df = pd.read_csv("/root/data/hour.csv", header=0)
sns.set(style='whitegrid', context='notebook')
cols=['season','yr','temp','atemp','hum','windspeed','cnt']
sns.pairplot(df[cols], height=2.5)
plt.show()
9.4 Data Preprocessing
9.4.1 Feature Selection
# Cast the string columns to Double; cnt becomes the label
val data1 = raw_data.select(
raw_data("season").cast("Double"),
raw_data("yr").cast("Double"),
raw_data("mnth").cast("Double"),
raw_data("hr").cast("Double"),
raw_data("holiday").cast("Double"),
raw_data("weekday").cast("Double"),
raw_data("workingday").cast("Double"),
raw_data("weathersit").cast("Double"),
raw_data("temp").cast("Double"),
raw_data("atemp").cast("Double"),
raw_data("hum").cast("Double"),
raw_data("windspeed").cast("Double"),
raw_data("cnt").cast("Double").alias("label")
)
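Note that casual and registered are deliberately left out: they sum to cnt (for example 3 + 13 = 16 in the first row above), so using them as features would leak the label. instant is just a row id, and the information in dteday is already captured by yr, mnth, and weekday.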
# Column names of the predictor features
val featuresArray = Array("season","yr","mnth","hr","holiday","weekday","workingday","weathersit","temp","atemp","hum","windspeed")
# Combine the raw columns into a single feature vector named features
val assembler = new VectorAssembler().setInputCols(featuresArray).setOutputCol("features")
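A quick sanity check of the assembled vector (a sketch; VectorAssembler is a plain Transformer, so it needs no fitting):
// Build the features column and inspect the first rows without truncation
assembler.transform(data1).select("features", "label").show(3, false)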
9.4.2 Feature Transformation
# Index the categorical features inside the assembled feature vector
val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(24)
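The setting maxCategories(24) is just large enough that hr (24 distinct values, 0 to 23, the most of any categorical column here) is still indexed as categorical; features with more than 24 distinct values, such as temp and hum, are left as continuous.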
# One-hot encode each categorical column into a binary vector
val data2 = new OneHotEncoder().setInputCol("season").setOutputCol("seasonVec")
val data3 = new OneHotEncoder().setInputCol("yr").setOutputCol("yrVec")
val data4 = new OneHotEncoder().setInputCol("mnth").setOutputCol("mnthVec")
val data5 = new OneHotEncoder().setInputCol("hr").setOutputCol("hrVec")
val data6 = new OneHotEncoder().setInputCol("holiday").setOutputCol("holidayVec")
val data7 = new OneHotEncoder().setInputCol("weekday").setOutputCol("weekdayVec")
val data8 = new OneHotEncoder().setInputCol("workingday").setOutputCol("workingdayVec")
val data9 = new OneHotEncoder().setInputCol("weathersit").setOutputCol("weathersitVec")
val pipeline_en = new Pipeline().setStages(Array(data2,data3,data4,data5,data6,data7,data8,data9))
val data_lr = pipeline_en.fit(data1).transform(data1)
# Concatenate the 8 one-hot vectors and the 4 numeric features into a single feature vector
val featuresVecArray = Array("seasonVec","yrVec","mnthVec","hrVec","holidayVec","weekdayVec","workingdayVec","weathersitVec","temp","atemp","hum","windspeed")
val assembler_lr = new VectorAssembler().setInputCols(featuresVecArray).setOutputCol("features_lr")
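With the raw 1-based category codes and OneHotEncoder's default dropLast = true, the one-hot blocks work out to 4 + 1 + 12 + 23 + 1 + 6 + 1 + 4 = 52 dimensions, so features_lr should have 56 dimensions in total (52 one-hot plus 4 numeric columns). Dropping atemp in section 9.6 leaves the 55-dimension sparse vectors visible in the final prediction output.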
9.5 Assembling the Pipelines
1) Split the data into training and test sets
// This split is used for the decision tree model
val Array(trainData, testData) = data1.randomSplit(Array(0.7, 0.3), 12)
// This split is used for the linear regression model
val Array(trainData_lr, testData_lr) = data_lr.randomSplit(Array(0.7, 0.3), 12)
2) Configure the decision tree regression model
val dt = new DecisionTreeRegressor().setLabelCol("label").setFeaturesCol("indexedFeatures").setMaxBins(64).setMaxDepth(15)
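Note that maxBins must be at least as large as the number of categories of any categorical feature (hr contributes 24 here), so 64 is safe; larger values also let the tree discretize the continuous features more finely when searching for splits.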
3) Configure the linear regression model
val lr = new LinearRegression().setLabelCol("label").setFeaturesCol("features_lr").setFitIntercept(true).setMaxIter(20).setRegParam(0.3).setElasticNetParam(0.8)
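In Spark's LinearRegression the penalty term is regParam * (elasticNetParam * ||w||_1 + (1 - elasticNetParam) / 2 * ||w||_2^2), so regParam = 0.3 with elasticNetParam = 0.8 applies a fairly strong, mostly L1 (lasso-like) regularization.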
4) Assemble the decision tree's feature transformations and model training into one pipeline
val pipeline = new Pipeline().setStages(Array(assembler, featureIndexer, dt))
5) Assemble the linear regression's feature transformations and model training into one pipeline
val pipeline_lr = new Pipeline().setStages(Array(assembler_lr, lr))
6) Train the models
// Train the decision tree regression model
val model = pipeline.fit(trainData)
// Train the linear regression model
val model_lr = pipeline_lr.fit(trainData_lr)
7) Make predictions
// Predictions from the decision tree model
val predictions = model.transform(testData)
// Predictions from the linear regression model
val predictions_lr = model_lr.transform(testData_lr)
8) Evaluate the models
val evaluator = new RegressionEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("rmse")
// RMSE of the decision tree model
val rmse = evaluator.evaluate(predictions)
// RMSE of the linear regression model
val rmse_lr = evaluator.evaluate(predictions_lr)
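To compare the two models side by side, print both test RMSEs (a quick sketch; the exact numbers depend on the random split):
// Lower RMSE is better
println(s"Decision tree RMSE = $rmse")
println(s"Linear regression RMSE = $rmse_lr")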
9.6 Model Optimization
val featuresVecArray1 = Array("seasonVec","yrVec","mnthVec","hrVec","holidayVec","weekdayVec","workingdayVec","weathersitVec","temp","hum","windspeed")
val assembler_lr1 = new VectorAssembler().setInputCols(featuresVecArray1).setOutputCol("features_lr1")
// Transform the label so its distribution is closer to normal
import org.apache.spark.ml.feature.SQLTransformer
// Apply SQRT to the label column
val sqlTrans = new SQLTransformer().setStatement("SELECT *, SQRT(label) as label1 FROM __THIS__")
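Here __THIS__ is a placeholder that SQLTransformer substitutes with the input DataFrame at transform time, so the statement simply appends a label1 = SQRT(label) column to whatever flows through the pipeline.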
# Tune the linear regression model with train-validation split
1) Imports
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
2) Build the model to predict label1 and set the linear regression parameters
val lr1 = new LinearRegression().setLabelCol("label1").setFeaturesCol("features_lr1").setFitIntercept(true)
3) Set up a pipeline so that feature assembly, the label transformation, and model training are chained into one workflow
val pipeline_lr1 = new Pipeline().setStages(Array(assembler_lr1, sqlTrans, lr1))
4) Build the parameter grid
val paramGrid_lr1 = new ParamGridBuilder().addGrid(lr1.elasticNetParam, Array(0.1, 0.8, 1.0)).addGrid(lr1.regParam, Array(0.1, 0.3, 0.5)).addGrid(lr1.maxIter, Array(20, 30)).build()
5) Define an evaluator that computes the test error on (prediction, label1)
val evaluator_lr1 = new RegressionEvaluator().setLabelCol("label1").setPredictionCol("prediction").setMetricName("rmse")
// Use the train-validation split method
val trainValidationSplit = new TrainValidationSplit().setEstimator(pipeline_lr1).setEvaluator(evaluator_lr1).setEstimatorParamMaps(paramGrid_lr1).setTrainRatio(0.8)
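TrainValidationSplit evaluates each of the 3 x 3 x 2 = 18 parameter combinations once on a single 80/20 split, which is much cheaper than k-fold cross-validation. If you do want cross-validation, CrossValidator is a drop-in replacement (a sketch, assuming 3 folds):
import org.apache.spark.ml.tuning.CrossValidator
val crossValidator = new CrossValidator().setEstimator(pipeline_lr1).setEvaluator(evaluator_lr1).setEstimatorParamMaps(paramGrid_lr1).setNumFolds(3)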
6) Train the model, automatically selecting the best parameters
val model_lr1 = trainValidationSplit.fit(trainData_lr)
7) Inspect the model's parameters
model_lr1.getEstimatorParamMaps.foreach(println) // the candidate parameter combinations
model_lr1.getEvaluator.extractParamMap() // the evaluator's parameters
model_lr1.getEvaluator.isLargerBetter
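To see how each candidate actually scored, pair the parameter maps with the model's validationMetrics (a sketch; both arrays follow the same grid order):
// Validation RMSE for each parameter combination
model_lr1.getEstimatorParamMaps.zip(model_lr1.validationMetrics).foreach(println)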
8) Make predictions with the best parameter combination
val predictions_lr1 = model_lr1.transform(testData_lr)
val rmse_lr1 = evaluator_lr1.evaluate(predictions_lr1)
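Note that rmse_lr1 is measured on the square-root scale (label1), so it is not directly comparable with the earlier RMSE values computed on raw counts. A sketch of undoing the transform and re-evaluating with the section 9.5 evaluator (negative raw predictions, if any, square to positive values):
// Square the predictions back to the original count scale and evaluate against label
val predictions_back = predictions_lr1.withColumn("prediction", pow(col("prediction"), 2.0))
val rmse_back = evaluator.evaluate(predictions_back)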
// Show the first 5 rows of the transformed features and predictions
scala> predictions_lr1.select("features_lr1","label","label1","prediction").show(5)
+--------------------+-----+------------------+------------------+
| features_lr1|label| label1| prediction|
+--------------------+-----+------------------+------------------+
|(55,[1,4,6,17,40,...| 39.0| 6.244997998398398|2.6144256370572547|
|(55,[1,4,6,17,40,...| 7.0|2.6457513110645907| 1.309134103197633|
|(55,[1,4,6,17,40,...| 5.0| 2.23606797749979|1.4901595799053249|
|(55,[1,4,6,17,40,...| 7.0|2.6457513110645907|1.8830157682818198|
|(55,[1,4,6,17,40,...| 12.0|3.4641016151377544|1.6058304701587227|
+--------------------+-----+------------------+------------------+