欢迎您访问程序员文章站！本站旨在为大家提供并分享程序员计算机编程知识！
您现在的位置是: 首页

《深度实践Spark机器学习》第9章 构建Spark ML回归模型

程序员文章站 2024-02-15 15:11:05
...

发现一个好地方:

https://www.jianshu.com/p/da2be3876b00

9.2 数据加载
http://archive.ics.uci.edu/ml/datasets/Bike+Sharing+Dataset

# Show the first 3 lines (the header plus the first 2 records)
head -n 3 hour.csv

# Count data records: plain `wc -l` would also count the header line,
# so skip line 1 first (this should match describe()'s count of 17379)
tail -n +2 hour.csv | wc -l

# Number of columns = number of comma-separated fields in the header line
head -n 1 hour.csv | awk -F ',' '{print NF}'

# Make sure the HDFS target directory exists; without it -put may fail or
# write a single file literally named "data" instead of a directory
hdfs dfs -mkdir -p /u01/bigdata/data
hdfs dfs -put hour.csv /u01/bigdata/data
hdfs dfs -put day.csv /u01/bigdata/data

        # 启动spark-shell

        spark-shell --num-executors 1 --total-executor-cores 3 --executor-memory 512m


import org.apache.spark.sql.{Row, SparkSession, DataFrame, Dataset}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.feature.{LabeledPoint, IndexToString, StringIndexer, VectorIndexer, VectorAssembler}
import org.apache.spark.ml.feature.OneHotEncoder
import org.apache.spark.ml.regression.DecisionTreeRegressionModel
import org.apache.spark.ml.regression.DecisionTreeRegressor
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Load the hourly bike-sharing CSV from HDFS, taking column names from the first row.
// Only "header" is set (no schema inference), so every column is read as a string.
val raw_data = {
  val hourCsvPath = "hdfs://XX:8020/u01/bigdata/data/hour.csv"
  val csvReader = spark.read.format("csv").option("header", true)
  csvReader.load(hourCsvPath)
}

scala> raw_data.show(10)
+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|instant|    dteday|season| yr|mnth| hr|holiday|weekday|workingday|weathersit|temp| atemp| hum|windspeed|casual|registered|cnt|
+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+
|      1|2011-01-01|     1|  0|   1|  0|      0|      6|         0|         1|0.24|0.2879|0.81|        0|     3|        13| 16|
|      2|2011-01-01|     1|  0|   1|  1|      0|      6|         0|         1|0.22|0.2727| 0.8|        0|     8|        32| 40|
|      3|2011-01-01|     1|  0|   1|  2|      0|      6|         0|         1|0.22|0.2727| 0.8|        0|     5|        27| 32|
|      4|2011-01-01|     1|  0|   1|  3|      0|      6|         0|         1|0.24|0.2879|0.75|        0|     3|        10| 13|
|      5|2011-01-01|     1|  0|   1|  4|      0|      6|         0|         1|0.24|0.2879|0.75|        0|     0|         1|  1|
|      6|2011-01-01|     1|  0|   1|  5|      0|      6|         0|         2|0.24|0.2576|0.75|   0.0896|     0|         1|  1|
|      7|2011-01-01|     1|  0|   1|  6|      0|      6|         0|         1|0.22|0.2727| 0.8|        0|     2|         0|  2|
|      8|2011-01-01|     1|  0|   1|  7|      0|      6|         0|         1| 0.2|0.2576|0.86|        0|     1|         2|  3|
|      9|2011-01-01|     1|  0|   1|  8|      0|      6|         0|         1|0.24|0.2879|0.75|        0|     1|         7|  8|
|     10|2011-01-01|     1|  0|   1|  9|      0|      6|         0|         1|0.32|0.3485|0.76|        0|     8|         6| 14|
+-------+----------+------+---+----+---+-------+-------+----------+----------+----+------+----+---------+------+----------+---+



