
Spark Learning 07: How to Create a DataFrame


I. Introduction

A Dataset whose elements are Row objects is a DataFrame. In Spark, DataFrame is not a separate class but simply a type alias for Dataset[Row], so a DataFrame is just the untyped, row-based view of the Dataset API.
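A minimal spark-shell style sketch of that relationship, assuming an active SparkSession named spark; the Person case class here is hypothetical, introduced only for illustration:

// DataFrame is declared in the Spark source as:
//   type DataFrame = Dataset[Row]
import org.apache.spark.sql.{DataFrame, Dataset, Row}
import spark.implicits._

case class Person(name: String, age: Long)

val df: DataFrame = Seq(Person("Ann", 30)).toDF()  // untyped rows
val ds: Dataset[Person] = df.as[Person]            // typed view of the same data
val rows: Dataset[Row] = ds.toDF()                 // back to a DataFrame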

II. Creation Methods

1. Creating a DataFrame with the toDF function

import org.apache.spark.sql.SparkSession

object CreateDataFrameFun {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("Spark SQL basic example")
      .getOrCreate()
    // toDF is provided by the implicits of the active SparkSession
    import spark.implicits._

    // Convert a local Seq of tuples into a DataFrame, naming each column
    val df = Seq(
      (1, "First Value", java.sql.Date.valueOf("2010-01-01")),
      (2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
    ).toDF("int_column", "string_column", "date_column")

    df.foreach(x => println(x))
    spark.stop()
  }
}
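For local inspection, df.show() renders the contents as a table; with the data above, the output would look roughly like this (a sketch, not captured output):

df.show()
// +----------+-------------+-----------+
// |int_column|string_column|date_column|
// +----------+-------------+-----------+
// |         1|  First Value| 2010-01-01|
// |         2| Second Value| 2010-02-01|
// +----------+-------------+-----------+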

2. Creating a DataFrame with a case class (bean) and toDF

import org.apache.spark.sql.SparkSession

object CreateDataFrameFun {
  case class Person(name: String, age: Long)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("Spark SQL basic example")
      .getOrCreate()
    import spark.implicits._

    // Create an RDD of Person objects from a text file, then convert it to a
    // DataFrame; the case class fields become the column names and types
    val peopleDF = spark.sparkContext
      .textFile("examples/src/main/resources/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()

    peopleDF.foreach(x => println(x))
    spark.stop()
  }
}
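This mirrors the example in the official Spark documentation. The input is expected to be comma-separated name,age lines; the sample people.txt shipped under examples/src/main/resources in the Spark distribution contains:

Michael, 29
Andy, 30
Justin, 19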

3. Creating a DataFrame with the createDataFrame function

import org.apache.spark.sql.{Row, SparkSession}

object CreateDataFrameFun {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("Spark SQL basic example")
      .getOrCreate()
    val sc = spark.sparkContext

    // Define the schema explicitly: column names, types, and nullability
    import org.apache.spark.sql.types._
    val schema = StructType(List(
      StructField("integer_column", IntegerType, nullable = false),
      StructField("string_column", StringType, nullable = true),
      StructField("date_column", DateType, nullable = true)
    ))

    // Build an RDD[Row] whose values match the schema, then combine the two
    val rdd = sc.parallelize(Seq(
      Row(1, "First Value", java.sql.Date.valueOf("2010-01-01")),
      Row(2, "Second Value", java.sql.Date.valueOf("2010-02-01"))
    ))
    val df = spark.createDataFrame(rdd, schema)

    df.foreach(x => println(x))
    spark.stop()
  }
}
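After building the DataFrame, you can confirm that the explicit schema was applied with printSchema(); for the schema above, the output would look roughly like:

df.printSchema()
// root
//  |-- integer_column: integer (nullable = false)
//  |-- string_column: string (nullable = true)
//  |-- date_column: date (nullable = true)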