
Notes on Reading the Presto Source: How a SQL Query Is Executed

Preface
The Presto source is best approached through two modules: presto-cli and presto-main, the entry-point projects for the client and the server respectively.
The version discussed here is
com.facebook.presto:presto-root:0.190-SNAPSHOT

The main entry point of presto-cli is

com.facebook.presto.cli.Presto.java

The main entry point of presto-main is

com.facebook.presto.server.PrestoServer.java

The client and the server each start from one of these two main methods. This article focuses on how the server executes a query after the client has sent the request over.
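
Before stepping into the server code, it helps to see what the hand-off looks like on the wire: the CLI POSTs the query text to the coordinator's /v1/statement endpoint (served by StatementResource.createQuery, described next) and then polls the nextUri links returned in the JSON responses. Below is a minimal sketch of that submission, assuming a coordinator on localhost:8080 and a tpch catalog; the user name and query text are made up for illustration.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SubmitQuery
{
    public static void main(String[] args)
            throws Exception
    {
        // POST the SQL text to /v1/statement, the endpoint behind StatementResource.createQuery()
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/v1/statement"))
                .header("X-Presto-User", "test")      // session user
                .header("X-Presto-Catalog", "tpch")   // assumed catalog
                .header("X-Presto-Schema", "tiny")    // assumed schema
                .POST(HttpRequest.BodyPublishers.ofString("SELECT 1"))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());

        // the JSON body carries the query id and a nextUri that the client keeps polling
        System.out.println(response.body());
    }
}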


The StatementResource.createQuery() method

The client sends an HTTP request to the createQuery method of the StatementResource class (mounted at /v1/statement):

        if (isNullOrEmpty(statement)) {
            throw new WebApplicationException(Response
                    .status(Status.BAD_REQUEST)
                    .type(MediaType.TEXT_PLAIN)
                    .entity("SQL statement is empty")
                    .build());
        }

        SessionContext sessionContext = new HttpRequestSessionContext(servletRequest);

        ExchangeClient exchangeClient = exchangeClientSupplier.get(deltaMemoryInBytes -> {});    
        // step into Query.create()
        Query query = Query.create(
                sessionContext,
                statement,
                queryManager,
                sessionPropertyManager,
                exchangeClient,
                responseExecutor,
                timeoutExecutor,
                blockEncodingSerde);

The Query.create() method

The Query object is constructed:

  Query result = new Query(sessionContext, query, queryManager, sessionPropertyManager, exchangeClient, dataProcessorExecutor, timeoutExecutor, blockEncodingSerde);

Inside the Query constructor:

QueryInfo queryInfo = queryManager.createQuery(sessionContext, query);

The SqlQueryManager.createQuery() method

The SQL is submitted to the query queue manager:

 // start the query in the background
 queueManager.submit(statement, queryExecution, queryExecutor);

The SqlQueryQueueManager.submit() method
public void submit(Statement statement, QueryExecution queryExecution, Executor executor)
{
    List<QueryQueue> queues;
    try {
     // select the set of queues that matches this session's rules
        queues = selectQueues(queryExecution.getSession(), executor);
    }
    catch (PrestoException e) {
        queryExecution.fail(e);
        return;
    }
    for (QueryQueue queue : queues) {
        if (!queue.reserve(queryExecution)) {
            // if we cannot get a permit to enter the queue, reject the query;
            // the permits will be released when this query fails
            queryExecution.fail(new PrestoException(QUERY_QUEUE_FULL, "Too many queued queries"));
            return;
        }
    }
    // enqueue the constructed QueuedExecution into the first queue
    queues.get(0).enqueue(createQueuedExecution(queryExecution, queues.subList(1, queues.size()), executor));
}
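
A query can belong to several queues at once (for example a per-user queue nested inside a global one). submit() first reserves a permit in every queue; only then is the query enqueued, and only into the first queue, while the remaining queues travel along as nextQueues (QueuedExecution.start(), shown further below, re-enqueues the query into the next queue each time it reaches the head of the current one). The following is a stripped-down model of that reserve-then-chain idea, not Presto's actual classes.

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model: every queue a query belongs to must hand out a permit up front,
// so a full queue anywhere in the chain rejects the query immediately
// instead of letting it wait.
class ToyQueue
{
    private final AtomicInteger permits;

    ToyQueue(int maxQueuedPlusConcurrent)
    {
        this.permits = new AtomicInteger(maxQueuedPlusConcurrent);
    }

    boolean reserve()
    {
        // take a permit; roll back and refuse if none were left
        if (permits.decrementAndGet() < 0) {
            permits.incrementAndGet();
            return false;
        }
        return true;
    }
}

class ToySubmitter
{
    static boolean submit(List<ToyQueue> queues)
    {
        for (ToyQueue queue : queues) {
            if (!queue.reserve()) {
                return false; // "Too many queued queries"
            }
        }
        // in Presto the query is now enqueued only into queues.get(0);
        // the remaining queues are carried along as nextQueues
        return true;
    }
}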

The QueryQueue.enqueue() method

public void enqueue(QueuedExecution queuedExecution)
{
    queryQueueSize.incrementAndGet();
     // add a callback that dequeues the entry if it ever completes;
     // this lets us remove the entry before it starts,
     // but has no effect if invoked after the query has started
    QueueEntry entry = new QueueEntry(queuedExecution, queryQueueSize::decrementAndGet);
    queuedExecution.getCompletionFuture().addListener(entry::dequeue, MoreExecutors.directExecutor());
// submit the QueueEntry asynchronously
    asyncSemaphore.submit(entry);
}

The AsyncSemaphore.submit() method

public ListenableFuture<?> submit(T task)
{
    QueuedTask<T> queuedTask = new QueuedTask<>(task);
    // add the QueuedTask to the task queue
    queuedTasks.add(queuedTask);
    // try to acquire a permit
    acquirePermit();
    return queuedTask.getCompletionFuture();
}

Acquiring a permit and starting the task

private final Runnable runNextTask = this::runNext;
private void acquirePermit()
{
 // check whether any permits remain to be handed out
    if (counter.incrementAndGet() <= maxPermits) {
        // if not all permits have been handed out, start a task; note that runNextTask invokes the runNext() method
        submitExecutor.execute(runNextTask);
    }
}
private void runNext()
 {
      final QueuedTask<T> queuedTask = queuedTasks.poll();
      // submit the task
      ListenableFuture<?> future = submitTask(queuedTask.getTask());
      Futures.addCallback(future, new FutureCallback<Object>()
      {
          @Override
          public void onSuccess(Object result)
          {
              queuedTask.markCompleted();
              releasePermit();
          }

          @Override
          public void onFailure(Throwable t)
          {
              queuedTask.markFailure(t);
              releasePermit();
          }
      });
  }
private final Function<T, ListenableFuture<?>> submitter;
private ListenableFuture<?> submitTask(T task)
{
     try {
      // apply the submitter, which calls back into the QueryQueue
         ListenableFuture<?> future = submitter.apply(task);
         if (future == null) {
             return Futures.immediateFailedFuture(new NullPointerException("Submitter returned a null future for task: " + task));
         }
         return future;
     }
     catch (Exception e) {
         return Futures.immediateFailedFuture(e);
     }
 }
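
Putting submit(), acquirePermit(), runNext() and releasePermit() together, AsyncSemaphore keeps at most maxPermits tasks in flight and buffers the rest in queuedTasks: the counter tracks pending-plus-running tasks, a submit schedules a runner only while the count is at or below the limit, and each completion schedules another runner if tasks are still waiting. Here is a self-contained sketch of the same counting trick, using plain Runnables instead of Presto's QueuedTask and ListenableFuture machinery.

import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// counter counts tasks that are queued or running; at most maxPermits runners
// are ever scheduled at a time
class BoundedRunner
{
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    private final AtomicInteger counter = new AtomicInteger();
    private final int maxPermits;
    private final ExecutorService executor;

    BoundedRunner(int maxPermits, ExecutorService executor)
    {
        this.maxPermits = maxPermits;
        this.executor = executor;
    }

    void submit(Runnable task)
    {
        tasks.add(task);
        // schedule a runner only if we are still within the permit limit
        if (counter.incrementAndGet() <= maxPermits) {
            executor.execute(this::runNext);
        }
    }

    private void runNext()
    {
        Runnable task = tasks.poll();
        try {
            task.run();
        }
        finally {
            // release the permit; if more tasks are queued than permits, run the next one
            if (counter.getAndDecrement() > maxPermits) {
                executor.execute(this::runNext);
            }
        }
    }

    public static void main(String[] args)
            throws InterruptedException
    {
        ExecutorService pool = Executors.newCachedThreadPool();
        BoundedRunner runner = new BoundedRunner(2, pool);
        for (int i = 0; i < 5; i++) {
            int id = i;
            runner.submit(() -> System.out.println("task " + id));
        }
        Thread.sleep(500); // let the demo tasks finish
        pool.shutdown();
    }
}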

The QueryQueue constructor
This object is created from SqlQueryQueueManager.selectQueues(), via getOrCreateQueues().

QueryQueue(Executor queryExecutor, int maxQueuedQueries, int maxConcurrentQueries)
{
    requireNonNull(queryExecutor, "queryExecutor is null");
    checkArgument(maxQueuedQueries > 0, "maxQueuedQueries must be greater than zero");
    checkArgument(maxConcurrentQueries > 0, "maxConcurrentQueries must be greater than zero");

    int permits = maxQueuedQueries + maxConcurrentQueries;
    // Check for overflow
    checkArgument(permits > 0, "maxQueuedQueries + maxConcurrentQueries must be less than or equal to %s", Integer.MAX_VALUE);

    this.queuePermits = new AtomicInteger(permits);
    this.asyncSemaphore = new AsyncSemaphore<>(maxConcurrentQueries,
            queryExecutor,
            queueEntry -> {
                QueuedExecution queuedExecution = queueEntry.dequeue();
                if (queuedExecution != null) {
                    // dequeue the queuedExecution and start it
                    queuedExecution.start();
                    return queuedExecution.getCompletionFuture();
                }
                return Futures.immediateFuture(null);
            });
}

The QueuedExecution.start() method

public void start()
{
      // Only execute if the query is not already completed (e.g. cancelled)
      if (listenableFuture.isDone()) {
          return;
      }
      if (nextQueues.isEmpty()) {
          executor.execute(() -> {
              try (SetThreadName ignored = new SetThreadName("Query-%s", queryExecution.getQueryId())) {
                  // continue: this asynchronously invokes SqlQueryExecution.start()
                  queryExecution.start();
              }
          });
      }
      else {
          nextQueues.get(0).enqueue(new QueuedExecution(queryExecution, nextQueues.subList(1, nextQueues.size()), executor, listenableFuture));
      }
  }

The SqlQueryExecution.start() method

public void start()
{
    try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
        try {
            // transition to planning
            if (!stateMachine.transitionToPlanning()) {
                // query already started or finished
                return;
            }

            // analyze the query
            PlanRoot plan = analyzeQuery();

            metadata.beginQuery(getSession(), plan.getConnectors());

            // plan distribution of query
            planDistribution(plan);

            // transition to starting
            if (!stateMachine.transitionToStarting()) {
                // query already started or finished
                return;
            }

            // if query is not finished, start the scheduler, otherwise cancel it
            SqlQueryScheduler scheduler = queryScheduler.get();

            if (!stateMachine.isDone()) {
                scheduler.start();
            }
        }
        catch (Throwable e) {
            fail(e);
            throwIfInstanceOf(e, Error.class);
        }
    }
}
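
// A rough sketch of the lifecycle driven by the stateMachine calls above (the state
// names come from Presto's QueryState enum; treat this as an assumption about the happy path):
//   QUEUED   -> PLANNING  via transitionToPlanning(), before analyzeQuery()
//   PLANNING -> STARTING  via transitionToStarting(), after planDistribution()
//   STARTING -> RUNNING   once SqlQueryScheduler.start() begins scheduling stages
//   RUNNING  -> FINISHING -> FINISHED, or FAILED at any point via fail(e)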
 private PlanRoot analyzeQuery()
 {
      try {
          // start the analysis
          return doAnalyzeQuery();
      }
      catch (StackOverflowError e) {
          throw new PrestoException(NOT_SUPPORTED, "statement is too large (stack overflow during analysis)", e);
      }
  }
private PlanRoot doAnalyzeQuery()
{
    // time analysis phase
    long analysisStart = System.nanoTime();

    // analyze query
    Analyzer analyzer = new Analyzer(stateMachine.getSession(), metadata, sqlParser, accessControl, Optional.of(queryExplainer), parameters);
    // the key step of the analysis; statement is the AST produced by parsing the SQL text
    Analysis analysis = analyzer.analyze(statement);

    stateMachine.setUpdateType(analysis.getUpdateType());

    // plan query
    PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
    LogicalPlanner logicalPlanner = new LogicalPlanner(stateMachine.getSession(), planOptimizers, idAllocator, metadata, sqlParser, costCalculator);
    Plan plan = logicalPlanner.plan(analysis);
    queryPlan.set(plan);

    // extract inputs
    List<Input> inputs = new InputExtractor(metadata, stateMachine.getSession()).extractInputs(plan.getRoot());
    stateMachine.setInputs(inputs);

    // extract output
    Optional<Output> output = new OutputExtractor().extractOutput(plan.getRoot());
    stateMachine.setOutput(output);

    // fragment the plan
    SubPlan fragmentedPlan = PlanFragmenter.createSubPlans(stateMachine.getSession(), metadata, plan);
    stateMachine.setPlan(planFlattener.flatten(fragmentedPlan, stateMachine.getSession()));

    // record analysis time
    stateMachine.recordAnalysisTime(analysisStart);

    boolean explainAnalyze = analysis.getStatement() instanceof Explain && ((Explain) analysis.getStatement()).isAnalyze();
    return new PlanRoot(fragmentedPlan, !explainAnalyze, extractConnectors(analysis));
}
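
The statement fed into the Analyzer above is not raw SQL text; it is the AST that Presto's ANTLR-based SqlParser produced when the query text was first submitted to SqlQueryManager.createQuery. A minimal sketch of that parsing step, with a made-up query:

import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.tree.Query;
import com.facebook.presto.sql.tree.Statement;

public class ParseExample
{
    public static void main(String[] args)
    {
        SqlParser parser = new SqlParser();
        // createStatement turns the SQL text into an AST rooted at a Statement node
        Statement statement = parser.createStatement("SELECT orderkey FROM orders WHERE totalprice > 100");
        // for a plain SELECT the root is a Query node, the same type visited by StatementAnalyzer below
        System.out.println(statement instanceof Query);
        System.out.println(statement);
    }
}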

The Analyzer.analyze() method
The analysis goes through many stack frames; only the key ones are shown.

public Analysis analyze(Statement statement)
{
    return analyze(statement, false);
}
public Analysis analyze(Statement statement, boolean isDescribe)
{
     Statement rewrittenStatement = StatementRewrite.rewrite(session, metadata, sqlParser, queryExplainer, statement, parameters, accessControl);
     Analysis analysis = new Analysis(rewrittenStatement, parameters, isDescribe);
     // construct the StatementAnalyzer
     StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, accessControl, session);
     // the analysis continues here
     analyzer.analyze(rewrittenStatement, Optional.empty());
     return analysis;
 }

The StatementAnalyzer.analyze() method
Statement is a subclass of Node, so the Node here is not a cluster node; it is simply the AST node of the query statement.

public Scope analyze(Node node, Optional<Scope> outerQueryScope)
{
     return new Visitor(outerQueryScope).process(node, Optional.empty());
 }
public Scope process(Node node, Optional<Scope> scope)
{
    Scope returnScope = super.process(node, scope);
    checkState(returnScope.getOuterQueryParent().equals(outerQueryScope), "result scope should have outer query scope equal with parameter outer query scope");
    if (scope.isPresent()) {
        checkState(hasScopeAsLocalParent(returnScope, scope.get()), "return scope should have context scope as one of ancestors");
    }
    return returnScope;
}

The AstVisitor.process() method

public R process(Node node, @Nullable C context)
{
    return node.accept(this, context);
}

The Query.accept() method

public <R, C> R accept(AstVisitor<R, C> visitor, C context)
{
    return visitor.visitQuery(this, context);
}
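
These two snippets are classic double dispatch: process() dispatches on the visitor via node.accept(visitor, context), and each concrete AST node dispatches back to its own visitXxx method. A stripped-down illustration of the pattern (not Presto's classes):

// toy version of the AstVisitor double dispatch
abstract class AstNode
{
    abstract <R, C> R accept(ToyVisitor<R, C> visitor, C context);
}

class ToyQuery
        extends AstNode
{
    @Override
    <R, C> R accept(ToyVisitor<R, C> visitor, C context)
    {
        return visitor.visitQuery(this, context); // dispatch on the node's concrete type
    }
}

abstract class ToyVisitor<R, C>
{
    R process(AstNode node, C context)
    {
        return node.accept(this, context); // dispatch on the visitor
    }

    abstract R visitQuery(ToyQuery node, C context);
}

class NamePrinter
        extends ToyVisitor<String, Void>
{
    @Override
    String visitQuery(ToyQuery node, Void context)
    {
        return "visited a Query node";
    }

    public static void main(String[] args)
    {
        System.out.println(new NamePrinter().process(new ToyQuery(), null));
    }
}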

The StatementAnalyzer.visitQuery() method

protected Scope visitQuery(Query node, Optional<Scope> scope)
{
    // analyze the scope introduced by the WITH clause; step into this method for the details
    Scope withScope = analyzeWith(node, scope);
    // then analyze the query body
    Scope queryBodyScope = process(node.getQueryBody(), withScope);
    if (node.getOrderBy().isPresent()) {
        analyzeOrderBy(node, queryBodyScope);
    }
    else {
        analysis.setOrderByExpressions(node, emptyList());
    }

    // Input fields == Output fields
    analysis.setOutputExpressions(node, descriptorToFields(queryBodyScope));

    Scope queryScope = Scope.builder()
            .withParent(withScope)
            .withRelationType(RelationId.of(node), queryBodyScope.getRelationType())
            .build();
    analysis.setScope(node, queryScope);
    return queryScope;
}
private Scope process(Node node, Scope scope)
{
     return process(node, Optional.of(scope));
 }
public Scope process(Node node, Optional<Scope> scope)
{
     // delegate to the superclass; many intermediate frames are skipped here to get straight to the concrete logic
     Scope returnScope = super.process(node, scope);
     checkState(returnScope.getOuterQueryParent().equals(outerQueryScope), "result scope should have outer query scope equal with parameter outer query scope");
     if (scope.isPresent()) {
         checkState(hasScopeAsLocalParent(returnScope, scope.get()), "return scope should have context scope as one of ancestors");
     }
     return returnScope;
 }

Visiting the query specification to check that the SQL is well-formed

protected Scope visitQuerySpecification(QuerySpecification node, Optional<Scope> scope)
{
    if (SystemSessionProperties.isLegacyOrderByEnabled(session)) {
        return legacyVisitQuerySpecification(node, scope);
    }

    // TODO: extract candidate names from SELECT, WHERE, HAVING, GROUP BY and ORDER BY expressions
    // to pass down to analyzeFrom
    // start analyzing the FROM clause
    Scope sourceScope = analyzeFrom(node, scope);

    node.getWhere().ifPresent(where -> analyzeWhere(node, sourceScope, where));

    List<Expression> outputExpressions = analyzeSelect(node, sourceScope);
    List<List<Expression>> groupByExpressions = analyzeGroupBy(node, sourceScope, outputExpressions);
    analyzeHaving(node, sourceScope);

    Scope outputScope = computeAndAssignOutputScope(node, scope, sourceScope);

    List<Expression> orderByExpressions = emptyList();
    Optional<Scope> orderByScope = Optional.empty();
    if (node.getOrderBy().isPresent()) {
        orderByScope = Optional.of(computeAndAssignOrderByScope(node.getOrderBy().get(), sourceScope, outputScope));
        orderByExpressions = analyzeOrderBy(node, orderByScope.get(), outputExpressions);
    }
    else {
        analysis.setOrderByExpressions(node, emptyList());
    }

    List<Expression> sourceExpressions = new ArrayList<>();
    sourceExpressions.addAll(outputExpressions);
    node.getHaving().ifPresent(sourceExpressions::add);

    analyzeGroupingOperations(node, sourceExpressions, orderByExpressions);
    List<FunctionCall> aggregations = analyzeAggregations(node, sourceScope, orderByScope, groupByExpressions, sourceExpressions, orderByExpressions);
    analyzeWindowFunctions(node, outputExpressions, orderByExpressions);

    if (!groupByExpressions.isEmpty() && node.getOrderBy().isPresent()) {
        // Create a different scope for ORDER BY expressions when aggregation is present.
        // This is because planner requires scope in order to resolve names against fields.
        // Original ORDER BY scope "sees" FROM query fields. However, during planning
        // and when aggregation is present, ORDER BY expressions should only be resolvable against
        // output scope, group by expressions and aggregation expressions.
        computeAndAssignOrderByScopeWithAggregation(node.getOrderBy().get(), sourceScope, outputScope, aggregations, groupByExpressions, analysis.getGroupingOperations(node));
    }

    return outputScope;
}
private Scope analyzeFrom(QuerySpecification node, Optional<Scope> scope)
{
    if (node.getFrom().isPresent()) {
        // keep stepping in; skipping the intermediate frames leads to the Table class
        return process(node.getFrom().get(), scope);
    }

    return createScope(scope);
}
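
To tie the calls above to a concrete query, the clauses of a made-up statement map to the analyzer roughly as follows, in the FROM, WHERE, SELECT, GROUP BY, HAVING, ORDER BY order seen in visitQuerySpecification (table and column names are illustrative only):

class QuerySpecificationExample
{
    // FROM orders                      -> analyzeFrom(...)   (visits the Table node, see below)
    // WHERE orderstatus = 'F'          -> analyzeWhere(...)
    // SELECT custkey, sum(totalprice)  -> analyzeSelect(...)
    // GROUP BY custkey                 -> analyzeGroupBy(...)
    // HAVING sum(totalprice) > 1000    -> analyzeHaving(...)
    // ORDER BY 2 DESC                  -> analyzeOrderBy(...) against the computed ORDER BY scope
    static final String SQL = "SELECT custkey, sum(totalprice) "
            + "FROM orders "
            + "WHERE orderstatus = 'F' "
            + "GROUP BY custkey "
            + "HAVING sum(totalprice) > 1000 "
            + "ORDER BY 2 DESC";
}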

The com.facebook.presto.sql.tree.Table.accept() method (Table extends QueryBody)

public <R, C> R accept(AstVisitor<R, C> visitor, C context)
{
     // visit the table node
     return visitor.visitTable(this, context);
 }

The StatementAnalyzer.visitTable() method

// this method is long, so only the key call is shown: obtaining the table handle from the metadata
Optional<TableHandle> tableHandle = metadata.getTableHandle(session, name);
if (!tableHandle.isPresent()) {
     if (!metadata.getCatalogHandle(session, name.getCatalogName()).isPresent()) {
         throw new SemanticException(MISSING_CATALOG, table, "Catalog %s does not exist", name.getCatalogName());
     }
     if (!metadata.schemaExists(session, new CatalogSchemaName(name.getCatalogName(), name.getSchemaName()))) {
         throw new SemanticException(MISSING_SCHEMA, table, "Schema %s does not exist", name.getSchemaName());
     }
     throw new SemanticException(MISSING_TABLE, table, "Table %s does not exist", name);
 }

The MetadataManager.getTableHandle() method
Resolving the table handle:

public Optional<TableHandle> getTableHandle(Session session, QualifiedObjectName table)
{
    requireNonNull(table, "table is null");

    Optional<CatalogMetadata> catalog = getOptionalCatalogMetadata(session, table.getCatalogName());
    if (catalog.isPresent()) {
        CatalogMetadata catalogMetadata = catalog.get();
        ConnectorId connectorId = catalogMetadata.getConnectorId(table);
        ConnectorMetadata metadata = catalogMetadata.getMetadataFor(connectorId);
        // ask the corresponding connector's ConnectorMetadata for the table handle
        ConnectorTableHandle tableHandle = metadata.getTableHandle(session.toConnectorSession(connectorId), table.asSchemaTableName());
        if (tableHandle != null) {
            return Optional.of(new TableHandle(connectorId, tableHandle));
        }
    }
    return Optional.empty();
}
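
The call chain here is: catalog name -> ConnectorId -> the connector's ConnectorMetadata -> a ConnectorTableHandle, which MetadataManager wraps into a TableHandle. Below is a stand-alone toy model of that two-level dispatch (not Presto's classes; the catalog and table names are illustrative):

import java.util.Map;
import java.util.Optional;

// Toy model of MetadataManager.getTableHandle(): the catalog name picks a connector,
// the connector resolves schema.table, and a missing table becomes Optional.empty().
class ToyMetadataManager
{
    interface ToyConnectorMetadata
    {
        Object getTableHandle(String schema, String table); // null means "table not found"
    }

    private final Map<String, ToyConnectorMetadata> catalogs;

    ToyMetadataManager(Map<String, ToyConnectorMetadata> catalogs)
    {
        this.catalogs = catalogs;
    }

    Optional<Object> getTableHandle(String catalog, String schema, String table)
    {
        ToyConnectorMetadata metadata = catalogs.get(catalog);
        if (metadata == null) {
            return Optional.empty(); // unknown catalog
        }
        return Optional.ofNullable(metadata.getTableHandle(schema, table));
    }

    public static void main(String[] args)
    {
        ToyConnectorMetadata tpch = (schema, table) ->
                ("tiny".equals(schema) && "orders".equals(table)) ? new Object() : null;
        ToyMetadataManager manager = new ToyMetadataManager(Map.of("tpch", tpch));
        System.out.println(manager.getTableHandle("tpch", "tiny", "orders").isPresent());   // true
        System.out.println(manager.getTableHandle("tpch", "tiny", "missing").isPresent());  // false
    }
}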