
Reading Notes: The Data Engineer's Guide to Spark

// start the Spark shell; it provides a ready-made SparkSession as the variable `spark`
spark-shell

// load data
val staticDataFrame = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("s3://houzhe/mydata/guide/*.csv")

// print schema
staticDataFrame.printSchema()
/*
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
*/
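
// note: inferSchema makes an extra pass over the data; for repeated loads it can be
// cheaper to declare the schema up front. A minimal sketch (the val names are ours),
// with the types taken from the printed schema above:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType, DoubleType}
val retailSchema = StructType(Array(
  StructField("InvoiceNo", StringType, true),
  StructField("StockCode", StringType, true),
  StructField("Description", StringType, true),
  StructField("Quantity", IntegerType, true),
  StructField("InvoiceDate", StringType, true),
  StructField("UnitPrice", DoubleType, true),
  StructField("CustomerID", IntegerType, true),
  StructField("Country", StringType, true)))
val staticWithSchema = spark.read.format("csv")
  .option("header", "true")
  .schema(retailSchema)
  .load("s3://houzhe/mydata/guide/*.csv")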

staticDataFrame.show()
/*
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|     4.25|     17850|United Kingdom|
|   536366|    22633|HAND WARMER UNION...|       6|12/1/2010 8:28|     1.85|     17850|United Kingdom|
|   536366|    22632|HAND WARMER RED P...|       6|12/1/2010 8:28|     1.85|     17850|United Kingdom|
|   536367|    84879|ASSORTED COLOUR B...|      32|12/1/2010 8:34|     1.69|     13047|United Kingdom|
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|      2.1|     13047|United Kingdom|
|   536367|    22748|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|      2.1|     13047|United Kingdom|
|   536367|    22749|FELTCRAFT PRINCES...|       8|12/1/2010 8:34|     3.75|     13047|United Kingdom|
|   536367|    22310|IVORY KNITTED MUG...|       6|12/1/2010 8:34|     1.65|     13047|United Kingdom|
|   536367|    84969|BOX OF 6 ASSORTED...|       6|12/1/2010 8:34|     4.25|     13047|United Kingdom|
|   536367|    22623|BOX OF VINTAGE JI...|       3|12/1/2010 8:34|     4.95|     13047|United Kingdom|
|   536367|    22622|BOX OF VINTAGE AL...|       2|12/1/2010 8:34|     9.95|     13047|United Kingdom|
|   536367|    21754|HOME BUILDING BLO...|       3|12/1/2010 8:34|     5.95|     13047|United Kingdom|
|   536367|    21755|LOVE BUILDING BLO...|       3|12/1/2010 8:34|     5.95|     13047|United Kingdom|
|   536367|    21777|RECIPE BOX WITH M...|       4|12/1/2010 8:34|     7.95|     13047|United Kingdom|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+
only showing top 20 rows
*/
 
// MLlib requires numeric input, so the data needs some transformation first
import org.apache.spark.sql.functions.{date_format, unix_timestamp}
import spark.implicits._ // for the $"colName" syntax
val preppedDataFrame = staticDataFrame
  .na.fill(0)
  .withColumn("day_of_week", date_format(unix_timestamp($"InvoiceDate", "M/d/yyyy H:mm").cast("timestamp"), "EEEE"))
  .coalesce(5)

// note 1: coalesce reduces the number of partitions of a DataFrame; here it consolidates the data into five partitions
// note 2: 'InvoiceDate' is not a valid ISO 8601 timestamp, so it has to be parsed with an explicit pattern first ("M/d/yyyy H:mm" matches values like "12/1/2010 8:26")
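
// quick sanity check on note 1 (a sketch): coalesce(5) should leave at most five partitions
preppedDataFrame.rdd.getNumPartitions
// expected: 5, or fewer if the source had fewer partitions to begin with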

preppedDataFrame.show
/*
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+-----------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|CustomerID|       Country|day_of_week|
+---------+---------+--------------------+--------+--------------+---------+----------+--------------+-----------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|     2.55|     17850|United Kingdom|  Wednesday|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|  Wednesday|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|     2.75|     17850|United Kingdom|  Wednesday|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|  Wednesday|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|     3.39|     17850|United Kingdom|  Wednesday|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|     7.65|     17850|United Kingdom|  Wednesday|
*/

// split the data into a training set and a test set by date
// note: InvoiceDate is still a string, so a plain comparison against '2011-07-01' would be lexicographic; parse it to a timestamp first
import org.apache.spark.sql.functions.to_timestamp
val invoiceTs = to_timestamp($"InvoiceDate", "M/d/yyyy H:mm")
val trainDataFrame = preppedDataFrame.where(invoiceTs < "2011-07-01")
val testDataFrame = preppedDataFrame.where(invoiceTs >= "2011-07-01")

trainDataFrame.count()
// res11: Long = 276313

testDataFrame.count()
// res12: Long = 265596
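
// note: splitting by date suits time-ordered data like this; for data without a time
// dimension, a random split is more common. A sketch with a 70/30 split and a fixed seed
// (randTrain/randTest are our own names):
val Array(randTrain, randTest) = preppedDataFrame.randomSplit(Array(0.7, 0.3), seed = 5)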

// StringIndexer is a general Spark transformer that turns days of the week into corresponding numeric indices
import org.apache.spark.ml.feature.StringIndexer
val indexer = new StringIndexer().setInputCol("day_of_week").setOutputCol("day_of_week_index")

import org.apache.spark.ml.feature.OneHotEncoder
val encoder = new OneHotEncoder().setInputCol("day_of_week_index").setOutputCol("day_of_week_encoded")
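
// note: the index alone implies a false ordering (numerically, "Saturday" would be
// "greater than" "Monday"), which would distort k-means distances; one-hot encoding
// replaces each index with a boolean vector. A sketch to inspect the intermediate
// columns (assumes Spark 2.x, where OneHotEncoder is a plain Transformer):
val indexed = indexer.fit(trainDataFrame).transform(trainDataFrame)
encoder.transform(indexed).select("day_of_week", "day_of_week_index", "day_of_week_encoded").distinct.show()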

// all machine learning algorithms in Spark take as input a Vector type, which must be a set of numerical values
import org.apache.spark.ml.feature.VectorAssembler
val vectorAssembler = new VectorAssembler().setInputCols(Array("UnitPrice", "Quantity", "day_of_week_encoded")).setOutputCol("features")

// set this up as a pipeline so that any future data we need to transform can go through the exact same process
import org.apache.spark.ml.Pipeline
val transformationPipeline = new Pipeline().setStages(Array(indexer, encoder, vectorAssembler))

// fit the pipeline: the StringIndexer needs to see the training data first to learn its index of distinct values
val fittedPipeline = transformationPipeline.fit(trainDataFrame)

// do transformation
val transformedTraining = fittedPipeline.transform(trainDataFrame)
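
// note: the fitted pipeline can also be saved and reloaded, so future data really does
// go through the exact same process. A sketch (the path here is hypothetical):
fittedPipeline.write.overwrite().save("/tmp/retail-pipeline")
// import org.apache.spark.ml.PipelineModel
// val samePipeline = PipelineModel.load("/tmp/retail-pipeline")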

// cache the transformed training set: k-means is iterative, so caching avoids recomputing the whole pipeline on every pass
transformedTraining.cache()

// import and configure the ML model: k-means with k = 20 clusters and a fixed seed
import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans().setK(20).setSeed(1L)

// In Spark, training an ML model is a two-phase process: 1. initialize an untrained model; 2. train it. By naming convention, the untrained version is called Algorithm and the trained version AlgorithmModel.
val kmModel = kmeans.fit(transformedTraining)

// score the model: computeCost returns the within-set sum of squared distances to the nearest cluster center
kmModel.computeCost(transformedTraining)
// res17: Double = 9.889216283558984E7
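
// note: the trained model is itself a transformer; it appends a "prediction" column
// with each row's cluster assignment. A minimal sketch:
kmModel.transform(transformedTraining).select("features", "prediction").show(5)
kmModel.clusterCenters.length // 20 centers, matching setK(20)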

// transform the test set with the very same fitted pipeline, then score it
val transformedTest = fittedPipeline.transform(testDataFrame)
kmModel.computeCost(transformedTest)
// res19: Double = 8.448934990353165E8
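
// note: in newer Spark versions computeCost is deprecated in favor of silhouette
// scoring. A sketch assuming Spark 2.3+:
import org.apache.spark.ml.evaluation.ClusteringEvaluator
val evaluator = new ClusteringEvaluator() // defaults: featuresCol = "features", predictionCol = "prediction"
evaluator.evaluate(kmModel.transform(transformedTest))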