9.3 探索特征分布
# 查看raw_data数据结构
scala> raw_data.printSchema
root
 |-- instant: string (nullable = true)
 |-- dteday: string (nullable = true)
 |-- season: string (nullable = true)
 |-- yr: string (nullable = true)
 |-- mnth: string (nullable = true)
 |-- hr: string (nullable = true)
 |-- holiday: string (nullable = true)
 |-- weekday: string (nullable = true)
 |-- workingday: string (nullable = true)
 |-- weathersit: string (nullable = true)
 |-- temp: string (nullable = true)
 |-- atemp: string (nullable = true)
 |-- hum: string (nullable = true)
 |-- windspeed: string (nullable = true)
 |-- casual: string (nullable = true)
 |-- registered: string (nullable = true)
 |-- cnt: string (nullable = true)


# 查看主要字段的统计信息
scala> raw_data.describe("dteday", "holiday", "weekday", "temp").show()
+-------+----------+--------------------+-----------------+-------------------+
|summary|    dteday|             holiday|          weekday|               temp|
+-------+----------+--------------------+-----------------+-------------------+
|  count|     17379|               17379|            17379|              17379|
|   mean|      null|0.028770355026181024|3.003682605443351| 0.4969871684216586|
| stddev|      null|  0.1671652763843717|2.005771456110986|0.19255612124972202|
|    min|2011-01-01|                   0|                0|               0.02|
|    max|2012-12-31|                   1|                6|                  1|
+-------+----------+--------------------+-----------------+-------------------+




# Plot pairwise feature distributions locally with pandas + seaborn
# (no PySpark involved: this reads a local copy of hour.csv).
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt


df = pd.read_csv("/root/data/hour.csv", header=0)
sns.set(style='whitegrid', context='notebook')
cols = ['season', 'yr', 'temp', 'atemp', 'hum', 'windspeed', 'cnt']
# seaborn >= 0.9 renamed pairplot's `size` argument to `height`; `size` is deprecated.
sns.pairplot(df[cols], height=2.5)

plt.show()

       《深度实践Spark机器学习》第9章 构建Spark ML回归模型


9.4 数据预处理
9.4.1 特征选择
// Cast the modelling columns from string to Double (the CSV was read without a
// schema, so every column is a string) and rename the target cnt to "label".
// Uses `//` comments because `#` is not a comment in Scala and breaks spark-shell.
val data1 = {
  val numericCols = Seq("season", "yr", "mnth", "hr", "holiday", "weekday",
    "workingday", "weathersit", "temp", "atemp", "hum", "windspeed")
  val castedFeatures = numericCols.map(name => raw_data(name).cast("Double"))
  val labelCol = raw_data("cnt").cast("Double").alias("label")
  raw_data.select((castedFeatures :+ labelCol): _*)
}

// Names of the 12 predictor columns (every column of data1 except "label").
val featuresArray = Array("season","yr","mnth","hr","holiday","weekday","workingday","weathersit","temp","atemp","hum","windspeed")

// Assemble the predictor columns into one vector column named "features".
val assembler = new VectorAssembler().setInputCols(featuresArray).setOutputCol("features")

9.4.2 特征转换
// Index the categorical entries of "features" into "indexedFeatures": a feature with
// at most 24 distinct values is treated as categorical, the rest stay continuous.
// NOTE(review): 24 is presumably chosen so that hr (hour of day) counts as categorical.
val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(24)

// One-hot encode the 8 categorical columns for linear regression;
// each input column c produces an output vector column named c + "Vec".
val categoricalCols = Array("season", "yr", "mnth", "hr", "holiday", "weekday", "workingday", "weathersit")
val encoders = categoricalCols.map(c => new OneHotEncoder().setInputCol(c).setOutputCol(c + "Vec"))
// Keep the original names data2 ... data9 bound to the same encoder stages.
val Array(data2, data3, data4, data5, data6, data7, data8, data9) = encoders

val pipeline_en = new Pipeline().setStages(encoders)
val data_lr = pipeline_en.fit(data1).transform(data1)

