Reading notes: The Data Engineer's Guide to Spark
// start the Spark shell; it creates a SparkSession bound to the variable spark and imports spark.implicits._ automatically
spark-shell
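// outside the shell (e.g. in a standalone application) you would create the session yourself; a minimal sketch, where the app name is arbitrary:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("retail-example").getOrCreate()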
// load the CSV data; inferSchema makes Spark take an extra pass over the data to guess each column's type
val staticDataFrame = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("s3://houzhe/mydata/guide/*.csv")
// print schema
staticDataFrame.printSchema()
/*
root
|-- InvoiceNo: string (nullable = true)
|-- StockCode: string (nullable = true)
|-- Description: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- InvoiceDate: string (nullable = true)
|-- UnitPrice: double (nullable = true)
|-- CustomerID: integer (nullable = true)
|-- Country: string (nullable = true)
*/
staticDataFrame.show()
/*
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
| 536365| 85123A|WHITE HANGING HEA...| 6|12/1/2010 8:26| 2.55| 17850|United Kingdom|
| 536365| 71053| WHITE METAL *| 6|12/1/2010 8:26| 3.39| 17850|United Kingdom|
| 536365| 84406B|CREAM CUPID HEART...| 8|12/1/2010 8:26| 2.75| 17850|United Kingdom|
| 536365| 84029G|KNITTED UNION FLA...| 6|12/1/2010 8:26| 3.39| 17850|United Kingdom|
| 536365| 84029E|RED WOOLLY HOTTIE...| 6|12/1/2010 8:26| 3.39| 17850|United Kingdom|
| 536365| 22752|SET 7 BABUSHKA NE...| 2|12/1/2010 8:26| 7.65| 17850|United Kingdom|
| 536365| 21730|GLASS STAR FROSTE...| 6|12/1/2010 8:26| 4.25| 17850|United Kingdom|
| 536366| 22633|HAND WARMER UNION...| 6|12/1/2010 8:28| 1.85| 17850|United Kingdom|
| 536366| 22632|HAND WARMER RED P...| 6|12/1/2010 8:28| 1.85| 17850|United Kingdom|
| 536367| 84879|ASSORTED COLOUR B...| 32|12/1/2010 8:34| 1.69| 13047|United Kingdom|
| 536367| 22745|POPPY'S PLAYHOUSE...| 6|12/1/2010 8:34| 2.1| 13047|United Kingdom|
| 536367| 22748|POPPY'S PLAYHOUSE...| 6|12/1/2010 8:34| 2.1| 13047|United Kingdom|
| 536367| 22749|FELTCRAFT PRINCES...| 8|12/1/2010 8:34| 3.75| 13047|United Kingdom|
| 536367| 22310|IVORY KNITTED MUG...| 6|12/1/2010 8:34| 1.65| 13047|United Kingdom|
| 536367| 84969|BOX OF 6 ASSORTED...| 6|12/1/2010 8:34| 4.25| 13047|United Kingdom|
| 536367| 22623|BOX OF VINTAGE JI...| 3|12/1/2010 8:34| 4.95| 13047|United Kingdom|
| 536367| 22622|BOX OF VINTAGE AL...| 2|12/1/2010 8:34| 9.95| 13047|United Kingdom|
| 536367| 21754|HOME BUILDING BLO...| 3|12/1/2010 8:34| 5.95| 13047|United Kingdom|
| 536367| 21755|LOVE BUILDING BLO...| 3|12/1/2010 8:34| 5.95| 13047|United Kingdom|
| 536367| 21777|RECIPE BOX WITH M...| 4|12/1/2010 8:34| 7.95| 13047|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
only showing top 20 rows
*/
// MLlib requires numeric input data, so we need to transform the raw columns first
import org.apache.spark.sql.functions.{date_format, unix_timestamp}
val preppedDataFrame = staticDataFrame.na.fill(0).withColumn("day_of_week", date_format(unix_timestamp($"InvoiceDate", "M/d/yyyy H:mm").cast("timestamp"), "EEEE")).coalesce(5)
// note 1: the coalesce method reduces the number of partitions in a DataFrame; here we consolidate the data into five partitions
// note 2: 'InvoiceDate' is not a valid ISO 8601 timestamp, so we parse it explicitly; the single-letter pattern fields (M/d, H) also accept single-digit days and hours under Spark 3's stricter date parser
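// optional check that coalesce took effect (the count before coalescing depends on the number of input splits):
preppedDataFrame.rdd.getNumPartitions // expect 5, or fewer if the input had fewer partitions to begin with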
preppedDataFrame.show
/*
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country|day_of_week|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+-----------+
| 536365| 85123A|WHITE HANGING HEA...| 6|12/1/2010 8:26| 2.55| 17850|United Kingdom| Wednesday|
| 536365| 71053| WHITE METAL *| 6|12/1/2010 8:26| 3.39| 17850|United Kingdom| Wednesday|
| 536365| 84406B|CREAM CUPID HEART...| 8|12/1/2010 8:26| 2.75| 17850|United Kingdom| Wednesday|
| 536365| 84029G|KNITTED UNION FLA...| 6|12/1/2010 8:26| 3.39| 17850|United Kingdom| Wednesday|
| 536365| 84029E|RED WOOLLY HOTTIE...| 6|12/1/2010 8:26| 3.39| 17850|United Kingdom| Wednesday|
| 536365| 22752|SET 7 BABUSHKA NE...| 2|12/1/2010 8:26| 7.65| 17850|United Kingdom| Wednesday|
*/
// split the data into a training set and a test set
// note: InvoiceDate is a string column here, so compare on the parsed timestamp rather than lexicographically on the raw string
val invoiceTs = unix_timestamp($"InvoiceDate", "M/d/yyyy H:mm").cast("timestamp")
val trainDataFrame = preppedDataFrame.where(invoiceTs < "2011-07-01")
val testDataFrame = preppedDataFrame.where(invoiceTs >= "2011-07-01")
trainDataFrame.count()
// res11: Long = 276313
testDataFrame.count()
// res12: Long = 265596
// Spark ships with general-purpose feature transformers; StringIndexer turns the day-of-week strings into corresponding numeric indices
import org.apache.spark.ml.feature.StringIndexer
val indexer = new StringIndexer().setInputCol("day_of_week").setOutputCol("day_of_week_index")
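// optional sanity check on the learned mapping (illustrative; the actual index values depend on label frequencies in the training data):
indexer.fit(trainDataFrame).transform(trainDataFrame).select("day_of_week", "day_of_week_index").distinct.show()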
// the numeric index implies an ordering that is meaningless for weekdays, so one-hot encode each index into its own boolean column
import org.apache.spark.ml.feature.OneHotEncoder
val encoder = new OneHotEncoder().setInputCol("day_of_week_index").setOutputCol("day_of_week_encoded")
// all machine learning algorithms in Spark take as input a Vector type, which must be a set of numerical values
import org.apache.spark.ml.feature.VectorAssembler
val vectorAssembler = new VectorAssembler().setInputCols(Array("UnitPrice", "Quantity", "day_of_week_encoded")).setOutputCol("features")
// put these steps into a pipeline so that any future data we need to transform goes through the exact same process
import org.apache.spark.ml.Pipeline
val transformationPipeline = new Pipeline().setStages(Array(indexer, encoder, vectorAssembler))
// fit the pipeline: the StringIndexer must learn its string-to-index mapping from the training data
val fittedPipeline = transformationPipeline.fit(trainDataFrame)
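// since the point of the pipeline is reuse, the fitted pipeline can also be persisted and reloaded; a sketch, with a hypothetical path:
fittedPipeline.write.overwrite().save("/tmp/retail-pipeline")
val reloadedPipeline = org.apache.spark.ml.PipelineModel.load("/tmp/retail-pipeline")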
// apply the fitted pipeline to the training data
val transformedTraining = fittedPipeline.transform(trainDataFrame)
// cache the result: model training makes repeated passes over the data, so this avoids recomputing the pipeline each time
transformedTraining.cache()
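// optional: peek at the assembled feature vectors (sparse vectors, since the one-hot columns are mostly zeros)
transformedTraining.select("features").show(5, false)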
// import and configure the ML model (k = 20 is an arbitrary choice for this example)
import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans().setK(20).setSeed(1L)
// in Spark, training an ML model is a two-phase process: 1. initialize an untrained model; 2. train it.
// By naming convention, the untrained version is called Algorithm and the trained version AlgorithmModel (here, KMeans and KMeansModel).
val kmModel = kmeans.fit(transformedTraining)
// computeCost returns the sum of squared distances from each point to its nearest cluster center
kmModel.computeCost(transformedTraining)
// res17: Double = 9.889216283558984E7
// transform the test set with the same fitted pipeline and compute the cost on it
val transformedTest = fittedPipeline.transform(testDataFrame)
kmModel.computeCost(transformedTest)
// res19: Double = 8.448934990353165E8
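// note: computeCost was deprecated in Spark 2.4 and removed in 3.0. On newer versions, one option is the
// silhouette score via ClusteringEvaluator (a different metric: values close to 1 mean well-separated clusters).
// A minimal sketch, assuming Spark >= 2.3:
import org.apache.spark.ml.evaluation.ClusteringEvaluator
val evaluator = new ClusteringEvaluator().setFeaturesCol("features").setPredictionCol("prediction")
evaluator.evaluate(kmModel.transform(transformedTest))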