美文网首页邵红晓
antlr4 + spark sql对业务sql进行解析

antlr4 + spark sql对业务sql进行解析

作者: 咋了生活 | 来源:发表于2018-11-07 10:18 被阅读0次

通过Spark Sql实现SQL解析

在大数据平台开发过程中,会遇到血缘分析,对SQL解析并进行权限的鉴权,需要提前对SQL进行基本语法校验,这些场景都需要对SQL进行解析。

常用的sql解析工具
  1. 阿里 Druid:支持的数据库类型不少,但是解析时需要制定数据库类型,并且在使用中,对hive的语法解析版本比较老,兼容性不太好
    2.Hive原生sql解析:由于在大数据平台进行业务开发时,开发人员写的SQL并一定是完全符合hive规范的,因为在运行时是先通过spark进行解析的,所以也并不能完全满足需要
    3.General SQL Parser(未测试)这款工具在解析的时候也是需要指定数据库

最后通过调研之后,决定还是采用spark原生的sql解析,下面对采用spark sql原生解析进行介绍,并且最终可以做到不依赖spark的任何jar包。

antlr4包依赖

由于spark sql的解析是通过antlr4实现的,所以首先需要添加依赖

<dependency>
<groupId>org.antlr</groupId>
<artifactId>antlr4-runtime</artifactId>
<version>4.5.3</version>
</dependency>

拷贝spark sql对应的sql解析代码

在spark源码中找到以下代码,并将这部分代码拷贝到自己的工程中,包名请修改成自己项目包名。


image.png
拷贝完之后,在对应的包下新建两个类
  1. ANTLRNoCaseStringStream
import org.antlr.v4.runtime.ANTLRInputStream;
import org.antlr.v4.runtime.IntStream;

public class ANTLRNoCaseStringStream extends ANTLRInputStream {
   public ANTLRNoCaseStringStream(String input){
        super(input);
   }

    @Override
    public int LA(int i){
        int la = super.LA(i);
        if (la == 0 || la == IntStream.EOF){
            return la;
        } else {
            return Character.toUpperCase(la);
        }
    }
}
  1. MySqlBaseBaseListener 在解析过程中获取需要的数据,此类实现的是获得解析过程中select的表和insert的表,如果需要获取列名等其他的,请参考SqlBaseBaseListener 类中的方法,来获得自己需要的数据。
import org.antlr.v4.runtime.tree.ParseTreeWalker;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class MySqlBaseBaseListener extends SqlBaseBaseListener {
    private Map<String, Set<String>> dataBaseTablenameAndOper = new HashMap<>();//用来保存表与操作的对应关系

    public Map<String, Set<String>> getDataBaseTablenameAndOper() {
        return dataBaseTablenameAndOper;
    }

    public void enterQuerySpecification(SqlBaseParser.QuerySpecificationContext ctx) {
        final SqlBaseParser.QuerySpecificationContext baseCtx = ctx;
        ParseTreeWalker queryWalker = new ParseTreeWalker();
        queryWalker.walk(new SqlBaseBaseListener() {
            public void enterTableIdentifier(SqlBaseParser.TableIdentifierContext ctx) {
                if(ctx.table!=null) {
                    String table = ctx.getText().toLowerCase();
                    Set<String> oper;
                    if (dataBaseTablenameAndOper.containsKey(table)) {
                        oper = dataBaseTablenameAndOper.get(table);
                    } else {
                        oper = new HashSet<>();
                    }
                    oper.add("SELECT");
                    dataBaseTablenameAndOper.put(table, oper);
                }
            }
        }, ctx);
    }

    public void enterInsertInto(SqlBaseParser.InsertIntoContext ctx){
        final SqlBaseParser.InsertIntoContext baseCtx = ctx;
        ParseTreeWalker queryWalker = new ParseTreeWalker();
        final Set<String> simpleTables = new HashSet<String>();
        queryWalker.walk(new SqlBaseBaseListener() {
            public void enterTableIdentifier(SqlBaseParser.TableIdentifierContext ctx) {
                if(ctx.table!=null) {
                    String table = ctx.getText().toLowerCase();
                    Set<String> oper;
                    if (dataBaseTablenameAndOper.containsKey(table)) {
                        oper = dataBaseTablenameAndOper.get(table);
                    } else {
                        oper = new HashSet<>();
                    }
                    oper.add("INSERT");
                    dataBaseTablenameAndOper.put(table, oper);
                }
            }
        }, ctx);
    }
}

  1. 编写sql解析工具类SparkSqlUtil
import org.antlr.v4.runtime.CommonTokenStream;
import org.antlr.v4.runtime.tree.ParseTreeWalker;
import com.luckincoffee.datas.spark.sql.catalyst.parser.*;

import java.util.Map;
import java.util.Set;

public class SparkSqlUtil {
    public static Map<String, Set<String>> getDataBaseTablenameAndOper(String sql){
        SqlBaseLexer lexer = new SqlBaseLexer(new ANTLRNoCaseStringStream(sql));

        CommonTokenStream tokenStream = new CommonTokenStream(lexer);
        SqlBaseParser parser = new SqlBaseParser(tokenStream);
        ParseTreeWalker walker = new ParseTreeWalker();
        MySqlBaseBaseListener mySqlBaseBaseListener = new MySqlBaseBaseListener();

        walker.walk(mySqlBaseBaseListener, parser.statement());

        return mySqlBaseBaseListener.getDataBaseTablenameAndOper();
    }
}

至此通过spark sql进行sql解析就完成了,解析的内容和返回的数据格式,都可以自定义完成。联系邮箱:czmqlgr@163.com

相关文章

网友评论

    本文标题:antlr4 + spark sql对业务sql进行解析

    本文链接:https://www.haomeiwen.com/subject/ffjmxqtx.html