欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Spark-SQL学习笔记之Datasets and DataFrames

程序员文章站 2022-03-09 21:32:08
概述 spark sql是spark中的一个模块,负责结构化数据的处理。它跟spark rdd api不一样,spark sql提供的接口会提供更多关于数据和执行计算的信息。在内部,spark sq...

概述

spark sql是spark中的一个模块,负责结构化数据的处理。它跟spark rdd api不一样,spark sql提供的接口会提供更多关于数据和执行计算的信息。在内部,spark sql使用这些额外的信息去执行额外的优化。可以通过sql 和 dataset api与spark sql进行交互。当使用同一个执行引擎得到的计算结果,它是不会依赖于使用的api/语言。这意味着开发人员能更容易的在不同api进行切换。

sql

spark sql可以执行sql的查询,也可以从现有的hive中读取数据。当从另一种编程语言中运行sql时,结果将作为一个dataset/dataframe返回。可以通过命令行或者jdbc/odbc与sql进行交互。

其实spark sql的功能不仅仅只是sql的功能,它比sql拥有更多的功能。

datasets and dataframes

一个dataset是一个分部的数据集。dataset是spark 1.6中新增的一个新接口它有利于rdds和spark sql中的优化引擎。一个dataset可以jvm的对象中构建还可以被transformations函数(map,flatmap,filter,etc)操作。dataset的api可以用于scala和java。不适用于python和r语言。

一个dataframe是一个被加入列名的dataset。从概念上理解可以等同于关系型里的一张表。dataframe的构建数据源有很多,例如:结构化文件,hive中的表,外部数据库,或者已存在的rdds。dataframe api 可以用于scala,java,python 和 r语言。scala api中,dataframe进进是dataset[row]的别名。java api中为dataset

创建一个dataframes

package com.hihi.learn.sparksql

import com.hihi.learn.sparksql.datasetsdemo.person
import org.apache.spark.sql.types.{stringtype, structfield, structtype}
import org.apache.spark.sql.{dataframe, row, sparksession}

import scala.collection.mutable.arraybuffer

object dataframedemo {
  case class person(name: string, age: long)
  def main(args: array[string]): unit = {
    // 创建sparksession,此为spark sql的入口
    val spark = sparksession
      .builder()
      .appname("spark sql basic example")
      .master("local[2]")
      .getorcreate()
    import spark.implicits._
    //creating dataframes
    val df = spark.read.format("json").load("e:\\spark-branch-2.2\\examples\\src\\main\\resources\\people.json")
    //df.show()
    /*
    +----+-------+
    | age|   name|
    +----+-------+
    |null|michael|
    |  30|   andy|
    |  19| justin|
    +----+-------+
    */

    // 使用算子操作dataframe
    //untypeddatasetoperations(df)

    // 使用sql编程方式
    runningsqlqueriesprogrammatically(spark, df)
    // interoperating with rdds
    val peopledf = spark.sparkcontext.textfile("e:\\spark-branch-2.2\\examples\\src\\main\\resources\\people.txt")
      .mappartitions(its => {
        var arrperson = arraybuffer[person]()
        for (it <- its) {
          val arr = it.split(",")
          arrperson += person(arr(0), arr(1).trim.tolong)
        }
        arrperson.toiterator
      })
      .todf().show

    // programmatically specifying the schema
    val personds4 = spark.sparkcontext.textfile("e:\\spark-branch-2.2\\examples\\src\\main\\resources\\people.txt")
    val schemastring = "name age"
    val fields = schemastring.split(" ").map(structfield(_, stringtype, nullable = true))
    val schema = structtype(fields)

    val rowrdd = personds4
      .map(_.split(","))
      .map(attributes => row(attributes(0), attributes(1).trim))

    val peopledf2 = spark.createdataframe(rowrdd, schema)
    peopledf2.show()
    spark.stop()

  }

  def untypeddatasetoperations(df:dataframe): unit = {

    // 打印schema
    df.printschema()
    // root
    // |-- age: long (nullable = true)
    // |-- name: string (nullable = true)

    // 查询 name
    df.select("name").show
    // +-------+
    // |   name|
    // +-------+
    // |michael|
    // |   andy|
    // | justin|
    // +-------+

    // 查询age > 21的数据
    df.filter("age > 21").show
    // +---+----+
    // |age|name|
    // +---+----+
    // | 30|andy|
    // +---+----+

    // 使用分组
    df.groupby("age").count.show()
    // +----+-----+
    // | age|count|
    // +----+-----+
    // |  19|    1|
    // |null|    1|
    // |  30|    1|
    // +----+-----+
  }

  def runningsqlqueriesprogrammatically(spark: sparksession, df: dataframe) : unit = {
    // 注册一张临时表
    df.createorreplacetempview("people")

    spark.sql("select name, age from people").show()
    // +----+-------+
    // | age|   name|
    // +----+-------+
    // |null|michael|
    // |  30|   andy|
    // |  19| justin|
    // +----+-------+

    spark.sql("select name, age from people where name = 'andy'").show
    // +----+---+
    // |name|age|
    // +----+---+
    // |andy| 30|
    // +----+---+
  }

}

创建一个datasets

package com.hihi.learn.sparksql

import org.apache.spark.sql.{row, sparksession}
import org.apache.spark.sql.types.{stringtype, structfield, structtype}

import scala.collection.mutable.arraybuffer


object datasetsdemo {

  case class person(name: string, age: long)

  def main(args: array[string]): unit = {
    val spark = sparksession
      .builder()
      .appname("datasetsdemo")
      .master("local")
      .getorcreate()

    // creating datasets
    import spark.implicits._
    val personds = seq(person("hihi", 22), person("tom", 11)).tods
    personds.show

    val personds2 = spark.read.format("json").load("e:\\spark-branch-2.2\\examples\\src\\main\\resources\\people.json").as[person]
    personds2.show

    spark.stop()
  }
}