// Concatenate the 8 one-hot vectors and the 4 continuous columns
// (temp, atemp, hum, windspeed) into a single vector column "features_lr".
val featuresVecArray = Array("seasonVec","yrVec","mnthVec","hrVec","holidayVec","weekdayVec","workingdayVec","weathersitVec","temp","atemp","hum","windspeed")
val assembler_lr = new VectorAssembler().setInputCols(featuresVecArray).setOutputCol("features_lr")

9.5 组装 
1)将数据分成训练集和测试集
// 70/30 train/test split with a fixed seed (12), done once per prepared dataset:
//   data1   -> decision-tree pipeline
//   data_lr -> linear-regression pipeline (carries the one-hot columns)
val Array(trainData, testData) = data1.randomSplit(weights = Array(0.7, 0.3), seed = 12L)

val Array(trainData_lr, testData_lr) = data_lr.randomSplit(weights = Array(0.7, 0.3), seed = 12L)

2)设置决策树回归模型参数
// Decision-tree regressor over the indexed features (maxBins 64, maxDepth 15).
val dt = (new DecisionTreeRegressor()
  .setLabelCol("label")
  .setFeaturesCol("indexedFeatures")
  .setMaxBins(64)
  .setMaxDepth(15))

3)设置线性回归模型参数
// Elastic-net linear regression on the one-hot vector "features_lr":
// regParam 0.3, elastic-net mixing 0.8, at most 20 iterations, with intercept.
val lr = (new LinearRegression()
  .setLabelCol("label")
  .setFeaturesCol("features_lr")
  .setFitIntercept(true)
  .setMaxIter(20)
  .setRegParam(0.3)
  .setElasticNetParam(0.8))


4)把决策树回归模型涉及的特征转换及模型训练组装在一条流水线上
// Decision-tree pipeline: assemble features -> index categorical features -> fit tree.
val pipeline = {
  val dtStages = Array(assembler, featureIndexer, dt)
  new Pipeline().setStages(dtStages)
}

5)把线性回归模型涉及的特征转换及模型训练组装在一条流水线上
// Linear-regression pipeline: assemble one-hot + continuous columns -> fit lr.
val pipeline_lr = new Pipeline()
pipeline_lr.setStages(Array(assembler_lr, lr))

6)训练模型
// Fit each pipeline on the training split of its own dataset.
val model = pipeline.fit(dataset = trainData)          // decision tree
val model_lr = pipeline_lr.fit(dataset = trainData_lr) // linear regression

7)做出预测
// Score the held-out test splits; each result gains a "prediction" column.
val predictions = model.transform(dataset = testData)          // decision tree
val predictions_lr = model_lr.transform(dataset = testData_lr) // linear regression

8)模型评估
// RMSE of "prediction" against "label"; one evaluator scores both models.
val evaluator = (new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction")
  .setMetricName("rmse"))

val rmse = evaluator.evaluate(predictions)       // decision tree
val rmse_lr = evaluator.evaluate(predictions_lr) // linear regression

9.6 模型优化
// Tuned feature set: identical to featuresVecArray except that atemp is dropped.
// NOTE(review): presumably because atemp tracks temp closely (compare them in the pairplot).
val featuresVecArray1 = featuresVecArray.filterNot(_ == "atemp")
val assembler_lr1 = new VectorAssembler().setInputCols(featuresVecArray1).setOutputCol("features_lr1")

// Push the label towards a more normal distribution by taking its square root;
// the transformed value is added as a new column "label1".
import org.apache.spark.ml.feature.SQLTransformer
val sqlTrans = new SQLTransformer().setStatement("SELECT *, SQRT(label) as label1 FROM __THIS__")

# 利用训练验证划分法对线性回归模型进行优化
1)导包
import org.apache.spark.ml.tuning.{ParamGridBuilder,  TrainValidationSplit}

2)建立模型,预测label1的值,设置线性回归参数
// Linear regression predicting the sqrt-transformed "label1" from "features_lr1";
// regParam, elasticNetParam and maxIter are supplied by the parameter grid.
val lr1 = (new LinearRegression()
  .setLabelCol("label1")
  .setFeaturesCol("features_lr1")
  .setFitIntercept(true))

