Presto 源码阅读：SQL 执行过程记录
Presto源码主要从两部分入手阅读,presto-cli与presto-main分别对应的是client端的入口与server端的入口工程。
版本如下
<groupId>com.facebook.presto</groupId>
<artifactId>presto-root</artifactId>
<version>0.190-SNAPSHOT</version>
presto-cli的main方法入口为
com.facebook.presto.cli.Presto.java
presto-main的main方法入口为
com.facebook.presto.server.PrestoServer.java
client端与server端分别从这两个main方法启动。今天主要说一下client端将查询请求发送到server端之后，server端是如何执行查询的。
StatementResource.createQuery()方法
client会向StatementResource类的createQuery方法发送一个HTTP请求：
// Excerpt from StatementResource.createQuery(): validate the SQL text, then hand it to Query.create.
// Reject an empty statement up front with HTTP 400.
if (isNullOrEmpty(statement)) {
    throw new WebApplicationException(Response
            .status(Status.BAD_REQUEST)
            .type(MediaType.TEXT_PLAIN)
            .entity("SQL statement is empty")
            .build());
}

SessionContext sessionContext = new HttpRequestSessionContext(servletRequest);
ExchangeClient exchangeClient = exchangeClientSupplier.get(deltaMemoryInBytes -> {});

// Enter Query.create(), the next step of the execution flow
Query query = Query.create(
        sessionContext,
        statement,
        queryManager,
        sessionPropertyManager,
        exchangeClient,
        responseExecutor,
        timeoutExecutor,
        blockEncodingSerde);
Query.create()方法
准备构造Query对象
// Inside Query.create(): construct the Query object from the collaborators passed in.
Query result = new Query(
        sessionContext,
        query,
        queryManager,
        sessionPropertyManager,
        exchangeClient,
        dataProcessorExecutor,
        timeoutExecutor,
        blockEncodingSerde);
在Query构造器中，通过queryManager.createQuery()把查询交给查询管理器：
// Inside the Query constructor: hand the SQL text to the query manager and keep the returned QueryInfo.
QueryInfo queryInfo = queryManager.createQuery(
        sessionContext,
        query);
