spark - DataFrame Api 练习 博客分类: spark DataFrame Api 练习
程序员文章站
2024-03-12 18:48:14
...
DataFrame Api 练习(需要了解下functions.scala源码)
package df import org.apache.spark.sql.SparkSession object DataFrameDemo { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().appName("DF_DEMO").master("local").getOrCreate() /** * 测试数据内容: * {"id":1,"name":"zhangsan"} * {"id":3,"name":"lisi"} * {"id":2,"name":"wangwu"} */ val df = spark.read.json("file:///F:\\test\\1.json") //1.展示df的内容, show方法默认展示20行, 每列限制20个字符,超出就截断 df.show() //2.打印schema df.printSchema() //3.查询id列 (如果id列不是schema里的列,会报错找不到列!!!) df.select("id").show() df.select(df("id")).show() //需要引入隐式转换 import spark.implicits._ df.select('id).show() df.select($"id").show() //4.group by (借助count转为DataFrame) df.groupBy("id").count().show() //5.使用sql的写法 //createGlobalTempView会把表people_tbl建在global_temp数据库下 df.createGlobalTempView("people_tbl") //查询时候要指定数据库global_temp, 否则会报错找不到表!!! // spark.sql("select * from people_tbl").show() spark.sql("select * from global_temp.people_tbl").show() //global temp view在一个application的多个session中都可见!!! spark.newSession().sql("SELECT * FROM global_temp.people_tbl").show() //6.head从头取n个数据的Array df.head(2).foreach(println) //7.filter操作!!! /** * 输出结果: * +---+--------+ * | id| name| * +---+--------+ * | 1|zhangsan| * | 3| lisi| * +---+--------+ */ df.filter("name='lisi' or id=1").show() //name以w开头的 df.filter("substr(name,0, 1)='w'").show() //8.sort默认是字典升序 df.sort("name").show() //字典降序col.desc, 按照多列排序 df.sort(df.col("name").desc, df.col("id").asc).show() //9.as别名 /** * 输出结果: * +------+ * |stu_id| * +------+ * | 1| * | 3| * | 2| * +------+ */ df.select(df.col("id").as("stu_id")).show() //10.join操作(join 时候on的条件必须要使用 === 连接 !!!),join()的第三个参数指定连接方式 /** * 输出结果 * +---+--------+---+--------+ * | id| name| id| name| * +---+--------+---+--------+ * | 1|zhangsan| 1|zhangsan| * | 3| lisi| 3| lisi| * | 2| wangwu| 2| wangwu| * +---+--------+---+--------+ */ df.join(df, df.col("id") === df.col("id"), "inner").show() //11.orderBy 底层调用的是sort() df.orderBy().show() df.sort().show() //12.limit和head的区别 /** * head :是一个action, 返回数组 * limit :返回一个 Dataset. **/ df.head(2).foreach(println) df.limit(2).show() } }