- presto 0.60
- [
QueryResource
,TaskResource
)
なんかそのまま mvn clean install eclipse:eclipse -DskipTests
コマンド叩いても checkstyle で落ちたので:
mvn clean install -DskipTests
mvn eclipse:eclipse -Dcheckstyle.skip
ちなみに、EclipseじゃなくてIDEA推奨の模様。
サービスエントリ。
リクエストを受け取る QueryResource
から。
// presto-server/src/main/java/com/facebook/presto/server/QueryResource.java:L89
@POST
@Produces(MediaType.APPLICATION_JSON)
public Response createQuery(String query,
@HeaderParam(PRESTO_USER) String user,
@HeaderParam(PRESTO_SOURCE) String source,
@HeaderParam(PRESTO_CATALOG) @DefaultValue(DEFAULT_CATALOG) String catalog,
@HeaderParam(PRESTO_SCHEMA) @DefaultValue(DEFAULT_SCHEMA) String schema,
@HeaderParam(USER_AGENT) String userAgent,
@Context HttpServletRequest requestContext,
@Context UriInfo uriInfo)
クエリ発行リクエストの処理。
// presto-server/src/main/java/com/facebook/presto/server/QueryResource.java:L106
QueryInfo queryInfo = queryManager.createQuery(new Session(user, source, catalog, schema, remoteUserAddress, userAgent), query);
QueryManager.createQuery()
を起動しているが、 QueryManager
はインターフェース。
QueryManager
のバインディング。
SqlQueryManager.createQuery()
へ。
// presto-server/src/main/java/com/facebook/presto/server/CoordinatorModule.java:L104
binder.bind(QueryManager.class).to(SqlQueryManager.class).in(Scopes.SINGLETON);
SqlQueryManager.createQuery()
の先頭。
// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L191
public QueryInfo createQuery(Session session, String query)
{
checkNotNull(query, "query is null");
Preconditions.checkArgument(!query.isEmpty(), "query must not be empty string");
QueryId queryId = queryIdGenerator.createNextQueryId();
Statement statement;
try {
statement = SqlParser.createStatement(query);
SqlParser.createStatement()
でクエリ文字列をパース。
SqlParser.createStatement()
の本体。
// presto-parser/src/main/java/com/facebook/presto/sql/parser/SqlParser.java:L32
public static Statement createStatement(String sql)
{
try {
return createStatement(parseStatement(sql));
}
catch (StackOverflowError e) {
throw new ParsingException("statement is too large (stack overflow while parsing)");
}
}
CommonTree
はANTLRのジェネリックなASTモデル。
// presto-parser/src/main/java/com/facebook/presto/sql/parser/SqlParser.java:L78
static CommonTree parseStatement(String sql)
{
try {
return (CommonTree) getParser(sql).singleStatement().getTree();
}
catch (RecognitionException e) {
throw new AssertionError(e); // RecognitionException is not thrown
}
}
ANTLRの機構を使って2-passでASTモデル構築。
// presto-parser/src/main/java/com/facebook/presto/sql/parser/SqlParser.java:L53
static Statement createStatement(CommonTree tree)
{
TreeNodeStream stream = new BufferedTreeNodeStream(tree);
StatementBuilder builder = new StatementBuilder(stream);
try {
return builder.statement().value;
}
catch (RecognitionException e) {
throw new AssertionError(e); // RecognitionException is not thrown
}
}
statement
構文規則。
// presto-parser/src/main/antlr3/com/facebook/presto/sql/parser/StatementBuilder.g:L58
statement returns [Statement value]
: query { $value = $query.value; }
| explain { $value = $explain.value; }
| showTables { $value = $showTables.value; }
| showSchemas { $value = $showSchemas.value; }
| showCatalogs { $value = $showCatalogs.value; }
| showColumns { $value = $showColumns.value; }
| showPartitions { $value = $showPartitions.value; }
| showFunctions { $value = $showFunctions.value; }
| useCollection { $value = $useCollection.value; }
| createTable { $value = $createTable.value; }
| createMaterializedView { $value = $createMaterializedView.value; }
| refreshMaterializedView { $value = $refreshMaterializedView.value; }
| createAlias { $value = $createAlias.value; }
| dropAlias { $value = $dropAlias.value; }
| dropTable { $value = $dropTable.value; }
;
今回は query
を見る。
query
構文の .value
は Query
型。
// presto-parser/src/main/antlr3/com/facebook/presto/sql/parser/StatementBuilder.g:L76
query returns [Query value]
: ^(QUERY queryExpr) { $value = $queryExpr.value; }
;
Query
クラスのフィールド。
// presto-parser/src/main/java/com/facebook/presto/sql/tree/Query.java:L23
public class Query
extends Statement
{
private final Optional<With> with;
private final QueryBody queryBody;
private final List<SortItem> orderBy;
private final Optional<String> limit;
private final Optional<Approximate> approximate;
QueryBody
型の queryBody
がリレーションの計算部分。
QueryBody
は以下のサブクラスがある。
QuerySpecification
Except
Intersect
Union
Table
TableSubquery
SELECT ... FROM ... WHERE ...
は QuerySpecification
で表現。
FROM
句にテーブルを直接指定すると Table
で表現。
QueryExecution
の生成。
executionFactories : Map<Class<? extends Statement>, QueryExecutionFactory>
からファクトリを探す。
statement.getClass()
は Query
クラス。
// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L206
QueryExecutionFactory<?> queryExecutionFactory = executionFactories.get(statement.getClass());
Preconditions.checkState(queryExecutionFactory != null, "Unsupported statement type %s", statement.getClass().getName());
final QueryExecution queryExecution = queryExecutionFactory.createQueryExecution(queryId, query, session, statement);
バインディングの解決。
Query
の場合は SqlQueryExecution
系がバインディングされている。
// presto-server/src/main/java/com/facebook/presto/server/CoordinatorModule.java:L152
MapBinder<Class<? extends Statement>, QueryExecution.QueryExecutionFactory<?>> executionBinder = newMapBinder(binder,
new TypeLiteral<Class<? extends Statement>>() {},
new TypeLiteral<QueryExecution.QueryExecutionFactory<?>>() {}
);
...
executionBinder.addBinding(Query.class).to(SqlQueryExecution.SqlQueryExecutionFactory.class).in(Scopes.SINGLETON);
いちおう SqlQueryExecutionFactory
をみておくと、 SqlQueryExecution
を作るだけ。
// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L453
@Override
public SqlQueryExecution createQueryExecution(QueryId queryId, String query, Session session, Statement statement)
{
SqlQueryExecution queryExecution = new SqlQueryExecution(queryId, ...
queryExecutor
経由でクエリを実行。
// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L227
// start the query in the background
queryExecutor.submit(new QueryStarter(queryExecution, stats));
queryExecutor
は普通のスレッドプール。
queryExecutor.submit(Runnable)
で run()
メソッドを起動するだけ。
// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L101
this.queryExecutor = Executors.newCachedThreadPool(threadsNamed("query-scheduler-%d"));
QueryStarter implements Runnable
の本体。
// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryManager.java:L378
public QueryStarter(QueryExecution queryExecution, SqlQueryManagerStats stats)
{
this.queryExecution = queryExecution;
this.stats = stats;
}
@Override
public void run()
{
try (SetThreadName setThreadName = new SetThreadName("Query-%s", queryExecution.getQueryInfo().getQueryId())) {
stats.queryStarted();
queryExecution.start();
}
}
QueryExecution.start()
を呼んでいるので、 SqlQueryExecution.start()
へ。
SqlQueryExecution.start()
の前半部分。
analyzeQuery()
メソッドで SubPlan
を作る。
// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L151
// transition to planning
if (!stateMachine.beginPlanning()) {
// query already started or finished
return;
}
// analyze query
SubPlan subplan = analyzeQuery();
SubPlan
はサブプラン本体の PlanFragment
と、再帰的なサブプランをもっている感じ。
// presto-main/src/main/java/com/facebook/presto/sql/planner/SubPlan.java:L31
public class SubPlan
{
private final PlanFragment fragment;
private final List<SubPlan> children;
PlanFragment
でまず重要そうなのは、 PlanNode
のあたり。
// presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java:L42
public class PlanFragment
{
...
private final PlanNode root;
...
private final List<PlanNode> sources;
PlanNode
は論理オペレータを表していて、たとえば以下のサブクラスがある。
TableScanNode
ProjectNode
SelectNode
JoinNode
- ...
それぞれも、オペレータの入力としてさらに PlanNode
を持っている感じ。
なお、以下のサブクラスは SubPlan
以降にしか出現しない。
SinkNode
-PlanFragment
の出力ExchangeNode
-SinkNode
による出力を入力にとる
analyzeQuery()
本体。
重要そうなのは:
Analyzer.analyze()
LogicalPlanner.plan()
DistributedLogicalPlanner.createSubPlans()
// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L210
Analyzer analyzer = new Analyzer(stateMachine.getSession(), metadata, Optional.of(queryExplainer), approximateQueriesEnabled);
Analysis analysis = analyzer.analyze(statement);
PlanNodeIdAllocator idAllocator = new PlanNodeIdAllocator();
// plan query
LogicalPlanner logicalPlanner = new LogicalPlanner(stateMachine.getSession(), planOptimizers, idAllocator, metadata, periodicImportManager, storageManager);
Plan plan = logicalPlanner.plan(analysis);
List<Input> inputs = new InputExtractor(metadata).extract(plan.getRoot());
stateMachine.setInputs(inputs);
// fragment the plan
SubPlan subplan = new DistributedLogicalPlanner(metadata, idAllocator).createSubPlans(plan, false);
stateMachine.recordAnalysisTime(analysisStart);
return subplan;
なお、 LogicalPlanner.plan()
が生成する Plan
は、ステージ区切り前の論理実行計画。
ルートとして PlanNode
のツリーを持っている。
// presto-main/src/main/java/com/facebook/presto/sql/planner/Plan.java:L22
public class Plan
{
private final PlanNode root;
Analyzer.analyze()
本体。
名前表を作ったり、型の計算をしたりして、 Analysis
オブジェクトに積んでいく感じ。
戻り値の TupleDescriptor
は主に結果リレーションの型情報。
// presto-main/src/main/java/com/facebook/presto/sql/analyzer/Analyzer.java:L50
Analysis analysis = new Analysis();
StatementAnalyzer analyzer = new StatementAnalyzer(analysis, metadata, session, approximateQueriesEnabled, queryExplainer);
TupleDescriptor outputDescriptor = analyzer.process(statement, new AnalysisContext());
analysis.setOutputDescriptor(outputDescriptor);
return analysis;
StatementAnalyzer extends AstVisitor
なので、 visitQuery()
へ。
StatementAnalyzer.visitQuery()
の一部を抜粋。
Query.getQueryBody() : QueryBody
は、 QuerySpecification
として進める。
// presto-main/src/main/java/com/facebook/presto/sql/analyzer/StatementAnalyzer.java:L504
TupleAnalyzer analyzer = new TupleAnalyzer(analysis, session, metadata, approximateQueriesEnabled);
TupleDescriptor descriptor = analyzer.process(node.getQueryBody(), context);
TupleAnalyzer extends AstVisitor
なので、 analyzer.process()
は TupleAnalyzer.visitQuerySpecification()
へ。
TupleAnalyzer.visitQuerySpecification()
の本体。
ざっくり言えば、 FROM
句で入力した型から、 SELECT
句で出力する型を計算している感じ。
// presto-main/src/main/java/com/facebook/presto/sql/analyzer/TupleAnalyzer.java:L258
TupleDescriptor tupleDescriptor = analyzeFrom(node, context);
analyzeWhere(node, tupleDescriptor, context);
List<FieldOrExpression> outputExpressions = analyzeSelect(node, tupleDescriptor, context);
List<FieldOrExpression> groupByExpressions = analyzeGroupBy(node, tupleDescriptor, context, outputExpressions);
List<FieldOrExpression> orderByExpressions = analyzeOrderBy(node, tupleDescriptor, context, outputExpressions);
analyzeHaving(node, tupleDescriptor, context);
analyzeAggregations(node, tupleDescriptor, groupByExpressions, outputExpressions, orderByExpressions);
analyzeWindowFunctions(node, outputExpressions, orderByExpressions);
TupleDescriptor descriptor = computeOutputDescriptor(node, tupleDescriptor);
analysis.setOutputDescriptor(node, descriptor);
LogicalPlanner.plan()
本体。
ざっくり:
RelationPlanner.process()
でクエリ全体の論理オペレータツリーを作成createOutputPlan()
で論理オペレータツリーの最後に出力用のオペレータを接続PlanOptimizer.optimize()
で最適化Plan
オブジェクトで包んで返す
// presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java:L92
public Plan plan(Analysis analysis)
{
RelationPlan plan;
...
else {
RelationPlanner planner = new RelationPlanner(analysis, symbolAllocator, idAllocator, metadata, session);
plan = planner.process(analysis.getQuery(), null);
}
PlanNode root = createOutputPlan(plan, analysis);
...
for (PlanOptimizer optimizer : planOptimizers) {
root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator);
}
...
return new Plan(root, symbolAllocator);
}
RelationPlanner extends AstVisitor
なので、 RelationPlanner.visitQuery()
へ。
RelationPlanner.visitQuery()
本体。
さらに QueryPlanner.visitQuery()
へ。
// presto-main/src/main/java/com/facebook/presto/sql/planner/RelationPlanner.java:L215
protected RelationPlan visitQuery(Query node, Void context)
{
PlanBuilder subPlan = new QueryPlanner(analysis, symbolAllocator, idAllocator, metadata, session).process(node, null);
ImmutableList.Builder<Symbol> outputSymbols = ImmutableList.builder();
for (FieldOrExpression fieldOrExpression : analysis.getOutputExpressions(node)) {
outputSymbols.add(subPlan.translate(fieldOrExpression));
}
return new RelationPlan(subPlan.getRoot(), analysis.getOutputDescriptor(node), outputSymbols.build());
}
QueryPlanner.visitQuery()
本体。
おそらく、 QuerySpecification (<: QueryBody)
) を論理オペレーターに変換した後、
IN
, ORDER BY
, LIMIT
, SELECT
句あたりを適当に処理するのではないかと。
// presto-main/src/main/java/com/facebook/presto/sql/planner/QueryPlanner.java:L103
protected PlanBuilder visitQuery(Query query, Void context)
{
PlanBuilder builder = planQueryBody(query);
Set<InPredicate> inPredicates = analysis.getInPredicates(query);
builder = appendSemiJoins(builder, inPredicates);
List<FieldOrExpression> orderBy = analysis.getOrderByExpressions(query);
List<FieldOrExpression> outputs = analysis.getOutputExpressions(query);
builder = project(builder, Iterables.concat(orderBy, outputs));
builder = sort(builder, query);
builder = project(builder, analysis.getOutputExpressions(query));
builder = limit(builder, query);
return builder;
}
省略して RelationPlanner.visitQuerySpecification()
へ。
QueryPlanner.visitQuerySpecification()
本体。
FROM
, IN
, WHERE
などを順番に適用していっている感じ。
// presto-main/src/main/java/com/facebook/presto/sql/planner/QueryPlanner.java:L121
protected PlanBuilder visitQuerySpecification(QuerySpecification node, Void context)
{
PlanBuilder builder = planFrom(node);
Set<InPredicate> inPredicates = analysis.getInPredicates(node);
builder = appendSemiJoins(builder, inPredicates);
builder = filter(builder, analysis.getWhere(node));
builder = aggregate(builder, node);
builder = filter(builder, analysis.getHaving(node));
builder = window(builder, node);
List<FieldOrExpression> orderBy = analysis.getOrderByExpressions(node);
List<FieldOrExpression> outputs = analysis.getOutputExpressions(node);
builder = project(builder, Iterables.concat(orderBy, outputs));
builder = distinct(builder, node, outputs, orderBy);
builder = sort(builder, node);
builder = project(builder, analysis.getOutputExpressions(node));
builder = limit(builder, node);
return builder;
}
QueryPlanner.planFrom()
だけ読んでみる。
QueryPlanner.planFrom()
本体。
// presto-main/src/main/java/com/facebook/presto/sql/planner/QueryPlanner.java:L160
private PlanBuilder planFrom(QuerySpecification node)
{
RelationPlan relationPlan;
...
else {
relationPlan = new RelationPlanner(analysis, symbolAllocator, idAllocator, metadata, session)
.process(Iterables.getOnlyElement(node.getFrom()), null);
}
...
return new PlanBuilder(translations, relationPlan.getRoot());
}
FROM
句に指定されたリレーションを再帰的に RelationPlanner
で変換している。
テーブルの場合を見るということで、 RelationPlanner.visitTable()
へ。
RelationPlanner.visitTable()
本体。
かなり省略すると、テーブルに対応するハンドルを探して、 TableScanNode
というオペレータを作る。
さらに、サンプリングの設定がなされていたら、テーブルスキャン結果を MaterializeSampleNode
で包む。
// presto-main/src/main/java/com/facebook/presto/sql/planner/RelationPlanner.java:L92
protected RelationPlan visitTable(Table node, Void context)
{
...
TableHandle handle = analysis.getTableHandle(node);
...
PlanNode root = new TableScanNode(idAllocator.getNextId(), handle, nodeOutputSymbols, columns.build(), null, Optional.<GeneratedPartitions>absent());
if (sampleWeightSymbol != null) {
root = new MaterializeSampleNode(idAllocator.getNextId(), root, sampleWeightSymbol);
}
return new RelationPlan(root, descriptor, planOutputSymbols);
}
LogicalPlanner.plan()
まで戻って、 PlanOptimizer
について少しだけ。
// presto-main/src/main/java/com/facebook/presto/sql/planner/LogicalPlanner.java:L92
public Plan plan(Analysis analysis)
{
RelationPlan plan;
...
for (PlanOptimizer optimizer : planOptimizers) {
root = optimizer.optimize(root, session, symbolAllocator.getTypes(), symbolAllocator, idAllocator);
}
...
return new Plan(root, symbolAllocator);
}
PlanOptimizersFactory
のあたりで PlanOptimizer
を正しい順序で登録している。
適用順序が超重要なので、コメントが長め。
// presto-main/src/main/java/com/facebook/presto/sql/planner/PlanOptimizersFactory.java:L57
builder.add(new ImplementSampleAsFilter(),
new SimplifyExpressions(metadata),
new UnaliasSymbolReferences(),
new PruneRedundantProjections(),
new SetFlatteningOptimizer(),
new MaterializeSamplePullUp(),
new LimitPushDown(), // Run the LimitPushDown after flattening set operators to make it easier to do the set flattening
new PredicatePushDown(metadata, splitManager, analyzerConfig.isApproximateQueriesEnabled()),
new PredicatePushDown(metadata, splitManager, analyzerConfig.isApproximateQueriesEnabled()), // Run predicate push down one more time in case we can leverage new information from generated partitions
new MergeProjections(),
new SimplifyExpressions(metadata), // Re-run the SimplifyExpressions to simplify any recomposed expressions from other optimizations
new UnaliasSymbolReferences(), // Run again because predicate pushdown might add more projections
new PruneUnreferencedOutputs(), // Make sure to run this at the end to help clean the plan for logging/execution and not remove info that other optimizers might need at an earlier point
new PruneRedundantProjections()); // This MUST run after PruneUnreferencedOutputs as it may introduce new redundant projections
論理実行計画はこの辺りで。
DistributedLogicalPlanner.createSubPlans()
本体。
DistributedLogicalPlanner.Visitor (extends PlanVisitor)
に処理を移譲し、 SubPlanBuilder.build()
で SubPlan
を作る。
// presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedLogicalPlanner.java:L82
public SubPlan createSubPlans(Plan plan, boolean createSingleNodePlan)
{
Visitor visitor = new Visitor(plan.getSymbolAllocator(), createSingleNodePlan);
SubPlanBuilder builder = plan.getRoot().accept(visitor, null);
SubPlan subplan = builder.build();
subplan.sanityCheck();
return subplan;
}
ざっくり言うと:
PlanNode
の木をいくつかの部分木に分解する- 部分木に分解した際の切断面のうち、根に
SinkNode
を配置する → 別のサブプランにデータを渡す - 部分木に分解した際の切断面のうち、末端に
ExchangeNode
を配置する → 別のサブプランからデータを受け取る - それぞれの部分木を
SubPlan
とする
という感じ。
それぞれの PlanNode
に対し、対応する DistributedLogicalPlanner.Visitor.visit*()
で上記を行っていく。
ただし、引数の createSingleNodePlan
が true
の場合、 SinkNode
や ExchangeNode
は生成されない模様。
分散処理にあたって PlanFragment
をもういちど。
// presto-main/src/main/java/com/facebook/presto/sql/planner/PlanFragment.java:L42
public class PlanFragment
{
public enum PlanDistribution
{
NONE,
FIXED,
SOURCE,
COORDINATOR_ONLY
}
public static enum OutputPartitioning
{
NONE,
HASH
}
...
private final PlanDistribution distribution;
...
private final OutputPartitioning outputPartitioning;
private final List<Symbol> partitionBy;
今のところの理解では、 PlanDistribution
は処理を分散する際の制約。 OutputPartitioning
は SinkNode
が出力する内容のパーティショニング方法。
PlanDistribution
:
NONE
- 1台のノードで実行する (OutputNode
,TopNNode
,LimitNode
, グループ化なしのAggregationNode
など)FIXED
- N台のノードで実行する (MarkDistinctNode
,WindowNode
, グループ化ありのAggregationNode
など )SOURCE
- データのあるノードで実行する (TableScanNode
のみ )COORDINATOR_ONLY
- コーディネータノードのみで実行する (TableCommitNode
のみ )
※上記以外の PlanDistribution
はないため、データは小さめでCPUだけぶん回すような計算はあまり想定していないか
OutputPartitioning
:
NONE
- 適当に出力HASH
- 特定の属性でパーティション化して出力
TableScanNode
の例。
ノード自身からなるサブプランを新しく作成している。
// presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedLogicalPlanner.java:L353
public SubPlanBuilder visitTableScan(TableScanNode node, Void context)
{
return createSourceDistributionPlan(node, node.getId());
}
FilterNode
の例。
サブプランの区切りは特に出現しない。
// presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedLogicalPlanner.java:L228
public SubPlanBuilder visitFilter(FilterNode node, Void context)
{
SubPlanBuilder current = node.getSource().accept(this, context);
current.setRoot(new FilterNode(node.getId(), current.getRoot(), node.getPredicate()));
return current;
}
JoinNode
の例。
SinkNode
と ExchangeNode
を作って、 SubPlanBuilder.build()
を実行している。
// presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedLogicalPlanner.java:L436
public SubPlanBuilder visitJoin(JoinNode node, Void context)
{
SubPlanBuilder left = node.getLeft().accept(this, context);
SubPlanBuilder right = node.getRight().accept(this, context);
if (left.isDistributed() || right.isDistributed()) {
switch (node.getType()) {
case INNER:
case LEFT:
right.setRoot(new SinkNode(idAllocator.getNextId(), right.getRoot(), right.getRoot().getOutputSymbols()));
left.setRoot(new JoinNode(node.getId(),
node.getType(),
left.getRoot(),
new ExchangeNode(idAllocator.getNextId(), right.getId(), right.getRoot().getOutputSymbols()),
node.getCriteria()));
left.addChild(right.build());
return left;
これを見る限り、内部結合でも左と右が重要?
あと、 CROSS
は case
に含まれていない模様。
PredicatePushDown.Rewriter.rewriteJoin()
あたりで、JOIN ... ON 0 = 0
にしている。
OutputNode
の例。
出力対象の node.getSource()
までを SinkNode
につなげてサブプランを区切りつつ、
ExchangeNode
で受け取って OutputNode
で出力するところは、 DistributionPlan.NONE
で単一ノードにかき集めている。
// presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedLogicalPlanner.java:L298
public SubPlanBuilder visitOutput(OutputNode node, Void context)
{
SubPlanBuilder current = node.getSource().accept(this, context);
if (current.isDistributed()) {
current.setRoot(new SinkNode(idAllocator.getNextId(), current.getRoot(), current.getRoot().getOutputSymbols()));
// create a new non-partitioned fragment
current = createSingleNodePlan(new ExchangeNode(idAllocator.getNextId(), current.getId(), current.getRoot().getOutputSymbols()))
.addChild(current.build());
}
current.setRoot(new OutputNode(node.getId(), current.getRoot(), node.getColumnNames(), node.getOutputSymbols()));
return current;
}
SqlQueryExecution.planDistribution()
に戻ってくる。
// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L228
private void planDistribution(SubPlan subplan)
{
// time distribution planning
long distributedPlanningStart = System.nanoTime();
// plan the execution on the active nodes
DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager, shardManager);
StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(subplan);
DistributedExecutionPlanner.plan()
へ。
DistributedExecutionPlanner.plan()
の前半。
DistributedExecutionPlanner.Visitor extends PlanVisitor
で、
自身の PlanFragment
内にテーブルがあれば、それを SplitSource
として記録している。
// presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedExecutionPlanner.java:L79
public StageExecutionPlan plan(SubPlan root, Predicate<Partition> materializedViewPartitionPredicate)
{
PlanFragment currentFragment = root.getFragment();
// get splits for this fragment, this is lazy so split assignments aren't actually calculated here
Visitor visitor = new Visitor();
NodeSplits nodeSplits = currentFragment.getRoot().accept(visitor, materializedViewPartitionPredicate);
見どころだけ書くと:
PlanFragment
の末端がExchangeNode
の場合、SplitSource
なしの扱いJoinNode
の入力が二つともSplitSource
を持つことはできないTableScanNode
とSampleNode
が同時にあって、かつサンプルにベルヌーイ分布を使っていない場合、スプリットごと間引いている
DistributedExecutionPlanner.plan()
の後半。
SubPlan
の子要素についても、再帰的に DistributedExecutionPlanner.plan()
を実行している。
自身の PlanFragment
と、子要素から StageExecutionPlan
を構築。
// presto-main/src/main/java/com/facebook/presto/sql/planner/DistributedExecutionPlanner.java:L79
public StageExecutionPlan plan(SubPlan root, Predicate<Partition> materializedViewPartitionPredicate)
{
...
for (SubPlan childPlan : root.getChildren()) {
dependencies.add(plan(childPlan, materializedViewPartitionPredicate));
}
return new StageExecutionPlan(currentFragment,
nodeSplits.getDataSource(),
dependencies.build(),
visitor.getOutputReceivers());
}
SqlQueryExecution.planDistribution()
の続きでは、根の StageExecutionPlan
から SqlStageExecution
を作っている。
根の StageExecutionPlan
は子の StageExecutionPlan
も含んでいるため、 SqlStageExecution
はクエリ全体のプラン実行を表している。
// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L233
// plan the execution on the active nodes
DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager, shardManager);
StageExecutionPlan outputStageExecutionPlan = distributedPlanner.plan(subplan);
...
// build the stage execution objects (this doesn't schedule execution)
SqlStageExecution outputStage = new SqlStageExecution(stateMachine.getQueryId(),
locationFactory,
outputStageExecutionPlan,
nodeScheduler,
remoteTaskFactory,
stateMachine.getSession(),
scheduleSplitBatchSize,
maxPendingSplitsPerNode,
initialHashPartitions,
queryExecutor,
ROOT_OUTPUT_BUFFERS);
this.outputStage.set(outputStage);
この結果を SqlQueryExecution.outputStage
に保持させている。
これは後でキャンセルしたり、情報を取り出したりする際にも使う模様。
SqlQueryExecution.start()
の最後の部分で、 SqlStageExecution.start()
を実行している。
// presto-main/src/main/java/com/facebook/presto/execution/SqlQueryExecution.java:L147
public void start()
{
...
// plan distribution of query
planDistribution(subplan);
...
SqlStageExecution stage = outputStage.get();
if (!stateMachine.isDone()) {
stage.start();
}
SqlStageExecution
のコンストラクタ入口。
ルートステージは parent
が null
に。
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L174
private SqlStageExecution(@Nullable StageExecutionNode parent,
QueryId queryId,
AtomicInteger nextStageId,
LocationFactory locationFactory,
StageExecutionPlan plan,
NodeScheduler nodeScheduler,
RemoteTaskFactory remoteTaskFactory,
Session session,
int splitBatchSize,
int maxPendingSplitsPerNode,
int initialHashPartitions,
ExecutorService executor)
SqlStageExecution
のコンストラクタ内で、再帰的にサブステージに関するも作っている。
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L213
ImmutableMap.Builder<PlanFragmentId, StageExecutionNode> subStages = ImmutableMap.builder();
for (StageExecutionPlan subStagePlan : plan.getSubStages()) {
PlanFragmentId subStageFragmentId = subStagePlan.getFragment().getId();
StageExecutionNode subStage = new SqlStageExecution(this,
queryId,
nextStageId,
locationFactory,
subStagePlan,
nodeScheduler,
remoteTaskFactory,
session,
splitBatchSize,
maxPendingSplitsPerNode,
initialHashPartitions,
executor);
subStage.addStateChangeListener(new StateChangeListener<StageInfo>()
{
@Override
public void stateChanged(StageInfo stageInfo)
{
doUpdateState();
}
});
subStages.put(subStageFragmentId, subStage);
}
this.subStages = subStages.build();
SqlStageExecution.start()
のエントリ。
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L508
public Future<?> start()
{
try (SetThreadName setThreadName = new SetThreadName("Stage-%s", stageId)) {
return scheduleStartTasks();
}
}
SqlStageExecution.scheduleStartTasks()
へ。
SqlStageExecution.scheduleStartTasks()
の本体。
サブステージの SqlStageExecution.scheduleStartTasks()
を再帰的に実行したあと、
別スレッドで自身の SqlStageExecution.startTasks()
を実行。
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L517
public Future<?> scheduleStartTasks()
{
try (SetThreadName setThreadName = new SetThreadName("Stage-%s", stageId)) {
// start sub-stages (starts bottom-up)
for (StageExecutionNode subStage : subStages.values()) {
subStage.scheduleStartTasks();
}
return executor.submit(new Runnable()
{
@Override
public void run()
{
startTasks();
}
});
}
}
SqlStageExecution.startTasks()
の本体。
SubPlan
の作成時に解析した PlanDistribution
の種類に従って、タスクに関する schedule*()
を起動している。
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L535
private void startTasks()
{
...
// schedule tasks
if (fragment.getDistribution() == PlanDistribution.NONE) {
scheduleFixedNodeCount(1);
}
else if (fragment.getDistribution() == PlanDistribution.FIXED) {
scheduleFixedNodeCount(initialHashPartitions);
}
else if (fragment.getDistribution() == PlanDistribution.SOURCE) {
scheduleSourcePartitionedNodes();
}
else if (fragment.getDistribution() == PlanDistribution.COORDINATOR_ONLY) {
scheduleOnCurrentNode();
}
先に斜め読みしておくと、以下のように scheduleTask()
に辿り着く模様。
scheduleFixedNodeCount()
→scheduleTask()
scheduleSourcePartitionedNodes()
→assignSplits()
→scheduleTask()
scheduleOnCurrentNode()
→scheduleTask()
一番ナイーブそうな PlanDistribution.COORDINATOR_ONLY
(コーディネーターで処理)
→ SqlStageExecution.scheduleOnCurrentNode()
へ。
コーディネータノード上でタスクを実行する scheduleOnCurrentNode()
の本体。
nodeSelector
は NodeScheduler.NodeSelector
クラス。
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L610
private void scheduleOnCurrentNode()
{
// create task on current node
Node node = nodeSelector.selectCurrentNode();
scheduleTask(0, node);
// tell sub stages about all nodes and that there will not be more nodes
for (StageExecutionNode subStage : subStages.values()) {
subStage.parentNodesAdded(ImmutableList.of(node), true);
}
}
NodeScheduler.NodeSelector.selectCurrentNode()
へ。
NodeScheduler.NodeSelector.selectCurrentNode()
本体。
nodeManager
は NodeManager
インターフェース。
// presto-main/src/main/java/com/facebook/presto/execution/NodeScheduler.java:L157
public Node selectCurrentNode()
{
// TODO: this is a hack to force scheduling on the coordinator
return nodeManager.getCurrentNode();
}
NodeManager
インターフェースのバインディング。
InternalNodeManager
→ DiscoveryNodeManager
あたりがバインドされている感じ。
// presto-server/src/main/java/com/facebook/presto/server/CoordinatorModule.java:L121
// node scheduler
binder.bind(InternalNodeManager.class).to(DiscoveryNodeManager.class).in(Scopes.SINGLETON);
binder.bind(NodeManager.class).to(Key.get(InternalNodeManager.class)).in(Scopes.SINGLETON);
bindConfig(binder).to(NodeSchedulerConfig.class);
binder.bind(NodeScheduler.class).in(Scopes.SINGLETON);
newExporter(binder).export(NodeScheduler.class).withGeneratedName();
少し深そうなので、今回はあまり立ち入らないことにして、それっぽいノード情報が返ってきているということにする。
ちなみに、 Node
インターフェースはこんな感じで、エントリに関する情報を保持している。
// presto-spi/src/main/java/com/facebook/presto/spi/Node.java:L18
public interface Node
{
HostAddress getHostAndPort();
URI getHttpUri();
String getNodeIdentifier();
}
というわけで SqlStageExecution
を続けて読む。
自身を表す Node
オブジェクトを使って、 scheduleTask
を起動している。
第一引数の 0
はステージ内のタスクIDの模様。
その後、サブステージに対して parentNodesAdded()
で通知イベントを飛ばしている感じ。
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L610
private void scheduleOnCurrentNode()
{
// create task on current node
Node node = nodeSelector.selectCurrentNode();
scheduleTask(0, node);
// tell sub stages about all nodes and that there will not be more nodes
for (StageExecutionNode subStage : subStages.values()) {
subStage.parentNodesAdded(ImmutableList.of(node), true);
}
}
scheduleTask()
へ。
2引数の scheduleTask()
から、4引数の scheduleTask()
へ。
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L722
private RemoteTask scheduleTask(int id, Node node)
{
return scheduleTask(id, node, null, ImmutableList.<Split>of());
}
scheduleTask()
の先頭部分。
タスクの入出力をやっつけてそうな addNewExchangesAndBuffers()
を先頭で起動している。
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L727
private RemoteTask scheduleTask(int id, Node node, PlanNodeId sourceId, Iterable<? extends Split> sourceSplits)
{
// before scheduling a new task update all existing tasks with new exchanges and output buffers
addNewExchangesAndBuffers();
addNewExchangesAndBuffers()
の先頭部分。
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L791
private boolean addNewExchangesAndBuffers()
{
// get new exchanges and update exchange state
Set<PlanNodeId> completeSources = updateCompleteSources();
boolean allSourceComplete = completeSources.containsAll(fragment.getSourceIds());
Multimap<PlanNodeId, URI> newExchangeLocations = getNewExchangeLocations();
exchangeLocations.set(ImmutableMultimap.<PlanNodeId, URI>builder()
.putAll(exchangeLocations.get())
.putAll(newExchangeLocations)
.build());
ざっくりと概要。
updateCompleteSources()
- 現在のステージの入力 (ExchangeNode
) のうち、それを生成するステージの実行準備が整ったものの一覧を抽出getNewExchangeLocations()
- 現時点で把握していないサブステージのタスク出力一覧を抽出exchangeLocations
-getNewExchangeLocations()
を追加している
updateCompleteSources()
本体。
- 未解決の
ExchangeNode
を探す - そのデータを生成するステージ一覧を取りだす
- それらのステージがいずれも準備が整っていたら、
ExchangeNode
にマークをつける
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L853
private Set<PlanNodeId> updateCompleteSources()
{
for (PlanNode planNode : fragment.getSources()) {
if (!completeSources.contains(planNode.getId()) && planNode instanceof ExchangeNode) {
ExchangeNode exchangeNode = (ExchangeNode) planNode;
boolean exchangeFinished = true;
for (PlanFragmentId planFragmentId : exchangeNode.getSourceFragmentIds()) {
StageExecutionNode subStage = subStages.get(planFragmentId);
switch (subStage.getState()) {
case PLANNED:
case SCHEDULING:
exchangeFinished = false;
break;
}
}
if (exchangeFinished) {
completeSources.add(planNode.getId());
}
}
}
return completeSources;
}
getNewExchangeLocations()
本体。
- すべての
ExchangeNode
を探す - それぞれのデータを生成するステージ一覧を取りだす
- それぞれのタスク一覧を取り出す
- 未見のタスクがあれば、それらを返す
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L465
private Multimap<PlanNodeId, URI> getNewExchangeLocations()
{
Multimap<PlanNodeId, URI> exchangeLocations = this.exchangeLocations.get();
ImmutableMultimap.Builder<PlanNodeId, URI> newExchangeLocations = ImmutableMultimap.builder();
for (PlanNode planNode : fragment.getSources()) {
if (planNode instanceof ExchangeNode) {
ExchangeNode exchangeNode = (ExchangeNode) planNode;
for (PlanFragmentId planFragmentId : exchangeNode.getSourceFragmentIds()) {
StageExecutionNode subStage = subStages.get(planFragmentId);
checkState(subStage != null, "Unknown sub stage %s, known stages %s", planFragmentId, subStages.keySet());
// add new task locations
for (URI taskLocation : subStage.getTaskLocations()) {
if (!exchangeLocations.containsEntry(exchangeNode.getId(), taskLocation)) {
newExchangeLocations.putAll(exchangeNode.getId(), taskLocation);
}
}
}
}
}
return newExchangeLocations.build();
}
addNewExchangesAndBuffers()
の残り部分は、タスクを発行していない今はあまり関係なさそうなので読み飛ばす。
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L791
private boolean addNewExchangesAndBuffers()
{
...
// get new output buffer and update output buffer state
OutputBuffers outputBuffers = updateToNextOutputBuffers();
// finished state must be decided before update to avoid race conditions
boolean finished = allSourceComplete && outputBuffers.isNoMoreBufferIds();
// update tasks
for (RemoteTask task : tasks.values()) {
...
}
return finished;
}
scheduleTask()
の続きの部分。
addNewExchangesAndBuffers()
の結果も利用して、 initialSplits
を構築している。
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L727
private RemoteTask scheduleTask(int id, Node node, PlanNodeId sourceId, Iterable<? extends Split> sourceSplits)
{
// before scheduling a new task update all existing tasks with new exchanges and output buffers
addNewExchangesAndBuffers();
TaskId taskId = new TaskId(stageId, String.valueOf(id));
ImmutableMultimap.Builder<PlanNodeId, Split> initialSplits = ImmutableMultimap.builder();
for (Split sourceSplit : sourceSplits) {
initialSplits.put(sourceId, sourceSplit);
}
for (Entry<PlanNodeId, URI> entry : exchangeLocations.get().entries()) {
initialSplits.put(entry.getKey(), createRemoteSplitFor(node.getNodeIdentifier(), entry.getValue()));
}
軽く createRemoteSplitFor()
を読んでみると、タスクのURIから出力データの位置を気合で計算しているように見える。
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L960
private RemoteSplit createRemoteSplitFor(String nodeId, URI taskLocation)
{
URI splitLocation = uriBuilderFrom(taskLocation).appendPath("results").appendPath(nodeId).build();
return new RemoteSplit(splitLocation, tupleInfos);
}
さらに scheduleTask()
の続きの部分。
RemoteTaskFactory.createRemoteTask()
を利用してリモートタスクを作成し、RemoteTask.start()
で実行を開始している感じ。
// presto-main/src/main/java/com/facebook/presto/execution/SqlStageExecution.java:L727
private RemoteTask scheduleTask(int id, Node node, PlanNodeId sourceId, Iterable<? extends Split> sourceSplits)
{
...
RemoteTask task = remoteTaskFactory.createRemoteTask(session,
taskId,
node,
fragment,
initialSplits.build(),
outputReceivers,
currentOutputBuffers);
...
// create and update task
task.start();
// record this task
tasks.put(node, task);
RemoteTaskFactory
には、 HttpRemoteTaskFactory
がバインディングされている。
ついでに、その付近で LocationFactory
に対して HttpLocationFactory
もバインディングされている。
// presto-server/src/main/java/com/facebook/presto/server/ServerMainModule.java:L149
// execution
binder.bind(LocationFactory.class).to(HttpLocationFactory.class).in(Scopes.SINGLETON);
binder.bind(RemoteTaskFactory.class).to(HttpRemoteTaskFactory.class).in(Scopes.SINGLETON);
httpClientBinder(binder).bindAsyncHttpClient("scheduler", ForScheduler.class).withTracing();
HttpRemoteTaskFactory.createRemoteTask()
の本体。
HttpRemoteTask
のコンストラクタを呼び出している。
このとき、 HttpLocationFactory.createTaskLocation()
を使って、対象タスクのURIを算出している。
// presto-server/src/main/java/com/facebook/presto/server/HttpRemoteTaskFactory.java:L81
public RemoteTask createRemoteTask(Session session,
TaskId taskId,
com.facebook.presto.spi.Node node,
PlanFragment fragment,
Multimap<PlanNodeId, Split> initialSplits,
Map<PlanNodeId, OutputReceiver> outputReceivers,
OutputBuffers outputBuffers)
{
return new HttpRemoteTask(session,
taskId,
node.getNodeIdentifier(),
locationFactory.createTaskLocation(node, taskId),
fragment,
initialSplits,
outputReceivers,
outputBuffers,
httpClient,
executor,
maxConsecutiveErrorCount,
minErrorDuration,
taskInfoCodec,
taskUpdateRequestCodec
);
}
HttpLocationFactory.createTaskLocation()
の本体。
"/v1/task"
というパスが接頭辞としてついている。
// presto-server/src/main/java/com/facebook/presto/server/HttpLocationFactory.java:L76
public URI createTaskLocation(Node node, TaskId taskId)
{
Preconditions.checkNotNull(node, "node is null");
Preconditions.checkNotNull(taskId, "taskId is null");
return uriBuilderFrom(node.getHttpUri())
.appendPath("/v1/task")
.appendPath(taskId.toString())
.build();
}
なお、対応するリソースは TaskResource
の模様。
// presto-server/src/main/java/com/facebook/presto/server/TaskResource.java:L60
@Path("/v1/task")
public class TaskResource
HttpRemoteTask
のコンストラクタ。
処理対象のスプリット情報 ( initialSplits
) から pendingSplits
を作っていたり、
タスク情報 ( taskInfo
) に TaskState.PLANNED
という状態のステートマシンを登録していたりする。
// presto-server/src/main/java/com/facebook/presto/server/HttpRemoteTask.java:L144
public HttpRemoteTask(Session session,
...
{
...
for (Entry<PlanNodeId, Split> entry : checkNotNull(initialSplits, "initialSplits is null").entries()) {
ScheduledSplit scheduledSplit = new ScheduledSplit(nextSplitId.getAndIncrement(), entry.getValue());
pendingSplits.put(entry.getKey(), scheduledSplit);
}
...
TaskStats taskStats = new TaskContext(taskId, executor, session).getTaskStats();
taskInfo = new StateMachine<>("task " + taskId, executor, new TaskInfo(
taskId,
TaskInfo.MIN_VERSION,
TaskState.PLANNED,
location,
DateTime.now(),
new SharedBufferInfo(QueueState.OPEN, 0, 0, bufferStates),
ImmutableSet.<PlanNodeId>of(),
taskStats,
ImmutableList.<FailureInfo>of(),
ImmutableMap.<PlanNodeId, Set<?>>of()));
}
}
HttpRemoteTask.start()
の本体。HttpRemoteTask.scheduleUpdate()
を呼び出している。
scheduleUpdate()
は様々なところから呼び出されている。
// presto-server/src/main/java/com/facebook/presto/server/HttpRemoteTask.java:L229
public void start()
{
try (SetThreadName setThreadName = new SetThreadName("HttpRemoteTask-%s", taskId)) {
// to start we just need to trigger an update
scheduleUpdate();
}
}
HttpRemoteTask.scheduleUpdate()
の本体。
リクエストオブジェクトを作成し、 /v1/task
に投げている。
// presto-server/src/main/java/com/facebook/presto/server/HttpRemoteTask.java:L350
private synchronized void scheduleUpdate()
{
...
TaskUpdateRequest updateRequest = new TaskUpdateRequest(session,
planFragment,
sources,
outputBuffers.get());
Request request = preparePost()
.setUri(uriBuilderFrom(taskInfo.get().getSelf()).build())
.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString())
.setBodyGenerator(jsonBodyGenerator(taskUpdateRequestCodec, updateRequest))
.build();
...
}
おそらくこの辺りでタスクが起動するはず。