自定义SparkSql语法的一般步骤
SparkSql提供了对Hive的结构化查询语言,在某些业务场景下,我们可能需要对sql语法进行扩展,在此以自定义merge语法说明其一般步骤。
Hive中parquet格式表的数据文件可能会包含大量碎片文件(每次执行insert时都会产生独立的parquet文件),碎文件过多会影响hdfs读写效率,对表中的文件合并的一般步骤是通过对rdd做repartition操作,再重新写入,通过控制repartition的数量即可控制最后生成文件的数量。
一 环境准备:
IntelliJ IDEA开发环境、antlr4安装包、aspectjweaver包
antlr4是一种开源语法解析器,sparksql内部即是通过antlr4生成的语法解析,下载antlr-4.5-complete.jar,修改.bash_profile文件,添加如下命令:
alias antlr4 = "java -Xms500m -cp /user/local/antlr-4.5-complete.jar org.antlr.v4.Tool"
aspectjweaver是用于做aop切面的jar包,后续会用到
二 修改sql语法:
在Idea中建立相关工程,首先将sparksql源码中的sql语法文件拷贝过来,源文件位于sql/catalyst目录下,以g4结尾。
添加MERGE关键字
MERGE: 'MERGE';
实现MERGE命令:
MERGE TABLE tableIdentifier partitionSpec?
后面的partitionSpec可以指定表的分区信息,
修改好sql语法之后,执行以下命令,此时会生成相关的java文件(注意不要使用idea自带的antlr编译器,版本不兼容)
antlr4 -o /Users/admin/workspace/sparkparser/src/main/java/cn/tongdun/sparkparser/antlr4
-package cn.tongdun.sparkparser.antlr4
-visitor -no-listener
-lib /Users/admin/workspace/sparkparser/src/main/antlr4/cn/tongdun/sparkparser
Users/admin/workspace/sparkparser/src/main/antlr4/cn/tongdun/sparkparser/SparkParser.g4
三 实现自定义逻辑
首先实现visitor解析器:
class SparkSqlParserVisitor(conf: SQLConf) extends SparkParserBaseVisitor[AnyRef]{
override def visitSingleStatement(ctx: SparkParserParser.SingleStatementContext): LogicalPlan = withOrigin(ctx){
visit(ctx.statement).asInstanceOf[LogicalPlan]
}
override def visitMergeTable(ctx: SparkParserParser.MergeTableContext): LogicalPlan = withOrigin(ctx){
MergeTableCommand(ctx.tableIdentifier, ctx.partitionSpec)
}
}
上面的类是解析出sql的logicplan,visitMergeTable中即是解析merge语法的logicplan,如果相应的sql语法对应的visit方法没有被重写的话,则返回的是null
接下来要实现的是strategy:
object MergeTableStrategy extends Strategy with Serializable{
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case s:MergeTableCommand =>
MergeTablePlan(plan.output, s.tableIdentifier, s.partitionSpec) :: Nil
case _ => Nil
}
}
MergeTablePlan实现的是物理执行计划即physicplan,内部的实现原理是读取parquet文件,转换成rdd,映射成dataframe,最后写入目标路径。实现对sql的解析入口:
object SparkParserFactory {
var sparkContext:SparkContext = null
def parserSql(sparkSession:SparkSession, sql:String): Dataset[_] ={
sparkContext = sparkSession.sparkContext
sparkSession.experimental.extraStrategies = MergeTableStrategy :: Nil
val parser = new SparkSqlParser(sparkSession)
val plan = parser.parse(sql)
if(plan!=null){
val execution = sparkSession.sessionState.executePlan(plan)
execution.assertAnalyzed()
val clazz = classOf[Dataset[_]]
val constructor = clazz.getDeclaredConstructor(classOf[SparkSession], classOf[QueryExecution], classOf[Encoder[_]])
val dataSet = constructor.newInstance(sparkSession, execution, RowEncoder.apply(execution.analyzed.schema))
return dataSet
}
null
}
}
这个方法尝试对sql进行解析,如果解析对logicplan不为null,则转换成dataset,否则返回null。
最后实现将自定义sql嵌入到sparksql中,这里就利用到aspect的原理,首先建立AspectSql.aj文件:
public aspect AspectSql {
private static final Logger logger = LoggerFactory.getLogger(AspectSql.class);
public pointcut sparkSqlMethod(String sql): execution(public Dataset org.apache.spark.sql.SparkSession.sql(java.lang.String)) && args(sql) ;
Dataset around(String sql): sparkSqlMethod(sql){
try{
logger.info("parser sql:"+sql);
SparkSession sparkSession = (SparkSession)thisJoinPoint.getThis();
Dataset dataset = SparkParserFactory.parserSql(sparkSession, sql);
if(dataset!=null){
return dataset;
}else{
logger.info("parser failed, execute as SparkSession.sql");
return proceed(sql);
}
}catch (Exception e){
throw new RuntimeException(e);
}
}
public pointcut getCreateMethod(): execution(public org.apache.spark.sql.SparkSession getOrCreate());
SparkSession around(): getCreateMethod(){
SparkSession sparkSession = proceed();
String jarPath = sparkSession.sparkContext().getConf().get("spark.merge.jar.path");
if(StringUtils.isNotEmpty(jarPath)){
sparkSession.sparkContext().addJar(jarPath);
}
sparkSession.exprimental.extraStrategies = MergeTableStrategy :: Nil
return sparkSession;
}
}
上面的aop定义了两个切面:第一个是在sparksession启动时倒入自定义的strategy,第二个是在执行sql时进行拦截,首先尝试用自定义的sql解析器进行解析,如果解析失败则执行sparksql原生的方法,这样的好处是不需要重复实现相关的sql业务逻辑,用户只需要实现自定义的逻辑即可。
在工程的resource目录下建立META-INF目录,创建aop.xml文件:
<?xml version="1.0" encoding="UTF-8" ?>
<aspectj>
<aspects>
<aspect name="cn.tongdun.sparkparser.AspectSql"/>
</aspects>
<weaver options="-XaddSerialVersionUID"></weaver>
</aspectj>
最后对整个工程打包。
四 部署测试
接下来将实现的自定义sql解析器嵌入到sparksql中,我们需要两个jar包,一个是自定义sql解析器sparkparser.jar,另一个就是aspectweaver.jar,首先将sparkparser.jar上传到hdfs上的某个位置,同时将sparkparser.jar和aspectweaver.jar放到本地的某个路径上,启动spark-shell,后面加上以下参数:
bin/spark-shell
--conf spark.driver.extraJavaOptions=-javaagent:/home/admin/aspectjweaver-1.8.10.jar
--conf spark.driver.extraClassPath=/home/admin/sparkparser-1.0.0.jar
--conf spark.merge.jar.path=hdfs://tdhdfs/user/admin/sparkparser-1.0.0.jar
本例中 sparkparser-1.0.0.jar和aspectweave-1.8.10r.jar分别都放在本地的/home/admin目录下,同时将sparkparser-1.0.0.jar上传到hdfs的/user/admin/路径下
启动spark-shell之后,即可以执行merge table的相关命令
按照以上的步骤可以继续开发一些定制化的sql语法,进一步满足实际业务需求。
上一篇: ANTLR4(七) 加载CSV数据
下一篇: 利用Antlr开发状态机