CSV Data File Cleaning [DataFrame]
package march.sql

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * Description: Clean scraped house-listing CSV files with the Spark DataFrame API:
 * parse the pipe-delimited house_info column into separate fields, then drop rows
 * that are missing any of the key columns.
 *
 * @Author: 留歌36
 * @Date: 2019/3/6 8:57
 */
object AllHouseAPP {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName(this.getClass.getSimpleName)
      .master("local[2]")
      .getOrCreate()

    // Read every CSV file under the directory; take column names from the
    // header row and let Spark infer the column types.
    // val path = "f:\\data\\hangzhou_house.csv"
    // val path = "f:\\data\\chengdu_house.csv"
    val path = "f:\\data\\"
    val rawDF = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(path)

    println("~~~~~~~~~~~~~~~~~~")
    println(rawDF.count())  // row count before cleaning

    // house_info is a pipe-delimited string with 6 segments: keep only
    // well-formed rows, split the segments into their own columns, then
    // drop rows with nulls in any key column.
    val parts = split(col("house_info"), "\\|")
    val cleanedDF = rawDF
      .filter(size(parts) === 6)                             // discard malformed rows
      .withColumn("rooms", parts.getItem(1).substr(2, 1))    // e.g. "3" out of " 3室2厅 "
      .withColumn("halls", parts.getItem(1).substr(4, 1))    // e.g. "2"
      .withColumn("towards", parts.getItem(3))
      .withColumn("decoration", parts.getItem(4))
      .withColumn("have_elevator", parts.getItem(5))
      .drop("house_info")
      .drop("url")
      .filter(col("publishday").isNotNull)   // col(...) rather than rawDF.col(...):
      .filter(col("region").isNotNull)       // refer to the current (transformed)
      .filter(col("visited").isNotNull)      // DataFrame, not the original one
      .filter(col("attention").isNotNull)
      .filter(col("total_price").isNotNull)
      .filter(col("unit_price").isNotNull)

    cleanedDF.show(false)
    println(cleanedDF.count())  // row count after cleaning

    // cleanedDF.coalesce(1).write.csv("f:\\data\\sample.csv")
    // cleanedDF.write
    //   .option("header", "true")
    //   .csv("file:///f:/data/out.csv")

    spark.stop()
  }
}
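To make the substr offsets concrete, here is a minimal, self-contained sketch that runs the same parsing expressions against one sample house_info value. The sample string is hypothetical, inferred from the offsets above (with " | " as the separator, item 1 looks like " 3室2厅 ", so position 2 is the room count and position 4 the hall count); it is not taken from the real data.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ParseHouseInfoDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ParseHouseInfoDemo")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample row: 6 pipe-separated segments with spaces around "|".
    val sample = Seq("某小区 | 3室2厅 | 89.5平米 | 南 北 | 精装 | 有电梯").toDF("house_info")

    val parts = split(col("house_info"), "\\|")
    sample
      .withColumn("rooms", parts.getItem(1).substr(2, 1))   // " 3室2厅 " -> "3"
      .withColumn("halls", parts.getItem(1).substr(4, 1))   // " 3室2厅 " -> "2"
      .withColumn("towards", parts.getItem(3))              // " 南 北 " (spaces kept)
      .withColumn("decoration", parts.getItem(4))           // " 精装 "
      .withColumn("have_elevator", parts.getItem(5))        // " 有电梯"
      .show(false)

    spark.stop()
  }
}

Note that getItem keeps the spaces around each segment; wrapping the expressions in trim(...) would normalize them if the downstream analysis needs clean values.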
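The six chained isNotNull filters can also be written as one call on DataFrameNaFunctions: na.drop with a list of columns removes every row that is null in any of them, which matches the semantics above. A sketch, where parsedDF is a hypothetical name for the intermediate result right after the .drop("url") step:

// Drop-in replacement for the six .filter(col(...).isNotNull) calls;
// na.drop defaults to how = "any", i.e. a null in any listed column drops the row.
val cleaned = parsedDF.na.drop(Seq(
  "publishday", "region", "visited", "attention", "total_price", "unit_price"))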
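The commented-out lines at the end hint at persisting the result. A minimal sketch of one way to do it, continuing from cleanedDF in the listing above: the path names a directory (Spark writes part-* files plus a _SUCCESS marker into it, not a single file), and coalesce(1) forces one output file, which only makes sense for small results.

cleanedDF
  .coalesce(1)             // single partition -> single part file; small data only
  .write
  .mode("overwrite")       // replace the output directory if it already exists
  .option("header", "true")
  .csv("f:\\data\\out")    // a directory, not a file name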