3)设置流水线,以便将特征组合、标签变换(SQRT)、模型训练等任务组装到这条流水线中
// Stages: assemble "features_lr1" -> add label1 = SQRT(label) -> fit lr1.
val pipeline_lr1 = new Pipeline()
pipeline_lr1.setStages(Array(assembler_lr1, sqlTrans, lr1))

4)建立参数网格
// 3 elastic-net mixes x 3 regularization strengths x 2 iteration caps = 18 candidates.
val paramGrid_lr1 = (new ParamGridBuilder()
  .addGrid(lr1.elasticNetParam, Array(0.1, 0.8, 1.0))
  .addGrid(lr1.regParam, Array(0.1, 0.3, 0.5))
  .addGrid(lr1.maxIter, Array(20, 30))
  .build())

5)基于(prediction, label1)计算测试误差(RMSE)
// RMSE between "prediction" and "label1", i.e. measured on the sqrt scale.
val evaluator_lr1 = (new RegressionEvaluator()
  .setLabelCol("label1")
  .setPredictionCol("prediction")
  .setMetricName("rmse"))

// Tune with a single train/validation split (TrainValidationSplit, not k-fold
// cross-validation): 80% of the fitted data trains each candidate, 20% scores it.
// A fixed seed (same value as the randomSplit calls) makes that split, and hence
// the selected parameters, reproducible across runs.
val trainValidationSplit = (new TrainValidationSplit()
  .setEstimator(pipeline_lr1)
  .setEvaluator(evaluator_lr1)
  .setEstimatorParamMaps(paramGrid_lr1)
  .setTrainRatio(0.8)
  .setSeed(12L))

6)训练模型并自动选择最优参数
// Fit all grid candidates on trainData_lr and keep the best-scoring one.
val model_lr1 = trainValidationSplit.fit(dataset = trainData_lr)

7)查看模型全部参数
// Print every candidate parameter combination together with its validation RMSE
// (validationMetrics is ordered like getEstimatorParamMaps), so the winner is visible.
model_lr1.getEstimatorParamMaps.zip(model_lr1.validationMetrics).foreach { case (params, metric) =>
  println(s"validation rmse = $metric for $params")
}
println(model_lr1.getEvaluator.extractParamMap()) // evaluator settings
model_lr1.getEvaluator.isLargerBetter             // false for rmse: lower is better

8)用最好的参数组合,作出预测
// Predict the test split with the best parameter combination. evaluator_lr1 scores
// against label1 (sqrt scale), so rmse_lr1 is NOT comparable with rmse / rmse_lr.
val predictions_lr1 = model_lr1.transform(testData_lr)
val rmse_lr1 = evaluator_lr1.evaluate(predictions_lr1)

// Map predictions back to counts (square them, clamping negatives to 0 first since
// sqrt(label) >= 0) and score against the original "label" for a comparable RMSE.
val predictions_lr1_cnt = predictions_lr1.withColumn("prediction_cnt", pow(greatest(col("prediction"), lit(0.0)), 2))
val rmse_lr1_cnt = (new RegressionEvaluator()
  .setLabelCol("label")
  .setPredictionCol("prediction_cnt")
  .setMetricName("rmse")
  .evaluate(predictions_lr1_cnt))

// 显示转换后特征值的前5行信息
scala> predictions_lr1.select("features_lr1","label","label1","prediction").show(5)
+--------------------+-----+------------------+------------------+
|        features_lr1|label|            label1|        prediction|
+--------------------+-----+------------------+------------------+
|(55,[1,4,6,17,40,...| 39.0| 6.244997998398398|2.6144256370572547|
|(55,[1,4,6,17,40,...|  7.0|2.6457513110645907| 1.309134103197633|
|(55,[1,4,6,17,40,...|  5.0|  2.23606797749979|1.4901595799053249|
|(55,[1,4,6,17,40,...|  7.0|2.6457513110645907|1.8830157682818198|
|(55,[1,4,6,17,40,...| 12.0|3.4641016151377544|1.6058304701587227|
+--------------------+-----+------------------+------------------+