欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

自定义SparkSql语法的一般步骤

程序员文章站 2022-04-13 14:09:38
...

        SparkSql提供了对Hive的结构化查询语言,在某些业务场景下,我们可能需要对sql语法进行扩展,在此以自定义merge语法说明其一般步骤。

        Hive中parquet格式表的数据文件可能会包含大量碎片文件(每次执行insert时都会产生独立的parquet文件),碎文件过多会影响hdfs读写效率,对表中的文件合并的一般步骤是通过对rdd做repartition操作,再重新写入,通过控制repartition的数量即可控制最后生成文件的数量。

       一 环境准备:

           IntelliJ IDEA开发环境、antlr4安装包、aspectjweaver包

           antlr4是一种开源语法解析器,sparksql内部即是通过antlr4生成的语法解析,下载antlr-4.5-complete.jar,修改.bash_profile文件,添加如下命令:

     alias antlr4 = "java -Xms500m -cp /user/local/antlr-4.5-complete.jar org.antlr.v4.Tool"
          aspectjweaver是用于做aop切面的jar包,后续会用到


       二 修改sql语法:

           在Idea中建立相关工程,首先将sparksql源码中的sql语法文件拷贝过来,源文件位于sql/catalyst目录下,以g4结尾。

           添加MERGE关键字

      MERGE: 'MERGE';
          实现MERGE命令:

      MERGE TABLE tableIdentifier partitionSpec?
          后面的partitionSpec可以指定表的分区信息,

           修改好sql语法之后,执行以下命令,此时会生成相关的java文件(注意不要使用idea自带的antlr编译器,版本不兼容)

     antlr4 -o /Users/admin/workspace/sparkparser/src/main/java/cn/tongdun/sparkparser/antlr4
     -package cn.tongdun.sparkparser.antlr4
     -visitor -no-listener
     -lib /Users/admin/workspace/sparkparser/src/main/antlr4/cn/tongdun/sparkparser
     Users/admin/workspace/sparkparser/src/main/antlr4/cn/tongdun/sparkparser/SparkParser.g4

          自定义SparkSql语法的一般步骤

       三 实现自定义逻辑 

            首先实现visitor解析器:   

class SparkSqlParserVisitor(conf: SQLConf) extends SparkParserBaseVisitor[AnyRef]{

    override def visitSingleStatement(ctx: SparkParserParser.SingleStatementContext): LogicalPlan = withOrigin(ctx){
        visit(ctx.statement).asInstanceOf[LogicalPlan]
    }

    override def visitMergeTable(ctx: SparkParserParser.MergeTableContext): LogicalPlan = withOrigin(ctx){
        MergeTableCommand(ctx.tableIdentifier, ctx.partitionSpec)
    }
}
            上面的类是解析出sql的logicplan,visitMergeTable中即是解析merge语法的logicplan,如果相应的sql语法对应的visit方法没有被重写的话,则返回的是null

            接下来要实现的是strategy:

object MergeTableStrategy extends Strategy with Serializable{

    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
        case s:MergeTableCommand =>
            MergeTablePlan(plan.output, s.tableIdentifier, s.partitionSpec) :: Nil
        case _ => Nil
    }
}
           MergeTablePlan实现的是物理执行计划即physicplan,内部的实现原理是读取parquet文件,转换成rdd,映射成dataframe,最后写入目标路径。
           

           实现对sql的解析入口:

object SparkParserFactory {

    var sparkContext:SparkContext = null

    def parserSql(sparkSession:SparkSession, sql:String): Dataset[_] ={
        sparkContext = sparkSession.sparkContext
        sparkSession.experimental.extraStrategies = MergeTableStrategy :: Nil
        val parser = new SparkSqlParser(sparkSession)
        val plan = parser.parse(sql)
        if(plan!=null){
            val execution = sparkSession.sessionState.executePlan(plan)
            execution.assertAnalyzed()
            val clazz = classOf[Dataset[_]]
            val constructor = clazz.getDeclaredConstructor(classOf[SparkSession], classOf[QueryExecution], classOf[Encoder[_]])
            val dataSet = constructor.newInstance(sparkSession, execution, RowEncoder.apply(execution.analyzed.schema))
            return dataSet
        }
        null
    }
}
         这个方法尝试对sql进行解析,如果解析对logicplan不为null,则转换成dataset,否则返回null。

         最后实现将自定义sql嵌入到sparksql中,这里就利用到aspect的原理,首先建立AspectSql.aj文件:

public aspect AspectSql {

    private static final Logger logger = LoggerFactory.getLogger(AspectSql.class);

    public pointcut sparkSqlMethod(String sql): execution(public Dataset org.apache.spark.sql.SparkSession.sql(java.lang.String)) && args(sql) ;

    Dataset around(String sql): sparkSqlMethod(sql){
        try{
            logger.info("parser sql:"+sql);
            SparkSession sparkSession = (SparkSession)thisJoinPoint.getThis();
            Dataset dataset = SparkParserFactory.parserSql(sparkSession, sql);
            if(dataset!=null){
                return dataset;
            }else{
                logger.info("parser failed, execute as SparkSession.sql");
                return proceed(sql);
            }
        }catch (Exception e){
            throw new RuntimeException(e);
        }
    }

    public pointcut getCreateMethod(): execution(public org.apache.spark.sql.SparkSession getOrCreate());

    SparkSession around(): getCreateMethod(){
        SparkSession sparkSession = proceed();
        String jarPath = sparkSession.sparkContext().getConf().get("spark.merge.jar.path");
        if(StringUtils.isNotEmpty(jarPath)){
            sparkSession.sparkContext().addJar(jarPath);
        }
        sparkSession.exprimental.extraStrategies = MergeTableStrategy :: Nil
        return sparkSession;
    }
}
        上面的aop定义了两个切面:第一个是在sparksession启动时倒入自定义的strategy,第二个是在执行sql时进行拦截,首先尝试用自定义的sql解析器进行解析,如果解析失败则执行sparksql原生的方法,这样的好处是不需要重复实现相关的sql业务逻辑,用户只需要实现自定义的逻辑即可。

         

         在工程的resource目录下建立META-INF目录,创建aop.xml文件:

<?xml version="1.0" encoding="UTF-8" ?>
<aspectj>
    <aspects>
        <aspect name="cn.tongdun.sparkparser.AspectSql"/>
    </aspects>
    <weaver options="-XaddSerialVersionUID"></weaver>
</aspectj>

          最后对整个工程打包。

        四 部署测试

            接下来将实现的自定义sql解析器嵌入到sparksql中,我们需要两个jar包,一个是自定义sql解析器sparkparser.jar,另一个就是aspectweaver.jar,首先将sparkparser.jar上传到hdfs上的某个位置,同时将sparkparser.jar和aspectweaver.jar放到本地的某个路径上,启动spark-shell,后面加上以下参数:

bin/spark-shell
--conf spark.driver.extraJavaOptions=-javaagent:/home/admin/aspectjweaver-1.8.10.jar
--conf spark.driver.extraClassPath=/home/admin/sparkparser-1.0.0.jar
--conf spark.merge.jar.path=hdfs://tdhdfs/user/admin/sparkparser-1.0.0.jar
            本例中 sparkparser-1.0.0.jar和aspectweave-1.8.10r.jar分别都放在本地的/home/admin目录下,同时将sparkparser-1.0.0.jar上传到hdfs的/user/admin/路径下

           启动spark-shell之后,即可以执行merge table的相关命令


       按照以上的步骤可以继续开发一些定制化的sql语法,进一步满足实际业务需求。


相关标签: SparkSql antlr