SqlQueryManager.createQuery()方法
在后台将查询提交给队列管理器(queueManager)：
// start the query in the background queueManager.submit(statement, queryExecution, queryExecutor);
SqlQueryQueueManager.submit()方法
public void submit(Statement statement, QueryExecution queryExecution, Executor executor) { List queues; try { //选择一个符合角色的队列集合 queues = selectQueues(queryExecution.getSession(), executor); } catch (PrestoException e) { queryExecution.fail(e); return; } for (QueryQueue queue : queues) { if (!queue.reserve(queryExecution)) { // 如果我们无法获得许可证进入队列,拒绝查询 // 当这个查询失败时,许可证将被释放 queryExecution.fail(new PrestoException(QUERY_QUEUE_FULL, "Too many queued queries")); return; } } //将构造好的QueuedExecution入队 queues.get(0).enqueue(createQueuedExecution(queryExecution, queues.subList(1, queues.size()), executor)); }
QueryQueue.enqueue()方法
public void enqueue(QueuedExecution queuedExecution) { queryQueueSize.incrementAndGet(); //添加一个回调,如果它已经完成的条目出列。 //这使得我们可以在开始前取消输入, //如果在启动后调用,则不起作用。 QueueEntry entry = new QueueEntry(queuedExecution, queryQueueSize::decrementAndGet); queuedExecution.getCompletionFuture().addListener(entry::dequeue, MoreExecutors.directExecutor()); //异步提交QueueEntry asyncSemaphore.submit(entry); }
AsyncSemaphore.submit()方法
public ListenableFuture submit(T task) { QueuedTask queuedTask = new QueuedTask<>(task); //向任务队列里填充QueueEntry queuedTasks.add(queuedTask); //获取许可 acquirePermit(); return queuedTask.getCompletionFuture(); }
获取许可开始任务
private final Runnable runNextTask = this::runNext; private void acquirePermit() { //判断是否有剩余的许可可以发放 if (counter.incrementAndGet() <= maxPermits) { //如果不是所有的许可证都已经发放,那就开始一个任务,注意runNexTask调用的为runNext方法 submitExecutor.execute(runNextTask); } }
private void runNext() { final QueuedTask queuedTask = queuedTasks.poll(); //提交任务 ListenableFuture future = submitTask(queuedTask.getTask()); Futures.addCallback(future, new FutureCallback() { @Override public void onSuccess(Object result) { queuedTask.markCompleted(); releasePermit(); } @Override public void onFailure(Throwable t) { queuedTask.markFailure(t); releasePermit(); } }); }
private final Function> submitter; private ListenableFuture submitTask(T task) { try { //提交并且回调到QueryQueue ListenableFuture future = submitter.apply(task); if (future == null) { return Futures.immediateFailedFuture(new NullPointerException("Submitter returned a null future for task: " + task)); } return future; } catch (Exception e) { return Futures.immediateFailedFuture(e); } }
QueryQueue构造
该对象的构造来自SqlQueryQueueManager.selectQueues()中对getOrCreateQueues()的调用。
/**
 * Creates a queue with room for maxQueuedQueries + maxConcurrentQueries permits.
 *
 * <p>At most maxConcurrentQueries entries run at once, gated by an AsyncSemaphore.
 *
 * @throws IllegalArgumentException if either limit is not positive or their sum overflows int
 */
QueryQueue(Executor queryExecutor, int maxQueuedQueries, int maxConcurrentQueries)
{
    requireNonNull(queryExecutor, "queryExecutor is null");
    checkArgument(maxQueuedQueries > 0, "maxQueuedQueries must be greater than zero");
    checkArgument(maxConcurrentQueries > 0, "maxConcurrentQueries must be greater than zero");

    int permits = maxQueuedQueries + maxConcurrentQueries;
    // Check for overflow
    checkArgument(permits > 0, "maxQueuedQueries + maxConcurrentQueries must be less than or equal to %s", Integer.MAX_VALUE);

    this.queuePermits = new AtomicInteger(permits);
    this.asyncSemaphore = new AsyncSemaphore<>(maxConcurrentQueries,
            queryExecutor,
            queueEntry -> {
                QueuedExecution queuedExecution = queueEntry.dequeue();
                if (queuedExecution != null) {
                    // Start the dequeued execution; its completion future tells the semaphore when the slot frees up
                    queuedExecution.start();
                    return queuedExecution.getCompletionFuture();
                }
                return Futures.immediateFuture(null);
            });
}
QueuedExecution.start()方法
public void start() { // Only execute if the query is not already completed (e.g. cancelled) if (listenableFuture.isDone()) { return; } if (nextQueues.isEmpty()) { executor.execute(() -> { try (SetThreadName ignored = new SetThreadName("Query-%s", queryExecution.getQueryId())) { //继续启动,异步的执行的是SqlQueryExecution的start方法 queryExecution.start(); } }); } else { nextQueues.get(0).enqueue(new QueuedExecution(queryExecution, nextQueues.subList(1, nextQueues.size()), executor, listenableFuture)); } }
SqlQueryExecution.start()方法
public void start() { try (SetThreadName ignored = new SetThreadName("Query-%s", stateMachine.getQueryId())) { try { // transition to planning if (!stateMachine.transitionToPlanning()) { // query already started or finished return; } // 分析查询 PlanRoot plan = analyzeQuery(); metadata.beginQuery(getSession(), plan.getConnectors()); // plan distribution of query planDistribution(plan); // transition to starting if (!stateMachine.transitionToStarting()) { // query already started or finished return; } // if query is not finished, start the scheduler, otherwise cancel it SqlQueryScheduler scheduler = queryScheduler.get(); if (!stateMachine.isDone()) { scheduler.start(); } } catch (Throwable e) { fail(e); throwIfInstanceOf(e, Error.class); } } }
private PlanRoot analyzeQuery() { try { //开始分析 return doAnalyzeQuery(); } catch (*Error e) { throw new PrestoException(NOT_SUPPORTED, "statement is too large (stack overflow during analysis)", e); } }
// Runs semantic analysis of the parsed statement, then logical planning, then records the plan's inputs.
private PlanRoot doAnalyzeQuery() {
    // time analysis phase
    long analysisStart = System.nanoTime();
    // analyze query
    Analyzer analyzer = new Analyzer(stateMachine.getSession(), metadata, sqlParser, accessControl, Optional.of(queryExplainer), parameters);
    // Key step of analysis: `statement` is the AST object produced by parsing the SQL text
    Analysis analysis = analyzer.analyze(statement);
    stateMachine.setUpdateType(analysis.getUpdateType());
    // plan query
    PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
    LogicalPlanner logicalPlanner = new LogicalPlanner(stateMachine.getSession(), planOptimizers, idAllocator, metadata, sqlParser, costCalculator);
    Plan plan = logicalPlanner.plan(analysis);
    queryPlan.set(plan);
    // extract inputs
    List inputs = new InputExtractor(metadata, stateMachine.getSession()).extractInputs(plan.getRoot());
    stateMachine.setInputs(inputs);
    // extract output
    Optional
    // NOTE(review): the excerpt quoted in this article stops mid-statement here;
    // the remainder of doAnalyzeQuery is not shown.
Analyzer.analyze()方法
分析过程的方法调用栈很深，下面只列出关键方法：
/**
 * Analyzes a statement in non-DESCRIBE mode by delegating to the two-argument overload.
 */
public Analysis analyze(Statement statement)
{
    final boolean describing = false;
    return analyze(statement, describing);
}
public Analysis analyze(Statement statement, boolean isDescribe) { Statement rewrittenStatement = StatementRewrite.rewrite(session, metadata, sqlParser, queryExplainer, statement, parameters, accessControl); Analysis analysis = new Analysis(rewrittenStatement, parameters, isDescribe); //构造阶段分析 StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, sqlParser, accessControl, session); //又开始分析 analyzer.analyze(rewrittenStatement, Optional.empty()); return analysis; }
StatementAnalyzer.analyze()方法
Statement是Node的子类。注意这里的Node并不是集群中的机器节点，而是SQL语法树(AST)中的节点，在本例中就是一个Query语句。
/**
 * Analyzes an AST node with a new Visitor bound to {@code outerQueryScope}.
 *
 * @param node            the AST node (e.g. a Query statement), not a cluster node
 * @param outerQueryScope the scope of the enclosing query, if any
 * @return the scope produced for {@code node}
 */
public Scope analyze(Node node, Optional<Scope> outerQueryScope)
{
    return new Visitor(outerQueryScope).process(node, Optional.empty());
}
/**
 * Dispatches to the node-specific visit method, then checks invariants on the returned scope.
 *
 * @throws IllegalStateException if the returned scope's outer query parent differs from this
 *         visitor's outerQueryScope, or if it does not descend from {@code scope}
 */
public Scope process(Node node, Optional<Scope> scope)
{
    Scope returnScope = super.process(node, scope);
    checkState(returnScope.getOuterQueryParent().equals(outerQueryScope), "result scope should have outer query scope equal with parameter outer query scope");
    if (scope.isPresent()) {
        checkState(hasScopeAsLocalParent(returnScope, scope.get()), "return scope should have context scope as one of ancestors");
    }
    return returnScope;
}
AstVisitor.process()方法
/**
 * Visitor entry point: asks the node to call back into the matching visitXxx method of this visitor.
 */
public R process(Node node, @Nullable C context)
{
    R visited = node.accept(this, context);
    return visited;
}
Query.accept()方法
/**
 * Double dispatch: routes this Query node to {@code visitor.visitQuery}.
 *
 * <p>The type parameters R and C must be declared on the method, because Query itself is not generic.
 */
public <R, C> R accept(AstVisitor<R, C> visitor, C context)
{
    return visitor.visitQuery(this, context);
}
StatementAnalyzer.visitQuery()方法
protected Scope visitQuery(Query node, Optional scope) { //分析Sql的查询范围,如想详细了解可进入到该方法中查看 Scope withScope = analyzeWith(node, scope); //开始分析querybody Scope queryBodyScope = process(node.getQueryBody(), withScope); if (node.getOrderBy().isPresent()) { analyzeOrderBy(node, queryBodyScope); } else { analysis.setOrderByExpressions(node, emptyList()); } // Input fields == Output fields analysis.setOutputExpressions(node, descriptorToFields(queryBodyScope)); Scope queryScope = Scope.builder() .withParent(withScope) .withRelationType(RelationId.of(node), queryBodyScope.getRelationType()) .build(); analysis.setScope(node, queryScope); return queryScope; }
/**
 * Convenience overload for callers that always have an enclosing scope: wraps it and delegates.
 */
private Scope process(Node node, Scope scope)
{
    Optional<Scope> enclosing = Optional.of(scope);
    return process(node, enclosing);
}
public Scope process(Node node, Optional scope) { //调用父类的分析处理中间过程过多,忽略中间阶段直接到具体逻辑 Scope returnScope = super.process(node, scope); checkState(returnScope.getOuterQueryParent().equals(outerQueryScope), "result scope should have outer query scope equal with parameter outer query scope"); if (scope.isPresent()) { checkState(hasScopeAsLocalParent(returnScope, scope.get()), "return scope should have context scope as one of ancestors"); } return returnScope; }
StatementAnalyzer.visitQuerySpecification()方法：分析查询体(QuerySpecification)的各个子句，校验SQL是否合法
protected Scope visitQuerySpecification(QuerySpecification node, Optional scope) { if (SystemSessionProperties.isLegacyOrderByEnabled(session)) { return legacyVisitQuerySpecification(node, scope); } // TODO: extract candidate names from SELECT, WHERE, HAVING, GROUP BY and ORDER BY expressions // to pass down to analyzeFrom //开始分析NodeBody Scope sourceScope = analyzeFrom(node, scope); node.getWhere().ifPresent(where -> analyzeWhere(node, sourceScope, where)); List outputExpressions = analyzeSelect(node, sourceScope); List> groupByExpressions = analyzeGroupBy(node, sourceScope, outputExpressions); analyzeHaving(node, sourceScope); Scope outputScope = computeAndAssignOutputScope(node, scope, sourceScope); List orderByExpressions = emptyList(); Optional orderByScope = Optional.empty(); if (node.getOrderBy().isPresent()) { orderByScope = Optional.of(computeAndAssignOrderByScope(node.getOrderBy().get(), sourceScope, outputScope)); orderByExpressions = analyzeOrderBy(node, orderByScope.get(), outputExpressions); } else { analysis.setOrderByExpressions(node, emptyList()); } List sourceExpressions = new ArrayList<>(); sourceExpressions.addAll(outputExpressions); node.getHaving().ifPresent(sourceExpressions::add); analyzeGroupingOperations(node, sourceExpressions, orderByExpressions); List aggregations = analyzeAggregations(node, sourceScope, orderByScope, groupByExpressions, sourceExpressions, orderByExpressions); analyzeWindowFunctions(node, outputExpressions, orderByExpressions); if (!groupByExpressions.isEmpty() && node.getOrderBy().isPresent()) { // Create a different scope for ORDER BY expressions when aggregation is present. // This is because planner requires scope in order to resolve names against fields. // Original ORDER BY scope "sees" FROM query fields. However, during planning // and when aggregation is present, ORDER BY expressions should only be resolvable against // output scope, group by expressions and aggregation expressions. 
computeAndAssignOrderByScopeWithAggregation(node.getOrderBy().get(), sourceScope, outputScope, aggregations, groupByExpressions, analysis.getGroupingOperations(node)); } return outputScope; }
private Scope analyzeFrom(QuerySpecification node, Optional scope) { if (node.getFrom().isPresent()) { //继续向里面进入忽略中间的进入到Table类 return process(node.getFrom().get(), scope); } return createScope(scope); }
com.facebook.presto.sql.tree.Table.accept()方法（Table继承自QueryBody）
public R accept(AstVisitor visitor, C context) { //访问表 return visitor.visitTable(this, context); }
StatementAnalyzer.visitTable()方法
//由于这个方法过长,只留了关键方法,这里是获取metadata获取到table的句柄 Optional tableHandle = metadata.getTableHandle(session, name); if (!tableHandle.isPresent()) { if (!metadata.getCatalogHandle(session, name.getCatalogName()).isPresent()) { throw new SemanticException(MISSING_CATALOG, table, "Catalog %s does not exist", name.getCatalogName()); } if (!metadata.schemaExists(session, new CatalogSchemaName(name.getCatalogName(), name.getSchemaName()))) { throw new SemanticException(MISSING_SCHEMA, table, "Schema %s does not exist", name.getSchemaName()); } throw new SemanticException(MISSING_TABLE, table, "Table %s does not exist", name); }
MetadataManager.getTableHandle()方法
获取表句柄
public Optional getTableHandle(Session session, QualifiedObjectName table) { requireNonNull(table, "table is null"); Optional catalog = getOptionalCatalogMetadata(session, table.getCatalogName()); if (catalog.isPresent()) { CatalogMetadata catalogMetadata = catalog.get(); ConnectorId connectorId = catalogMetadata.getConnectorId(table); ConnectorMetadata metadata = catalogMetadata.getMetadataFor(connectorId); //创建对应的Connector ConnectorTableHandle tableHandle = metadata.getTableHandle(session.toConnectorSession(connectorId), table.asSchemaTableName()); if (tableHandle != null) { return Optional.of(new TableHandle(connectorId, tableHandle)); } } return Optional.empty(); }