欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spark - DataFrame Api 练习 博客分类: spark DataFrame Api 练习 

程序员文章站 2024-03-12 18:48:14
...

DataFrame Api 练习(需要了解下functions.scala源码)

 

package df

import org.apache.spark.sql.SparkSession

object DataFrameDemo {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DF_DEMO").master("local").getOrCreate()

    /**
      * 测试数据内容:
      * {"id":1,"name":"zhangsan"}
      * {"id":3,"name":"lisi"}
      * {"id":2,"name":"wangwu"}
      */

    val df = spark.read.json("file:///F:\\test\\1.json")
    //1.展示df的内容, show方法默认展示20行, 每列限制20个字符,超出就截断
    df.show()

    //2.打印schema
    df.printSchema()

    //3.查询id列 (如果id列不是schema里的列,会报错找不到列!!!)
    df.select("id").show()
    df.select(df("id")).show()

    //需要引入隐式转换
    import spark.implicits._
    df.select('id).show()
    df.select($"id").show()

    //4.group by (借助count转为DataFrame)
    df.groupBy("id").count().show()

    //5.使用sql的写法
    //createGlobalTempView会把表people_tbl建在global_temp数据库下
    df.createGlobalTempView("people_tbl")

    //查询时候要指定数据库global_temp, 否则会报错找不到表!!!
    //    spark.sql("select * from people_tbl").show()
    spark.sql("select * from global_temp.people_tbl").show()

    //global temp view在一个application的多个session中都可见!!!
    spark.newSession().sql("SELECT * FROM global_temp.people_tbl").show()

    //6.head从头取n个数据的Array
    df.head(2).foreach(println)

    //7.filter操作!!!
    /**
      * 输出结果:
      * +---+--------+
      * | id|    name|
      * +---+--------+
      * |  1|zhangsan|
      * |  3|    lisi|
      * +---+--------+
      */
    df.filter("name='lisi' or id=1").show()
    //name以w开头的
    df.filter("substr(name,0, 1)='w'").show()

    //8.sort默认是字典升序
    df.sort("name").show()
    //字典降序col.desc, 按照多列排序
    df.sort(df.col("name").desc, df.col("id").asc).show()

    //9.as别名
    /**
      * 输出结果:
      * +------+
      * |stu_id|
      * +------+
      * |     1|
      * |     3|
      * |     2|
      * +------+
      */
    df.select(df.col("id").as("stu_id")).show()

    //10.join操作(join 时候on的条件必须要使用 ===  连接 !!!),join()的第三个参数指定连接方式
    /**
      * 输出结果
      * +---+--------+---+--------+
      * | id|    name| id|    name|
      * +---+--------+---+--------+
      * |  1|zhangsan|  1|zhangsan|
      * |  3|    lisi|  3|    lisi|
      * |  2|  wangwu|  2|  wangwu|
      * +---+--------+---+--------+
      */
    df.join(df, df.col("id") === df.col("id"), "inner").show()

    //11.orderBy 底层调用的是sort()
    df.orderBy().show()
    df.sort().show()

    //12.limit和head的区别
    /**
      * head  :是一个action, 返回数组
      * limit :返回一个 Dataset.
      **/
    df.head(2).foreach(println)
    df.limit(2).show()
  }
}