欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

SparkSQL使用说明与DataFrame创建

程序员文章站 2022-05-12 20:17:16
Spark SQL使用说明与DataFrame创建 启动spark-shell 由于spark-shell演示具有显而易见的有点,因此本文主要基于spark-shell对Spa...

Spark SQL使用说明与DataFrame创建

启动spark-shell

由于spark-shell演示具有显而易见的有点,因此本文主要基于spark-shell对Spark SQL的使用进行介绍

我使用的是最新版本的Spark 2.3.0来进行演示

首先我们启动spark-shell,等待进入Scala REPL:

SparkSQL使用说明与DataFrame创建

创建DataFrame

Spark SQL的所有操作都是基于其内部定义的一个叫做DataFrame(Spark2.0后它变成了DataSet[Row]的类型别名)的结构的,因此,我们首先需要创建DataFrame。

创建DataFrame的方式有很多种,比如json\csv\parquet等,在Spark 2.0以后,可以通过sparkSession.read得到DataFrameReader来读取各种支持类型的文件,从而得到对应的DataFrame。

不过我们既然使用了spark-shell,就更随意一点了。如果你现在就是为了处理一份数据,那么你可以直接用sparkSession.read里的方法来加载这份数据文件。如果你只是为了学习,或者研究某种数据结构的处理方式,那么你可以使用case class的方式来构造你的实验DataFrame:

case class DemoClass(arg0: Int, arg: String)

SparkSQL使用说明与DataFrame创建

首先我们定义一个case class作为我们DataFrame的模版

之后我们造一些我们的测试数据:

scala> val test = (1 to 100).map(i => DemoClass(i, s"String_$i"))
test: scala.collection.immutable.IndexedSeq[DemoClass] = Vector(DemoClass(1,String_1), DemoClass(2,String_2), DemoClass(3,String_3), DemoClass(4,String_4), DemoClass(5,String_5), DemoClass(6,String_6), DemoClass(7,String_7), DemoClass(8,String_8), DemoClass(9,String_9), DemoClass(10,String_10), DemoClass(11,String_11), DemoClass(12,String_12), DemoClass(13,String_13), DemoClass(14,String_14), DemoClass(15,String_15), DemoClass(16,String_16), DemoClass(17,String_17), DemoClass(18,String_18), DemoClass(19,String_19), DemoClass(20,String_20), DemoClass(21,String_21), DemoClass(22,String_22), DemoClass(23,String_23), DemoClass(24,String_24), DemoClass(25,String_25), DemoClass(26,String_26), DemoClass(27,String_27), DemoClass(28,String_28), DemoClass(29,String_29), DemoClass(30,String_30), D...

这样,我们就获得了一个长度为100的DemoClass列表

之后我们运行:

scala> val df = spark.createDataFrame(test)
df: org.apache.spark.sql.DataFrame = [arg0: int, arg: string]

这样我们就轻松获得了一个我们自定义类型的DataFrame:

SparkSQL使用说明与DataFrame创建

我们分别使用printSchema和show来获取关于这个DataFrame的内部信息。

通过printSchema我们可以将DataFrame的数据结构打印出来,我们可以获得的是列名和其对应的列类型以及是否可以为null。从pringSchema的打印形式上我们可以看出,它是按树的形式来打印的,所以DataFrame的数据结构可以有很多层级的。

通过show默认打印了前20行数据,我们可以看出这个DataFrame有两列,并且其前20行数据。

从上图我们看到,使用Seq[case class]来创建DataFrame,我们得到的列名和列类型就是case class的各个成员变量的名字和类型。这样,我们就可以造出多种多样的数据结构来进行测试了。

/**********************************************我是分割线**********************************************/

下面我们来举个稍微复杂一点的例子:

scala> case class A(a: Int, b: Double, c: String)
defined class A

scala> case class B(a: A, b: List[Double], c: Map[Int, String])
defined class B

scala> case class C(a: A, b: B, c: Array[String], d: Map[A, B])
defined class C

scala> def a_gen(i: Int) = A(i, i * 0.1, s"a_$i")
a_gen: (i: Int)A

scala> def b_gen(i: Int) = B(a_gen(i), (0 to i).map(_ / 0.1).toList, (0 to i).map(j => (j, s"value_$j")).toMap)
b_gen: (i: Int)B

scala> def c_gen(i: Int) = C(a_gen(i), b_gen(i), (0 to i).map(j => s"str_$j").toArray, (0 to i).map(j => (a_gen(j), b_gen(j))).toMap)
c_gen: (i: Int)C

scala> (1 to 100).map(c_gen)
res4: scala.collection.immutable.IndexedSeq[C] = Vector(C(A(1,0.1,a_1),B(A(1,0.1,a_1),List(0.0, 10.0),Map(0 -> value_0, 1 -> value_1)),[Ljava.lang.String;@17483bb2,Map(A(0,0.0,a_0) -> B(A(0,0.0,a_0),List(0.0),Map(0 -> value_0)), A(1,0.1,a_1) -> B(A(1,0.1,a_1),List(0.0, 10.0),Map(0 -> value_0, 1 -> value_1)))), C(A(2,0.2,a_2),B(A(2,0.2,a_2),List(0.0, 10.0, 20.0),Map(0 -> value_0, 1 -> value_1, 2 -> value_2)),[Ljava.lang.String;@7f415d8e,Map(A(0,0.0,a_0) -> B(A(0,0.0,a_0),List(0.0),Map(0 -> value_0)), A(1,0.1,a_1) -> B(A(1,0.1,a_1),List(0.0, 10.0),Map(0 -> value_0, 1 -> value_1)), A(2,0.2,a_2) -> B(A(2,0.2,a_2),List(0.0, 10.0, 20.0),Map(0 -> value_0, 1 -> value_1, 2 -> value_2)))), C(A(3,0.30000000000000004,a_3),B(A(3,0.30000000000000004,a_3),List(0.0, 10.0, 20.0, 30.0),Map(0 -> value_0, ...

SparkSQL使用说明与DataFrame创建

我们从schema来进行分析

我们使用C类列表来创建DataFrame,C类共有四个成员变量a\b\c\d,因此root下有a\b\c\d四列

a为类A,我们从schema中可以看出a字段下面有三个子字段,分别对应类A的三个成员变量。剩下的b\c\d列都是同理。

我们可以看出

1、List和Array转换成了array子树结构

2、Map转换成了key\value子树结构

3、类转换成了struct子树结构