欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

一条Sql的Spark之旅

程序员文章站 2022-06-10 07:55:26
背景 ​ SQL作为一门标准的、通用的、简单的DSL,在大数据分析中有着越来越重要的地位;Spark在批处理引擎领域当前也是处于绝对的地位,而Spark2.0中的SparkSQL也支持ANSI SQL 2003标准。因此SparkSQL在大数据分析中的地位不言而喻。 本文将通过分析一条SQL在Spa ......

背景

​ sql作为一门标准的、通用的、简单的dsl,在大数据分析中有着越来越重要的地位;spark在批处理引擎领域当前也是处于绝对的地位,而spark2.0中的sparksql也支持ansi-sql 2003标准。因此sparksql在大数据分析中的地位不言而喻。
本文将通过分析一条sql在spark中的解析执行过程来梳理sparksql执行的一个流程。

案例分析

代码

val spark = sparksession.builder().appname("testsql").master("local[*]").enablehivesupport().getorcreate()
val df = spark.sql("select sepal_length,class from origin_csvload.csv_iris_qx  order by  sepal_length limit 10 ")
df.show(3)

我们在数仓中新建了一张表origin_csvload.csv_iris_qx,然后通过sparksql执行了一条sql,由于整个过程由于是懒加载的,需要通过terminal方法触发,此处我们选择show方法来触发。

源码分析

词法解析、语法解析以及分析

sql方法会执行以下3个重点:

  1. sessionstate.sqlparser.parseplan(sqltext):将sql字符串通过antlr解析成逻辑计划(parsed logical plan)
  2. sparksession.sessionstate.executeplan(logicalplan):执行逻辑计划,此处为懒加载,只新建queryexecution实例,并不会触发实际动作。需要注意的是queryexecution其实是包含了sql解析执行的4个阶段计划(解析、分析、优化、执行)
  3. queryexecution.assertanalyzed():触发语法分析,得到分析计划(analyzed logical plan)
def sql(sqltext: string): dataframe = {
    //1:parsed logical plan
    dataset.ofrows(self, sessionstate.sqlparser.parseplan(sqltext))
}
  
def ofrows(sparksession: sparksession, logicalplan: logicalplan): dataframe = {
    val qe = sparksession.sessionstate.executeplan(logicalplan)//d-1
    qe.assertanalyzed()//d-2
    new dataset[row](sparksession, qe, rowencoder(qe.analyzed.schema))
}

//d-1
def executeplan(plan: logicalplan): queryexecution = new queryexecution(sparksession, plan)

//2:analyzed logical plan
lazy val analyzed: logicalplansparksession.sessionstate.analyzer.executeandcheck(logical)

解析计划和分析计划

sql解析后计划如下:

== parsed logical plan ==
'globallimit 10
+- 'locallimit 10
   +- 'sort ['sepal_length asc nulls first], true
      +- 'project ['sepal_length, 'class]
         +- 'unresolvedrelation `origin_csvload`.`csv_iris_qx`

主要是将sql一一对应地翻译成了catalyst的操作,此时数据表并没有被解析,只是简单地识别为表。而分析后的计划则包含了字段的位置、类型,表的具体类型(parquet)等信息。

== analyzed logical plan ==
sepal_length: double, class: string
globallimit 10
+- locallimit 10
   +- sort [sepal_length#0 asc nulls first], true
      +- project [sepal_length#0, class#4]
         +- subqueryalias `origin_csvload`.`csv_iris_qx`
            +- relation[sepal_length#0,sepal_width#1,petal_length#2,petal_width#3,class#4] parquet

此处有个比较有意思的点,unresolvedrelation origin_csvload.csv_iris_qx被翻译成了一个子查询别名,读取文件出来的数据注册成了一个表,这个是不必要的,后续的优化会消除这个子查询别名。

优化以及执行

以dataset的show方法为例,show的方法调用链为showstring->getrows->take->head->withaction,我们先来看看withaction方法:

def head(n: int): array[t] = withaction("head", limit(n).queryexecution)(collectfromplan)
private def withaction[u](name: string, qe: queryexecution)(action: sparkplan => u) = {
    val 
    result= sqlexecution.withnewexecutionid(sparksession, qe) {
       action(qe.executedplan)
    }
    result
}

withaction方法主要执行如下逻辑:
1. 拿到缓存的解析计划,使用遍历优化器执行解析计划,得到若干优化计划。
2. 获取第一个优化计划,遍历执行前优化获得物理执行计划,这是已经可以执行的计划了。
3. 执行物理计划,返回实际结果。至此,这条sql之旅就结束了。

//3:optimized logical plan,withcacheddata为analyzed logical plan,即缓存的变量analyzed
lazy val optimizedplan: logicalplan = sparksession.sessionstate.optimizer.execute(withcacheddata)
lazy val sparkplan: sparkplan = planner.plan(returnanswer(optimizedplan)).next()
//4:physical plan
lazy val executedplan: sparkplan = prepareforexecution(sparkplan)

优化计划及物理计划

优化后的计划如下,可以看到subqueryaliases已经没有了。

== optimized logical plan ==
globallimit 10
+- locallimit 10
   +- sort [sepal_length#0 asc nulls first], true
      +- project [sepal_length#0, class#4]
         +- relation[sepal_length#0,sepal_width#1,petal_length#2,petal_width#3,class#4] parquet

具体的优化点如下图所示,行首有!表示优化的地方。
一条Sql的Spark之旅

其中"=== result of batch finish analysis ==="表示"finish analysis"的规则簇(参见附录一)被应用成功,可以看到该规则簇中有一个消除子查询别名的规则eliminatesubqueryaliases

batch("finish analysis", once,
      eliminatesubqueryaliases,
      replaceexpressions,
      computecurrenttime,
      getcurrentdatabase(sessioncatalog),
      rewritedistinctaggregates)

最后根据物理计划生成规则(附录二)可以得到物理计划,这就是已经可以执行的计划了。具体如下:

== physical plan ==
takeorderedandproject(limit=10, orderby=[sepal_length#0 asc nulls first], output=[sepal_length#0,class#4])
+- *(1) project [sepal_length#0, class#4]
   +- *(1) filescan parquet origin_csvload.csv_iris_qx[sepal_length#0,class#4] batched: true, format: parquet, location: catalogfileindex[hdfs://di124:8020/user/hive/warehouse/origin_csvload.db/csv_iris_qx], partitioncount: 1, partitionfilters: [], pushedfilters: [], readschema: struct<sepal_length:double,class:string>

总结

本文简述了一条sql是如何从字符串经过词法解析、语法解析、规则优化等步骤转化成可执行的物理计划,最后以一个terminal方法触发逻辑返回结果。本文可为后续sql优化提供一定思路,之后可再详述具体的sql优化原则。

附录一:优化方法

分析计划会依次应用如下优化:

  1. 前置优化。当前为空。
  2. 默认优化。主要有如下类别,每个类别分别有若干优化规则。
  • optimize metadata only query
  • extract python udfs
  • prune file source table partitions
  • parquet schema pruning
  • finish analysis
  • union
  • subquery
  • replace operators
  • aggregate
  • operator optimizations
  • check cartesian products
  • decimal optimizations
  • typed filter optimization
  • localrelation
  • optimizecodegen
  • rewritesubquery
  1. 后置优化。当前为空。
  2. 用户提供的优化。来自experimentalmethods.extraoptimizations,当前也没有。

附录二:物理计划生成规则

生成物理执行计划的规则如下:

  • plansubqueries
  • ensurerequirements
  • collapsecodegenstages
  • reuseexchange
  • reusesubquery

本文由博客一文多发平台 openwrite 发布!