Skip to content

Instantly share code, notes, and snippets.

@ashigeru
Last active Dec 30, 2020
Embed
What would you like to do?
Prestoソースコードリーディング#2 メモ

Presto メモ

  • presto 0.60
  • [ QueryResource , TaskResource )

環境

なんかそのまま mvn clean install eclipse:eclipse -DskipTests コマンド叩いても checkstyle で落ちたので:

mvn clean install -DskipTests
mvn eclipse:eclipse -Dcheckstyle.skip

ちなみに、EclipseじゃなくてIDEA推奨の模様。

エントリポイント

サービスエントリ。

リクエストを受け取る QueryResource から。

// presto-server/src/main/java/com/facebook/presto/server/QueryResource.java:L89
@POST
@Produces(MediaType.APPLICATION_JSON)
public Response createQuery(String query,
        @HeaderParam(PRESTO_USER) String user,
        @HeaderParam(PRESTO_SOURCE) String source,
        @HeaderParam(PRESTO_CATALOG) @DefaultValue(DEFAULT_CATALOG) String catalog,
        @HeaderParam(PRESTO_SCHEMA) @DefaultValue(DEFAULT_SCHEMA) String schema,
        @HeaderParam(USER_AGENT) String userAgent,
        @Context HttpServletRequest requestContext,
        @Context UriInfo uriInfo)

クエリ発行リクエストの処理。

// presto-server/src/main/java/com/facebook/presto/server/QueryResource.java:L106
QueryInfo queryInfo = queryManager.createQuery(new Session(user, source, catalog, schema, remoteUserAddress, userAgent), query);

QueryManager.createQuery() を起動しているが、 QueryManager はインターフェース。


QueryManager のバインディング。 SqlQueryManager.createQuery() へ。

// presto-server/src/main/java/com/facebook/presto/server/CoordinatorModule.java:L104
binder.bind(QueryManager.class).to(SqlQueryManager.class).in(Scopes.SINGLETON);

SqlQueryManager.createQuery() の先頭。

// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L191
public QueryInfo createQuery(Session session, String query)
{
    checkNotNull(query, "query is null");
    Preconditions.checkArgument(!query.isEmpty(), "query must not be empty string");
    QueryId queryId = queryIdGenerator.createNextQueryId();
    Statement statement;
    try {
        statement = SqlParser.createStatement(query);

SqlParser.createStatement() でクエリ文字列をパース。

構文解析周り

SqlParser.createStatement() の本体。

// presto-parser/src/main/java/com/facebook/presto/sql/parser/SqlParser.java:L32
public static Statement createStatement(String sql)
{
    try {
        return createStatement(parseStatement(sql));
    }
    catch (StackOverflowError e) {
        throw new ParsingException("statement is too large (stack overflow while parsing)");
    }
}

CommonTree はANTLRのジェネリックなASTモデル。

// presto-parser/src/main/java/com/facebook/presto/sql/parser/SqlParser.java:L78
static CommonTree parseStatement(String sql)
{
    try {
        return (CommonTree) getParser(sql).singleStatement().getTree();
    }
    catch (RecognitionException e) {
        throw new AssertionError(e); // RecognitionException is not thrown
    }
}

ANTLRの機構を使って2-passでASTモデル構築。

// presto-parser/src/main/java/com/facebook/presto/sql/parser/SqlParser.java:L53
static Statement createStatement(CommonTree tree)
{
    TreeNodeStream stream = new BufferedTreeNodeStream(tree);
    StatementBuilder builder = new StatementBuilder(stream);
    try {
        return builder.statement().value;
    }
    catch (RecognitionException e) {
        throw new AssertionError(e); // RecognitionException is not thrown
    }
}

statement 構文規則。

// presto-parser/src/main/antlr3/com/facebook/presto/sql/parser/StatementBuilder.g:L58
statement returns [Statement value]
    : query                     { $value = $query.value; }
    | explain                   { $value = $explain.value; }
    | showTables                { $value = $showTables.value; }
    | showSchemas               { $value = $showSchemas.value; }
    | showCatalogs              { $value = $showCatalogs.value; }
    | showColumns               { $value = $showColumns.value; }
    | showPartitions            { $value = $showPartitions.value; }
    | showFunctions             { $value = $showFunctions.value; }
    | useCollection             { $value = $useCollection.value; }
    | createTable               { $value = $createTable.value; }
    | createMaterializedView    { $value = $createMaterializedView.value; }
    | refreshMaterializedView   { $value = $refreshMaterializedView.value; }
    | createAlias               { $value = $createAlias.value; }
    | dropAlias                 { $value = $dropAlias.value; }
    | dropTable                 { $value = $dropTable.value; }
    ;

今回は query を見る。


query 構文の .valueQuery 型。

// presto-parser/src/main/antlr3/com/facebook/presto/sql/parser/StatementBuilder.g:L76
query returns [Query value]
    : ^(QUERY queryExpr) { $value = $queryExpr.value; }
    ;

Query クラスのフィールド。

// presto-parser/src/main/java/com/facebook/presto/sql/tree/Query.java:L23
public class Query
        extends Statement
{
    private final Optional<With> with;
    private final QueryBody queryBody;
    private final List<SortItem> orderBy;
    private final Optional<String> limit;
    private final Optional<Approximate> approximate;

QueryBody 型の queryBody がリレーションの計算部分。

QueryBody は以下のサブクラスがある。

  • QuerySpecification
  • Except
  • Intersect
  • Union
  • Table
  • TableSubquery

SELECT ... FROM ... WHERE ...QuerySpecification で表現。

FROM 句にテーブルを直接指定すると Table で表現。

クエリの実行オブジェクト

QueryExecution の生成。 executionFactories : Map<Class<? extends Statement>, QueryExecutionFactory> からファクトリを探す。

statement.getClass()Query クラス。

// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L206
QueryExecutionFactory<?> queryExecutionFactory = executionFactories.get(statement.getClass());
Preconditions.checkState(queryExecutionFactory != null, "Unsupported statement type %s", statement.getClass().getName());
final QueryExecution queryExecution = queryExecutionFactory.createQueryExecution(queryId, query, session, statement);

バインディングの解決。

Query の場合は SqlQueryExecution 系がバインディングされている。

// presto-server/src/main/java/com/facebook/presto/server/CoordinatorModule.java:L152
MapBinder<Class<? extends Statement>, QueryExecution.QueryExecutionFactory<?>> executionBinder = newMapBinder(binder,
        new TypeLiteral<Class<? extends Statement>>() {},
        new TypeLiteral<QueryExecution.QueryExecutionFactory<?>>() {}
);
...
executionBinder.addBinding(Query.class).to(SqlQueryExecution.SqlQueryExecutionFactory.class).in(Scopes.SINGLETON);

いちおう SqlQueryExecutionFactory をみておくと、 SqlQueryExecution を作るだけ。

// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L453
@Override
public SqlQueryExecution createQueryExecution(QueryId queryId, String query, Session session, Statement statement)
{
    SqlQueryExecution queryExecution = new SqlQueryExecution(queryId, ...

queryExecutor 経由でクエリを実行。

// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L227
// start the query in the background
queryExecutor.submit(new QueryStarter(queryExecution, stats));

queryExecutor は普通のスレッドプール。

queryExecutor.submit(Runnable)run() メソッドを起動するだけ。

// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L101
this.queryExecutor = Executors.newCachedThreadPool(threadsNamed("query-scheduler-%d"));

QueryStarter implements Runnable の本体。

// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L378
public QueryStarter(QueryExecution queryExecution, SqlQueryManagerStats stats)
{
    this.queryExecution = queryExecution;
    this.stats = stats;
}
@Override
public void run()
{
    try (SetThreadName setThreadName = new SetThreadName("Query-%s", queryExecution.getQueryInfo().getQueryId())) {
        stats.queryStarted();
        queryExecution.start();
    }
}

QueryExecution.start() を呼んでいるので、 SqlQueryExecution.start() へ。

論理実行計画の生成

SqlQueryExecution.start() の前半部分。

analyzeQuery() メソッドで SubPlan を作る。

// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L151
    // transition to planning
    if (!stateMachine.beginPlanning()) {
        // query already started or finished
        return;
    }
    // analyze query
    SubPlan subplan = analyzeQuery();

SubPlan はサブプラン本体の PlanFragment と、再帰的なサブプランをもっている感じ。

// presto-main/src/main/java/com/facebook/presto/sql/planner/SubPlan.java:L31
public class SubPlan
{
    private final PlanFragment fragment;
    private final List<SubPlan> children;

PlanFragment でまず重要そうなのは、 PlanNode のあたり。

// presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java:L42
public class PlanFragment
{
    ...
    private final PlanNode root;
    ...
    private final List<PlanNode> sources;

PlanNode は論理オペレータを表していて、たとえば以下のサブクラスがある。

  • TableScanNode
  • ProjectNode
  • SelectNode
  • JoinNode
  • ...

それぞれも、オペレータの入力としてさらに PlanNode を持っている感じ。

なお、以下のサブクラスは SubPlan 以降にしか出現しない。

  • SinkNode - PlanFragment の出力
  • ExchangeNode - SinkNode による出力を入力にとる

analyzeQuery() 本体。

重要そうなのは:

  • Analyzer.analyze()
  • LogicalPlanner.plan()
  • DistributedLogicalPlanner.createSubPlans()
// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L210
Analyzer analyzer = new Analyzer(stateMachine.getSession(), metadata, Optional.of(queryExplainer), approximateQueriesEnabled);
Analysis analysis = analyzer.analyze(statement);
PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
// plan query
LogicalPlanner logicalPlanner = new LogicalPlanner(stateMachine.getSession(), planOptimizers, idAllocator, metadata, periodicImportManager, storageManager);
Plan plan = logicalPlanner.plan(analysis);
List<Input> inputs = new InputExtractor(metadata).extract(plan.getRoot());
stateMachine.setInputs(inputs);
// fragment the plan
SubPlan subplan = new DistributedLogicalPlanner(metadata, idAllocator).createSubPlans(plan, false);
stateMachine.recordAnalysisTime(analysisStart);
return subplan;

なお、 LogicalPlanner.plan() が生成する Plan は、ステージ区切り前の論理実行計画。 ルートとして PlanNode のツリーを持っている。

// presto-main/src/main/java/com/facebook/presto/sql/planner/Plan.java:L22
public class Plan
{
    private final PlanNode root;

意味解析 (Analysis)

Analyzer.analyze() 本体。

名前表を作ったり、型の計算をしたりして、 Analysis オブジェクトに積んでいく感じ。

戻り値の TupleDescriptor は主に結果リレーションの型情報。

// presto-main/src/main/java/com/facebook/presto/sql/analyzer/Analyzer.java:L50
Analysis analysis = new Analysis();
StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, session, approximateQueriesEnabled, queryExplainer);
TupleDescriptor outputDescriptor = analyzer.process(statement, new AnalysisContext());
analysis.setOutputDescriptor(outputDescriptor);
return analysis;

StatementAnalyzer extends AstVisitor なので、 visitQuery() へ。


StatementAnalyzer.visitQuery() の一部を抜粋。

Query.getQueryBody() : QueryBody は、 QuerySpecification として進める。

// presto-main/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java:L504
TupleAnalyzer analyzer = new TupleAnalyzer(analysis, session, metadata, approximateQueriesEnabled);
TupleDescriptor descriptor = analyzer.process(node.getQueryBody(), context);

TupleAnalyzer extends AstVisitor なので、 analyzer.process()TupleAnalyzer.visitQuerySpecification() へ。


TupleAnalyzer.visitQuerySpecification() の本体。

ざっくり言えば、 FROM 句で入力した型から、 SELECT 句で出力する型を計算している感じ。

// presto-main/src/main/java/com/facebook/presto/sql/analyzer/TupleAnalyzer.java:L258
TupleDescriptor tupleDescriptor = analyzeFrom(node, context);
analyzeWhere(node, tupleDescriptor, context);
List<FieldOrExpression> outputExpressions = analyzeSelect(node, tupleDescriptor, context);
List<FieldOrExpression> groupByExpressions = analyzeGroupBy(node, tupleDescriptor, context, outputExpressions);
List<FieldOrExpression> orderByExpressions = analyzeOrderBy(node, tupleDescriptor, context, outputExpressions);
analyzeHaving(node, tupleDescriptor, context);
analyzeAggregations(node, tupleDescriptor, groupByExpressions, outputExpressions, orderByExpressions);
analyzeWindowFunctions(node, outputExpressions, orderByExpressions);
TupleDescriptor descriptor = computeOutputDescriptor(node, tupleDescriptor);
analysis.setOutputDescriptor(node, descriptor);

論理オペレータへの変換 (Plan)

LogicalPlanner.plan() 本体。

ざっくり:

  • RelationPlanner.process() でクエリ全体の論理オペレータツリーを作成
  • createOutputPlan() で論理オペレータツリーの最後に出力用のオペレータを接続
  • PlanOptimizer.optimize() で最適化
  • Plan オブジェクトで包んで返す
// presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java:L92
public Plan plan(Analysis analysis)
{
    RelationPlan plan;
    ...
    else {
        RelationPlanner planner = new RelationPlanner(analysis, symbolAllocator, idAllocator, metadata, session);
        plan = planner.process(analysis.getQuery(), null);
    }
    PlanNode root = createOutputPlan(plan, analysis);
    ...
    for (PlanOptimizer optimizer : planOptimizers) {
        root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator);
    }
    ...
    return new Plan(root, symbolAllocator);
}

RelationPlanner extends AstVisitor なので、 RelationPlanner.visitQuery() へ。


RelationPlanner.visitQuery() 本体。

さらに QueryPlanner.visitQuery() へ。

// presto-main/src/main/java/com/facebook/presto/sql/planner/RelationPlanner.java:L215
protected RelationPlan visitQuery(Query node, Void context)
{
    PlanBuilder subPlan = new QueryPlanner(analysis, symbolAllocator, idAllocator, metadata, session).process(node, null);
    ImmutableList.Builder<Symbol> outputSymbols = ImmutableList.builder();
    for (FieldOrExpression fieldOrExpression : analysis.getOutputExpressions(node)) {
        outputSymbols.add(subPlan.translate(fieldOrExpression));
    }
    return new RelationPlan(subPlan.getRoot(), analysis.getOutputDescriptor(node), outputSymbols.build());
}

QueryPlanner.visitQuery() 本体。

おそらく、 QuerySpecification (<: QueryBody) ) を論理オペレーターに変換した後、 IN, ORDER BY, LIMIT, SELECT 句あたりを適当に処理するのではないかと。

// presto-main/src/main/java/com/facebook/presto/sql/planner/QueryPlanner.java:L103
protected PlanBuilder visitQuery(Query query, Void context)
{
    PlanBuilder builder = planQueryBody(query);
    Set<InPredicate> inPredicates = analysis.getInPredicates(query);
    builder = appendSemiJoins(builder, inPredicates);
    List<FieldOrExpression> orderBy = analysis.getOrderByExpressions(query);
    List<FieldOrExpression> outputs = analysis.getOutputExpressions(query);
    builder = project(builder, Iterables.concat(orderBy, outputs));
    builder = sort(builder, query);
    builder = project(builder, analysis.getOutputExpressions(query));
    builder = limit(builder, query);
    return builder;
}

省略して RelationPlanner.visitQuerySpecification() へ。


QueryPlanner.visitQuerySpecification() 本体。

FROM, IN, WHERE などを順番に適用していっている感じ。

// presto-main/src/main/java/com/facebook/presto/sql/planner/QueryPlanner.java:L121
protected PlanBuilder visitQuerySpecification(QuerySpecification node, Void context)
{
    PlanBuilder builder = planFrom(node);
    Set<InPredicate> inPredicates = analysis.getInPredicates(node);
    builder = appendSemiJoins(builder, inPredicates);
    builder = filter(builder, analysis.getWhere(node));
    builder = aggregate(builder, node);
    builder = filter(builder, analysis.getHaving(node));
    builder = window(builder, node);
    List<FieldOrExpression> orderBy = analysis.getOrderByExpressions(node);
    List<FieldOrExpression> outputs = analysis.getOutputExpressions(node);
    builder = project(builder, Iterables.concat(orderBy, outputs));
    builder = distinct(builder, node, outputs, orderBy);
    builder = sort(builder, node);
    builder = project(builder, analysis.getOutputExpressions(node));
    builder = limit(builder, node);
    return builder;
}

QueryPlanner.planFrom() だけ読んでみる。


QueryPlanner.planFrom() 本体。

// presto-main/src/main/java/com/facebook/presto/sql/planner/QueryPlanner.java:L160
private PlanBuilder planFrom(QuerySpecification node)
{
    RelationPlan relationPlan;
    ...
    else {
        relationPlan = new RelationPlanner(analysis, symbolAllocator, idAllocator, metadata, session)
                .process(Iterables.getOnlyElement(node.getFrom()), null);
    }
    ...
    return new PlanBuilder(translations, relationPlan.getRoot());
}

FROM 句に指定されたリレーションを再帰的に RelationPlanner で変換している。

テーブルの場合を見るということで、 RelationPlanner.visitTable() へ。


RelationPlanner.visitTable() 本体。

かなり省略すると、テーブルに対応するハンドルを探して、 TableScanNode というオペレータを作る。 さらに、サンプリングの設定がなされていたら、テーブルスキャン結果を MaterializeSampleNode で包む。

// presto-main/src/main/java/com/facebook/presto/sql/planner/RelationPlanner.java:L92
protected RelationPlan visitTable(Table node, Void context)
{
    ...
    TableHandle handle = analysis.getTableHandle(node);
    ...
    PlanNode root = new TableScanNode(idAllocator.getNextId(), handle, nodeOutputSymbols, columns.build(), null, Optional.<GeneratedPartitions>absent());
    if (sampleWeightSymbol != null) {
        root = new MaterializeSampleNode(idAllocator.getNextId(), root, sampleWeightSymbol);
    }
    return new RelationPlan(root, descriptor, planOutputSymbols);
}

LogicalPlanner.plan() まで戻って、 PlanOptimizer について少しだけ。

// presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java:L92
public Plan plan(Analysis analysis)
{
    RelationPlan plan;
    ...
    for (PlanOptimizer optimizer : planOptimizers) {
        root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator);
    }
    ...
    return new Plan(root, symbolAllocator);
}

PlanOptimizersFactory のあたりで PlanOptimizer を正しい順序で登録している。 適用順序が超重要なので、コメントが長め。

// presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizersFactory.java:L57
builder.add(new ImplementSampleAsFilter(),
        new SimplifyExpressions(metadata),
        new UnaliasSymbolReferences(),
        new PruneRedundantProjections(),
        new SetFlatteningOptimizer(),
        new MaterializeSamplePullUp(),
        new LimitPushDown(), // Run the LimitPushDown after flattening set operators to make it easier to do the set flattening
        new PredicatePushDown(metadata, splitManager, analyzerConfig.isApproximateQueriesEnabled()),
        new PredicatePushDown(metadata, splitManager, analyzerConfig.isApproximateQueriesEnabled()), // Run predicate push down one more time in case we can leverage new information from generated partitions
        new MergeProjections(),
        new SimplifyExpressions(metadata), // Re-run the SimplifyExpressions to simplify any recomposed expressions from other optimizations
        new UnaliasSymbolReferences(), // Run again because predicate pushdown might add more projections
        new PruneUnreferencedOutputs(), // Make sure to run this at the end to help clean the plan for logging/execution and not remove info that other optimizers might need at an earlier point
        new PruneRedundantProjections()); // This MUST run after PruneUnreferencedOutputs as it may introduce new redundant projections

論理実行計画はこの辺りで。

論理オペレータのステージ分割 (SubPlan)

DistributedLogicalPlanner.createSubPlans() 本体。

DistributedLogicalPlanner.Visitor (extends PlanVisitor) に処理を移譲し、 SubPlanBuilder.build()SubPlan を作る。

// presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedLogicalPlanner.java:L82
public SubPlan createSubPlans(Plan plan, boolean createSingleNodePlan)
{
    Visitor visitor = new Visitor(plan.getSymbolAllocator(), createSingleNodePlan);
    SubPlanBuilder builder = plan.getRoot().accept(visitor, null);
    SubPlan subplan = builder.build();
    subplan.sanityCheck();
    return subplan;
}

ざっくり言うと:

  1. PlanNode の木をいくつかの部分木に分解する
  2. 部分木に分解した際の切断面のうち、根に SinkNode を配置する → 別のサブプランにデータを渡す
  3. 部分木に分解した際の切断面のうち、末端に ExchangeNode を配置する → 別のサブプランからデータを受け取る
  4. それぞれの部分木を SubPlan とする

という感じ。 それぞれの PlanNode に対し、対応する DistributedLogicalPlanner.Visitor.visit*() で上記を行っていく。

ただし、引数の createSingleNodePlantrue の場合、 SinkNodeExchangeNode は生成されない模様。


分散処理にあたって PlanFragment をもういちど。

// presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java:L42
public class PlanFragment
{
    public enum PlanDistribution
    {
        NONE,
        FIXED,
        SOURCE,
        COORDINATOR_ONLY
    }

    public static enum OutputPartitioning
    {
        NONE,
        HASH
    }

    ...
    private final PlanDistribution distribution;
    ...
    private final OutputPartitioning outputPartitioning;
    private final List<Symbol> partitionBy;

今のところの理解では、 PlanDistribution は処理を分散する際の制約。 OutputPartitioningSinkNode が出力する内容のパーティショニング方法。

PlanDistribution :

  • NONE - 1台のノードで実行する ( OutputNode, TopNNode, LimitNode, グループ化なしの AggregationNode など)
  • FIXED - N台のノードで実行する ( MarkDistinctNode, WindowNode, グループ化ありの AggregationNode など )
  • SOURCE - データのあるノードで実行する ( TableScanNode のみ )
  • COORDINATOR_ONLY - コーディネータノードのみで実行する ( TableCommitNode のみ )

※上記以外の PlanDistribution はないため、データは小さめでCPUだけぶん回すような計算はあまり想定していないか

OutputPartitioning :

  • NONE - 適当に出力
  • HASH - 特定の属性でパーティション化して出力

TableScanNode の例。 ノード自身からなるサブプランを新しく作成している。

// presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedLogicalPlanner.java:L353
public SubPlanBuilder visitTableScan(TableScanNode node, Void context)
{
    return createSourceDistributionPlan(node, node.getId());
}

FilterNode の例。 サブプランの区切りは特に出現しない。

// presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedLogicalPlanner.java:L228
public SubPlanBuilder visitFilter(FilterNode node, Void context)
{
    SubPlanBuilder current = node.getSource().accept(this, context);
    current.setRoot(new FilterNode(node.getId(), current.getRoot(), node.getPredicate()));
    return current;
}

JoinNode の例。 SinkNodeExchangeNode を作って、 SubPlanBuilder.build() を実行している。

// presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedLogicalPlanner.java:L436
public SubPlanBuilder visitJoin(JoinNode node, Void context)
{
    SubPlanBuilder left = node.getLeft().accept(this, context);
    SubPlanBuilder right = node.getRight().accept(this, context);
    if (left.isDistributed() || right.isDistributed()) {
        switch (node.getType()) {
            case INNER:
            case LEFT:
                right.setRoot(new SinkNode(idAllocator.getNextId(), right.getRoot(), right.getRoot().getOutputSymbols()));
                left.setRoot(new JoinNode(node.getId(),
                        node.getType(),
                        left.getRoot(),
                        new ExchangeNode(idAllocator.getNextId(), right.getId(), right.getRoot().getOutputSymbols()),
                        node.getCriteria()));
                left.addChild(right.build());
                return left;

これを見る限り、内部結合でも左と右が重要?

あと、 CROSScase に含まれていない模様。 PredicatePushDown.Rewriter.rewriteJoin() あたりで、JOIN ... ON 0 = 0 にしている。


OutputNode の例。

出力対象の node.getSource() までを SinkNode につなげてサブプランを区切りつつ、 ExchangeNode で受け取って OutputNode で出力するところは、 DistributionPlan.NONE で単一ノードにかき集めている。

// presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedLogicalPlanner.java:L298
public SubPlanBuilder visitOutput(OutputNode node, Void context)
{
    SubPlanBuilder current = node.getSource().accept(this, context);
    if (current.isDistributed()) {
        current.setRoot(new SinkNode(idAllocator.getNextId(), current.getRoot(), current.getRoot().getOutputSymbols()));
        // create a new non-partitioned fragment
        current = createSingleNodePlan(new ExchangeNode(idAllocator.getNextId(), current.getId(), current.getRoot().getOutputSymbols()))
                .addChild(current.build());
    }
    current.setRoot(new OutputNode(node.getId(), current.getRoot(), node.getColumnNames(), node.getOutputSymbols()));
    return current;
}

ステージ別実行計画の生成 (StageExecutionPlan)

SqlQueryExecution.planDistribution() に戻ってくる。

// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L228
private void planDistribution(SubPlan subplan)
{
    // time distribution planning
    long distributedPlanningStart = System.nanoTime();
    // plan the execution on the active nodes
    DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager, shardManager);
    StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(subplan);

DistributedExecutionPlanner.plan() へ。


DistributedExecutionPlanner.plan() の前半。

DistributedExecutionPlanner.Visitor extends PlanVisitor で、 自身の PlanFragment 内にテーブルがあれば、それを SplitSource として記録している。

// presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedExecutionPlanner.java:L79
public StageExecutionPlan plan(SubPlan root, Predicate<Partition> materializedViewPartitionPredicate)
{
    PlanFragment currentFragment = root.getFragment();
    // get splits for this fragment, this is lazy so split assignments aren't actually calculated here
    Visitor visitor = new Visitor();
    NodeSplits nodeSplits = currentFragment.getRoot().accept(visitor, materializedViewPartitionPredicate);

見どころだけ書くと:

  • PlanFragment の末端が ExchangeNode の場合、 SplitSource なしの扱い
  • JoinNode の入力が二つとも SplitSource を持つことはできない
  • TableScanNodeSampleNode が同時にあって、かつサンプルにベルヌーイ分布を使っていない場合、スプリットごと間引いている

DistributedExecutionPlanner.plan() の後半。

SubPlan の子要素についても、再帰的に DistributedExecutionPlanner.plan() を実行している。

自身の PlanFragment と、子要素から StageExecutionPlan を構築。

// presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedExecutionPlanner.java:L79
public StageExecutionPlan plan(SubPlan root, Predicate<Partition> materializedViewPartitionPredicate)
{
    ...
    for (SubPlan childPlan : root.getChildren()) {
        dependencies.add(plan(childPlan, materializedViewPartitionPredicate));
    }
    return new StageExecutionPlan(currentFragment,
            nodeSplits.getDataSource(),
            dependencies.build(),
            visitor.getOutputReceivers());
}

SqlQueryExecution.planDistribution() の続きでは、根の StageExecutionPlan から SqlStageExecution を作っている。 根の StageExecutionPlan は子の StageExecutionPlan も含んでいるため、 SqlStageExecution はクエリ全体のプラン実行を表している。

// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L233
// plan the execution on the active nodes
DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager, shardManager);
StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(subplan);
...
// build the stage execution objects (this doesn't schedule execution)
SqlStageExecution outputStage = new SqlStageExecution(stateMachine.getQueryId(),
        locationFactory,
        outputStageExecutionPlan,
        nodeScheduler,
        remoteTaskFactory,
        stateMachine.getSession(),
        scheduleSplitBatchSize,
        maxPendingSplitsPerNode,
        initialHashPartitions,
        queryExecutor,
        ROOT_OUTPUT_BUFFERS);
this.outputStage.set(outputStage);

この結果を SqlQueryExecution.outputStage に保持させている。 これは後でキャンセルしたり、情報を取り出したりする際にも使う模様。

論理実行計画の発行

SqlQueryExecution.start() の最後の部分で、 SqlStageExecution.start() を実行している。

// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L147
public void start()
{
            ...
            // plan distribution of query
            planDistribution(subplan);
            ...
            SqlStageExecution stage = outputStage.get();
            if (!stateMachine.isDone()) {
                stage.start();
            }

SqlStageExecution のコンストラクタ入口。 ルートステージは parentnull に。

// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L174
private SqlStageExecution(@Nullable StageExecutionNode parent,
        QueryId queryId,
        AtomicInteger nextStageId,
        LocationFactory locationFactory,
        StageExecutionPlan plan,
        NodeScheduler nodeScheduler,
        RemoteTaskFactory remoteTaskFactory,
        Session session,
        int splitBatchSize,
        int maxPendingSplitsPerNode,
        int initialHashPartitions,
        ExecutorService executor)

SqlStageExecution のコンストラクタ内で、再帰的にサブステージに関するも作っている。

// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L213
ImmutableMap.Builder<PlanFragmentId, StageExecutionNode> subStages = ImmutableMap.builder();
for (StageExecutionPlan subStagePlan : plan.getSubStages()) {
    PlanFragmentId subStageFragmentId = subStagePlan.getFragment().getId();
    StageExecutionNode subStage = new SqlStageExecution(this,
            queryId,
            nextStageId,
            locationFactory,
            subStagePlan,
            nodeScheduler,
            remoteTaskFactory,
            session,
            splitBatchSize,
            maxPendingSplitsPerNode,
            initialHashPartitions,
            executor);
    subStage.addStateChangeListener(new StateChangeListener<StageInfo>()
    {
        @Override
        public void stateChanged(StageInfo stageInfo)
        {
            doUpdateState();
        }
    });
    subStages.put(subStageFragmentId, subStage);
}
this.subStages = subStages.build();

SqlStageExecution.start() のエントリ。

// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L508
public Future<?> start()
{
    try (SetThreadName setThreadName = new SetThreadName("Stage-%s", stageId)) {
        return scheduleStartTasks();
    }
}

SqlStageExecution.scheduleStartTasks() へ。


SqlStageExecution.scheduleStartTasks() の本体。

サブステージの SqlStageExecution.scheduleStartTasks() を再帰的に実行したあと、 別スレッドで自身の SqlStageExecution.startTasks() を実行。

// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L517
public Future<?> scheduleStartTasks()
{
    try (SetThreadName setThreadName = new SetThreadName("Stage-%s", stageId)) {
        // start sub-stages (starts bottom-up)
        for (StageExecutionNode subStage : subStages.values()) {
            subStage.scheduleStartTasks();
        }
        return executor.submit(new Runnable()
        {
            @Override
            public void run()
            {
                startTasks();
            }
        });
    }
}

SqlStageExecution.startTasks() の本体。

SubPlan の作成時に解析した PlanDistribution の種類に従って、タスクに関する schedule*() を起動している。

// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L535
private void startTasks()
{
            ...
            // schedule tasks
            if (fragment.getDistribution() == PlanDistribution.NONE) {
                scheduleFixedNodeCount(1);
            }
            else if (fragment.getDistribution() == PlanDistribution.FIXED) {
                scheduleFixedNodeCount(initialHashPartitions);
            }
            else if (fragment.getDistribution() == PlanDistribution.SOURCE) {
                scheduleSourcePartitionedNodes();
            }
            else if (fragment.getDistribution() == PlanDistribution.COORDINATOR_ONLY) {
                scheduleOnCurrentNode();
            }

先に斜め読みしておくと、以下のように scheduleTask() に辿り着く模様。

  • scheduleFixedNodeCount()scheduleTask()
  • scheduleSourcePartitionedNodes()assignSplits()scheduleTask()
  • scheduleOnCurrentNode()scheduleTask()

一番ナイーブそうな PlanDistribution.COORDINATOR_ONLY (コーディネーターで処理) → SqlStageExecution.scheduleOnCurrentNode() へ。


コーディネータノード上でタスクを実行する scheduleOnCurrentNode() の本体。

nodeSelectorNodeScheduler.NodeSelector クラス。

// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L610
private void scheduleOnCurrentNode()
{
    // create task on current node
    Node node = nodeSelector.selectCurrentNode();
    scheduleTask(0, node);
    // tell sub stages about all nodes and that there will not be more nodes
    for (StageExecutionNode subStage : subStages.values()) {
        subStage.parentNodesAdded(ImmutableList.of(node), true);
    }
}

NodeScheduler.NodeSelector.selectCurrentNode() へ。


NodeScheduler.NodeSelector.selectCurrentNode() 本体。

nodeManagerNodeManager インターフェース。

// presto-main/src/main/java/com/facebook/presto/execution/NodeScheduler.java:L157
public Node selectCurrentNode()
{
    // TODO: this is a hack to force scheduling on the coordinator
    return nodeManager.getCurrentNode();
}

NodeManager インターフェースのバインディング。

InternalNodeManagerDiscoveryNodeManager あたりがバインドされている感じ。

// presto-server/src/main/java/com/facebook/presto/server/CoordinatorModule.java:L121
// node scheduler
binder.bind(InternalNodeManager.class).to(DiscoveryNodeManager.class).in(Scopes.SINGLETON);
binder.bind(NodeManager.class).to(Key.get(InternalNodeManager.class)).in(Scopes.SINGLETON);
bindConfig(binder).to(NodeSchedulerConfig.class);
binder.bind(NodeScheduler.class).in(Scopes.SINGLETON);
newExporter(binder).export(NodeScheduler.class).withGeneratedName();

少し深そうなので、今回はあまり立ち入らないことにして、それっぽいノード情報が返ってきているということにする。

ちなみに、 Node インターフェースはこんな感じで、エントリに関する情報を保持している。

// presto-spi/src/main/java/com/facebook/presto/spi/Node.java:L18
public interface Node
{
    HostAddress getHostAndPort();

    URI getHttpUri();

    String getNodeIdentifier();
}

というわけで SqlStageExecution を続けて読む。

自身を表す Node オブジェクトを使って、 scheduleTask を起動している。 第一引数の 0 はステージ内のタスクIDの模様。

その後、サブステージに対して parentNodesAdded() で通知イベントを飛ばしている感じ。

// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L610
private void scheduleOnCurrentNode()
{
    // create task on current node
    Node node = nodeSelector.selectCurrentNode();
    scheduleTask(0, node);
    // tell sub stages about all nodes and that there will not be more nodes
    for (StageExecutionNode subStage : subStages.values()) {
        subStage.parentNodesAdded(ImmutableList.of(node), true);
    }
}

scheduleTask() へ。


2引数の scheduleTask() から、4引数の scheduleTask() へ。

// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L722
private RemoteTask scheduleTask(int id, Node node)
{
    return scheduleTask(id, node, null, ImmutableList.<Split>of());
}

タスクの発行

scheduleTask() の先頭部分。 タスクの入出力をやっつけてそうな addNewExchangesAndBuffers() を先頭で起動している。

// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L727
private RemoteTask scheduleTask(int id, Node node, PlanNodeId sourceId, Iterable<? extends Split> sourceSplits)
{
    // before scheduling a new task update all existing tasks with new exchanges and output buffers
    addNewExchangesAndBuffers();

addNewExchangesAndBuffers() の先頭部分。

// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L791
private boolean addNewExchangesAndBuffers()
{
    // get new exchanges and update exchange state
    Set<PlanNodeId> completeSources = updateCompleteSources();
    boolean allSourceComplete = completeSources.containsAll(fragment.getSourceIds());
    Multimap<PlanNodeId, URI> newExchangeLocations = getNewExchangeLocations();
    exchangeLocations.set(ImmutableMultimap.<PlanNodeId, URI>builder()
            .putAll(exchangeLocations.get())
            .putAll(newExchangeLocations)
            .build());

ざっくりと概要。

  • updateCompleteSources() - 現在のステージの入力 ( ExchangeNode ) のうち、それを生成するステージの実行準備が整ったものの一覧を抽出
  • getNewExchangeLocations() - 現時点で把握していないサブステージのタスク出力一覧を抽出
  • exchangeLocations - getNewExchangeLocations() を追加している

updateCompleteSources() 本体。

  1. 未解決の ExchangeNode を探す
  2. そのデータを生成するステージ一覧を取りだす
  3. それらのステージがいずれも準備が整っていたら、 ExchangeNode にマークをつける
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L853
private Set<PlanNodeId> updateCompleteSources()
{
    for (PlanNode planNode : fragment.getSources()) {
        if (!completeSources.contains(planNode.getId()) && planNode instanceof ExchangeNode) {
            ExchangeNode exchangeNode = (ExchangeNode) planNode;
            boolean exchangeFinished = true;
            for (PlanFragmentId planFragmentId : exchangeNode.getSourceFragmentIds()) {
                StageExecutionNode subStage = subStages.get(planFragmentId);
                switch (subStage.getState()) {
                    case PLANNED:
                    case SCHEDULING:
                        exchangeFinished = false;
                        break;
                }
            }
            if (exchangeFinished) {
                completeSources.add(planNode.getId());
            }
        }
    }
    return completeSources;
}

getNewExchangeLocations() 本体。

  1. すべての ExchangeNode を探す
  2. それぞれのデータを生成するステージ一覧を取りだす
  3. それぞれのタスク一覧を取り出す
  4. 未見のタスクがあれば、それらを返す
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L465
private Multimap<PlanNodeId, URI> getNewExchangeLocations()
{
    Multimap<PlanNodeId, URI> exchangeLocations = this.exchangeLocations.get();
    ImmutableMultimap.Builder<PlanNodeId, URI> newExchangeLocations = ImmutableMultimap.builder();
    for (PlanNode planNode : fragment.getSources()) {
        if (planNode instanceof ExchangeNode) {
            ExchangeNode exchangeNode = (ExchangeNode) planNode;
            for (PlanFragmentId planFragmentId : exchangeNode.getSourceFragmentIds()) {
                StageExecutionNode subStage = subStages.get(planFragmentId);
                checkState(subStage != null, "Unknown sub stage %s, known stages %s", planFragmentId, subStages.keySet());
                // add new task locations
                for (URI taskLocation : subStage.getTaskLocations()) {
                    if (!exchangeLocations.containsEntry(exchangeNode.getId(), taskLocation)) {
                        newExchangeLocations.putAll(exchangeNode.getId(), taskLocation);
                    }
                }
            }
        }
    }
    return newExchangeLocations.build();
}

addNewExchangesAndBuffers() の残り部分は、タスクを発行していない今はあまり関係なさそうなので読み飛ばす。

// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L791
private boolean addNewExchangesAndBuffers()
{
    ...
    // get new output buffer and update output buffer state
    OutputBuffers outputBuffers = updateToNextOutputBuffers();
    // finished state must be decided before update to avoid race conditions
    boolean finished = allSourceComplete && outputBuffers.isNoMoreBufferIds();
    // update tasks
    for (RemoteTask task : tasks.values()) {
        ...
    }
    return finished;
}

scheduleTask() の続きの部分。

addNewExchangesAndBuffers() の結果も利用して、 initialSplits を構築している。

// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L727
private RemoteTask scheduleTask(int id, Node node, PlanNodeId sourceId, Iterable<? extends Split> sourceSplits)
{
    // before scheduling a new task update all existing tasks with new exchanges and output buffers
    addNewExchangesAndBuffers();
    TaskId taskId = new TaskId(stageId, String.valueOf(id));
    ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder();
    for (Split sourceSplit : sourceSplits) {
        initialSplits.put(sourceId, sourceSplit);
    }
    for (Entry<PlanNodeId, URI> entry : exchangeLocations.get().entries()) {
        initialSplits.put(entry.getKey(), createRemoteSplitFor(node.getNodeIdentifier(), entry.getValue()));
    }

軽く createRemoteSplitFor() を読んでみると、タスクのURIから出力データの位置を気合で計算しているように見える。

// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L960
private RemoteSplit createRemoteSplitFor(String nodeId, URI taskLocation)
{
    URI splitLocation = uriBuilderFrom(taskLocation).appendPath("results").appendPath(nodeId).build();
    return new RemoteSplit(splitLocation, tupleInfos);
}

さらに scheduleTask() の続きの部分。

RemoteTaskFactory.createRemoteTask() を利用してリモートタスクを作成し、RemoteTask.start() で実行を開始している感じ。

// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L727
private RemoteTask scheduleTask(int id, Node node, PlanNodeId sourceId, Iterable<? extends Split> sourceSplits)
{
    ...
    RemoteTask task = remoteTaskFactory.createRemoteTask(session,
            taskId,
            node,
            fragment,
            initialSplits.build(),
            outputReceivers,
            currentOutputBuffers);
    ...
    // create and update task
    task.start();
    // record this task
    tasks.put(node, task);

RemoteTaskFactory には、 HttpRemoteTaskFactory がバインディングされている。

ついでに、その付近で LocationFactory に対して HttpLocationFactory もバインディングされている。

// presto-server/src/main/java/com/facebook/presto/server/ServerMainModule.java:L149
// execution
binder.bind(LocationFactory.class).to(HttpLocationFactory.class).in(Scopes.SINGLETON);
binder.bind(RemoteTaskFactory.class).to(HttpRemoteTaskFactory.class).in(Scopes.SINGLETON);
httpClientBinder(binder).bindAsyncHttpClient("scheduler", ForScheduler.class).withTracing();

HttpRemoteTaskFactory.createRemoteTask() の本体。

HttpRemoteTask のコンストラクタを呼び出している。

このとき、 HttpLocationFactory.createTaskLocation() を使って、対象タスクのURIを算出している。

// presto-server/src/main/java/com/facebook/presto/server/HttpRemoteTaskFactory.java:L81
public RemoteTask createRemoteTask(Session session,
        TaskId taskId,
        com.facebook.presto.spi.Node node,
        PlanFragment fragment,
        Multimap<PlanNodeId, Split> initialSplits,
        Map<PlanNodeId, OutputReceiver> outputReceivers,
        OutputBuffers outputBuffers)
{
    return new HttpRemoteTask(session,
            taskId,
            node.getNodeIdentifier(),
            locationFactory.createTaskLocation(node, taskId),
            fragment,
            initialSplits,
            outputReceivers,
            outputBuffers,
            httpClient,
            executor,
            maxConsecutiveErrorCount,
            minErrorDuration,
            taskInfoCodec,
            taskUpdateRequestCodec
    );
}

HttpLocationFactory.createTaskLocation() の本体。

"/v1/task" というパスが接頭辞としてついている。

// presto-server/src/main/java/com/facebook/presto/server/HttpLocationFactory.java:L76
public URI createTaskLocation(Node node, TaskId taskId)
{
    Preconditions.checkNotNull(node, "node is null");
    Preconditions.checkNotNull(taskId, "taskId is null");
    return uriBuilderFrom(node.getHttpUri())
            .appendPath("/v1/task")
            .appendPath(taskId.toString())
            .build();
}

なお、対応するリソースは TaskResource の模様。

// presto-server/src/main/java/com/facebook/presto/server/TaskResource.java:L60
@Path("/v1/task")
public class TaskResource

HttpRemoteTask のコンストラクタ。

処理対象のスプリット情報 ( initialSplits ) から pendingSplits を作っていたり、 タスク情報 ( taskInfo ) に TaskState.PLANNED という状態のステートマシンを登録していたりする。

// presto-server/src/main/java/com/facebook/presto/server/HttpRemoteTask.java:L144
public HttpRemoteTask(Session session,
        ...
{
        ...
        for (Entry<PlanNodeId, Split> entry : checkNotNull(initialSplits, "initialSplits is null").entries()) {
            ScheduledSplit scheduledSplit = new ScheduledSplit(nextSplitId.getAndIncrement(), entry.getValue());
            pendingSplits.put(entry.getKey(), scheduledSplit);
        }
        ...
        TaskStats taskStats = new TaskContext(taskId, executor, session).getTaskStats();
        taskInfo = new StateMachine<>("task " + taskId, executor, new TaskInfo(
                taskId,
                TaskInfo.MIN_VERSION,
                TaskState.PLANNED,
                location,
                DateTime.now(),
                new SharedBufferInfo(QueueState.OPEN, 0, 0, bufferStates),
                ImmutableSet.<PlanNodeId>of(),
                taskStats,
                ImmutableList.<FailureInfo>of(),
                ImmutableMap.<PlanNodeId, Set<?>>of()));
    }
}

HttpRemoteTask.start() の本体。HttpRemoteTask.scheduleUpdate() を呼び出している。

scheduleUpdate() は様々なところから呼び出されている。

// presto-server/src/main/java/com/facebook/presto/server/HttpRemoteTask.java:L229
public void start()
{
    try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s", taskId)) {
        // to start we just need to trigger an update
        scheduleUpdate();
    }
}

HttpRemoteTask.scheduleUpdate() の本体。

リクエストオブジェクトを作成し、 /v1/task に投げている。

// presto-server/src/main/java/com/facebook/presto/server/HttpRemoteTask.java:L350
private synchronized void scheduleUpdate()
{
    ...
    TaskUpdateRequest updateRequest = new TaskUpdateRequest(session,
            planFragment,
            sources,
            outputBuffers.get());
    Request request = preparePost()
            .setUri(uriBuilderFrom(taskInfo.get().getSelf()).build())
            .setHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString())
            .setBodyGenerator(jsonBodyGenerator(taskUpdateRequestCodec, updateRequest))
            .build();
    ...
}

おそらくこの辺りでタスクが起動するはず。


Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment