欢迎您访问程序员文章站。本站旨在为大家提供、分享程序员计算机编程知识!
您现在的位置是: 首页

csv数据文件清洗【DataFrame】

程序员文章站 2022-06-03 18:54:10
...
package march.sql

import org.apache.spark.sql.SparkSession


/**
  * Description: TODO
  *
  * @Author: 留歌36
  * @Date: 2019/3/6 8:57
  */
object AllHouseAPP {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName(this.getClass.getSimpleName).master("local[2]").getOrCreate()
    // 隐式转换
//    val path = "f:\\data\\hangzhou_house.csv"
//    val path = "f:\\data\\chengdu_house.csv"
    val path = "f:\\data\\"
    val DF = spark.read.option("header","true").option("inferSchema","true").csv(path)

    import org.apache.spark.sql.functions._

    println("~~~~~~~~~~~~~~~~~~")

    println(DF.count())

    val DF3 = DF
       .filter(size(split(col("house_info") ,"\\|")) === 6)
      .withColumn("rooms", split(col("house_info"), "\\|").getItem(1).substr(2,1))
      .withColumn("halls", split(col("house_info"), "\\|").getItem(1).substr(4,1))
      .withColumn("towards", split(col("house_info"), "\\|").getItem(3))
      .withColumn("decoration", split(col("house_info"), "\\|").getItem(4))
      .withColumn("have_elevator", split(col("house_info"), "\\|").getItem(5))
      .drop("house_info")
      .drop("url")
      .filter(DF.col("publishday").isNotNull)
      .filter(DF.col("region").isNotNull)
      .filter(DF.col("visited").isNotNull)
      .filter(DF.col("attention").isNotNull)
      .filter(DF.col("total_price").isNotNull)
      .filter(DF.col("unit_price").isNotNull)

    DF3.show(false)

    println(DF3.count())
//    DF3.coalesce(1).write.csv("f:\\data\\sample.csv")
//    DF3.write
//      .option("header", "true")
//      .csv("file:///f:/data/out.csv")
    spark.stop()
  }

}

相关标签: dataframe