Spark-SQL学习笔记之Datasets and DataFrames
程序员文章站
2022-03-09 21:32:08
概述
spark sql是spark中的一个模块,负责结构化数据的处理。它跟spark rdd api不一样,spark sql提供的接口会提供更多关于数据和执行计算的信息。在内部,spark sq...
概述
spark sql是spark中的一个模块,负责结构化数据的处理。它跟spark rdd api不一样,spark sql提供的接口会提供更多关于数据和执行计算的信息。在内部,spark sql使用这些额外的信息去执行额外的优化。可以通过sql 和 dataset api与spark sql进行交互。当使用同一个执行引擎得到的计算结果,它是不会依赖于使用的api/语言。这意味着开发人员能更容易的在不同api进行切换。
sql
spark sql可以执行sql的查询,也可以从现有的hive中读取数据。当从另一种编程语言中运行sql时,结果将作为一个dataset/dataframe返回。可以通过命令行或者jdbc/odbc与sql进行交互。
其实spark sql的功能不仅仅只是sql的功能,它比sql拥有更多的功能。
datasets and dataframes
一个dataset是一个分部的数据集。dataset是spark 1.6中新增的一个新接口它有利于rdds和spark sql中的优化引擎。一个dataset可以jvm的对象中构建还可以被transformations函数(map,flatmap,filter,etc)操作。dataset的api可以用于scala和java。不适用于python和r语言。
一个dataframe是一个被加入列名的dataset。从概念上理解可以等同于关系型里的一张表。dataframe的构建数据源有很多,例如:结构化文件,hive中的表,外部数据库,或者已存在的rdds。dataframe api 可以用于scala,java,python 和 r语言。scala api中,dataframe进进是dataset[row]的别名。java api中为dataset
创建一个dataframes
package com.hihi.learn.sparksql import com.hihi.learn.sparksql.datasetsdemo.person import org.apache.spark.sql.types.{stringtype, structfield, structtype} import org.apache.spark.sql.{dataframe, row, sparksession} import scala.collection.mutable.arraybuffer object dataframedemo { case class person(name: string, age: long) def main(args: array[string]): unit = { // 创建sparksession,此为spark sql的入口 val spark = sparksession .builder() .appname("spark sql basic example") .master("local[2]") .getorcreate() import spark.implicits._ //creating dataframes val df = spark.read.format("json").load("e:\\spark-branch-2.2\\examples\\src\\main\\resources\\people.json") //df.show() /* +----+-------+ | age| name| +----+-------+ |null|michael| | 30| andy| | 19| justin| +----+-------+ */ // 使用算子操作dataframe //untypeddatasetoperations(df) // 使用sql编程方式 runningsqlqueriesprogrammatically(spark, df) // interoperating with rdds val peopledf = spark.sparkcontext.textfile("e:\\spark-branch-2.2\\examples\\src\\main\\resources\\people.txt") .mappartitions(its => { var arrperson = arraybuffer[person]() for (it <- its) { val arr = it.split(",") arrperson += person(arr(0), arr(1).trim.tolong) } arrperson.toiterator }) .todf().show // programmatically specifying the schema val personds4 = spark.sparkcontext.textfile("e:\\spark-branch-2.2\\examples\\src\\main\\resources\\people.txt") val schemastring = "name age" val fields = schemastring.split(" ").map(structfield(_, stringtype, nullable = true)) val schema = structtype(fields) val rowrdd = personds4 .map(_.split(",")) .map(attributes => row(attributes(0), attributes(1).trim)) val peopledf2 = spark.createdataframe(rowrdd, schema) peopledf2.show() spark.stop() } def untypeddatasetoperations(df:dataframe): unit = { // 打印schema df.printschema() // root // |-- age: long (nullable = true) // |-- name: string (nullable = true) // 查询 name df.select("name").show // +-------+ // | name| // +-------+ // |michael| // | andy| // | justin| // +-------+ // 查询age > 21的数据 df.filter("age > 21").show // +---+----+ // |age|name| // +---+----+ // | 30|andy| // +---+----+ // 使用分组 df.groupby("age").count.show() // +----+-----+ // | age|count| // +----+-----+ // | 19| 1| // |null| 1| // | 30| 1| // +----+-----+ } def runningsqlqueriesprogrammatically(spark: sparksession, df: dataframe) : unit = { // 注册一张临时表 df.createorreplacetempview("people") spark.sql("select name, age from people").show() // +----+-------+ // | age| name| // +----+-------+ // |null|michael| // | 30| andy| // | 19| justin| // +----+-------+ spark.sql("select name, age from people where name = 'andy'").show // +----+---+ // |name|age| // +----+---+ // |andy| 30| // +----+---+ } }
创建一个datasets
package com.hihi.learn.sparksql import org.apache.spark.sql.{row, sparksession} import org.apache.spark.sql.types.{stringtype, structfield, structtype} import scala.collection.mutable.arraybuffer object datasetsdemo { case class person(name: string, age: long) def main(args: array[string]): unit = { val spark = sparksession .builder() .appname("datasetsdemo") .master("local") .getorcreate() // creating datasets import spark.implicits._ val personds = seq(person("hihi", 22), person("tom", 11)).tods personds.show val personds2 = spark.read.format("json").load("e:\\spark-branch-2.2\\examples\\src\\main\\resources\\people.json").as[person] personds2.show spark.stop() } }