Presto code reading memo

server.QueryResource#createQuery

  • server.QueryResource#createQuery

    • server.ResourceUtil#createSessionForRequest
    • execution.SqlQueryManager#createQuery
      • sql.parser.SqlParser#createStatement
      • event.query.QueryMonitor#createdEvent
      • execution.SqlQueryExecution#addStateChangeListener
      • execution.SqlQueryManager.QueryStarter#submit
      • execution.SqlQueryManager.QueryStarter.QueryQueue#enqueue
      • execution.SqlQueryManager#addCompletionCallback
      • io.airlift.concurrent.AsyncSemaphore#submit
  • io.airlift.concurrent.AsyncSemaphore#runNext

    • execution.SqlQueryManager.QueryStarter.QuerySubmitter
      • execution.SqlQueryManagerStats#queryStarted
      • execution.SqlQueryExecution#start
        • execution.SqlQueryExecution#analyzeQuery
          • sql.analyzer.Analyzer#analyze
          • sql.planner.LogicalPlanner#plan
            • sql.planner.LogicalPlanner#createRelationPlan
              • sql.planner.RelationPlanner#process
              • sql.tree.Query#accept
              • sql.planner.RelationPlanner#visitQuery
            • sql.planner.LogicalPlanner#createOutputPlan
            • (foreach planOptimizers) sql.planner.optimizations.PlanOptimizer#optimize
          • sql.planner.InputExtractor#extract
          • sql.planner.PlanFragmenter#createSubPlans
            • sql.planner.plan.PlanRewriter#rewriteWith
            • sql.planner.PlanFragmenter.Fragmenter#buildFragment
        • execution.SqlQueryExecution#planDistribution
          • sql.planner.DistributedExecutionPlanner#plan
          • execution.SqlStageExecution#start
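
The AsyncSemaphore#submit / #runNext pair above gates how many queued queries are started concurrently. A minimal, self-contained sketch of that bounded-concurrency pattern — not the airlift implementation; every name below is made up for illustration:

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// At most maxPermits tasks run at once; each completion triggers runNext(),
// which pulls the next queued task (roughly the submit/runNext pattern above).
class BoundedAsyncRunner<T> {
    private final int maxPermits;
    private final Executor executor;
    private final Function<T, CompletableFuture<?>> submitter;
    private final Queue<T> queued = new ConcurrentLinkedQueue<>();
    private final AtomicInteger running = new AtomicInteger();

    BoundedAsyncRunner(int maxPermits, Executor executor, Function<T, CompletableFuture<?>> submitter) {
        this.maxPermits = maxPermits;
        this.executor = executor;
        this.submitter = submitter;
    }

    void submit(T task) {            // roughly AsyncSemaphore#submit
        queued.add(task);
        runNext();
    }

    private void runNext() {         // roughly AsyncSemaphore#runNext
        if (running.incrementAndGet() > maxPermits) {
            running.decrementAndGet();
            return;                  // already at the concurrency limit
        }
        T task = queued.poll();
        if (task == null) {
            running.decrementAndGet();
            return;
        }
        executor.execute(() -> submitter.apply(task)
                .whenComplete((result, failure) -> { running.decrementAndGet(); runNext(); }));
    }
}
```
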
  • execution.SqlStageExecution#start

    • (foreach substages) execution.SqlStageExecution#scheduleStartTasks
    • (in another thread) execution.SqlStageExecution#startTasks
      • switch(sql.planner.PlanFragment#getDistribution)
        • PlanDistribution.SINGLE => scheduleFixedNodeCount(1)
        • PlanDistribution.FIXED => scheduleFixedNodeCount(initialHashPartitions)
        • PlanDistribution.SOURCE => scheduleSourcePartitionedNodes
        • PlanDistribution.COORDINATOR_ONLY => scheduleOnCurrentNode
          • execution.NodeScheduler.NodeSelector#selectRandomNodes/selectCurrentNode
          • (foreach nodes) execution.SqlStageExecution#scheduleTask
            • execution.SqlStageExecution#addNewExchangesAndBuffers
            • (foreach sourceSplits) add them to initialSplits
            • (foreach exchangeLocations) add them to initialSplits
            • execution.RemoteTaskFactory#createRemoteTask with initialSplits
            • execution.RemoteTask#start
              • server.HttpRemoteTask#scheduleUpdate
                • if we have an old request outstanding, cancel it
                • server.HttpRemoteTask#getSources
                • io.airlift.http.client.HttpClient#executeAsync with TaskUpdateRequest
          • (foreach substages) execution.StageExecutionNode#parentTasksAdded
            • check the fragment's outputPartitioning
              • OutputPartitioning.NONE
                • set nextOutputBuffers to UnpartitionedPagePartitionFunction if the buffer is changed
              • OutputPartitioning.HASH
                • set nextOutputBuffers to HashPagePartitionFunction if the buffer is changed
                • each HashPagePartitionFunction corresponds to a parentTask
      • execution.StateMachine#set(SCHEDULED)
      • execution.SqlStageExecution#updateNewExchangesAndBuffers
        • execution.SqlStageExecution#addNewExchangesAndBuffers
          • execution.SqlStageExecution#updateCompleteSources
            • (foreach fragment's sources)
              • if the PlanNode isn't finished and is RemoteSourceNode
                • (foreach its sourceFragments)
                  • if the fragment is other than PLANNED or SCHEDULING, mark the PlanNode as completed
          • execution.SqlStageExecution#getNewExchangeLocations
            • (foreach fragment's sources)
              • if it's RemoteSourceNode
                • (foreach its sourceFragments)
                  • (foreach the substage's taskLocations)
                    • add the taskLocation to newExchangeLocations
          • (foreach tasks:RemoteTask)
            • (foreach newExchangeLocations) createRemoteSplitFor & execution.RemoteTask#addSplits
            • (foreach completeSources) mark NoMoreSplits
        • execution.SqlStageExecution#updateNewExchangesAndBuffers
          • execution.SqlStageExecution#updateCompleteSources
          • return if it has finished and the current output buffers are in the "no more buffers" state
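
The startTasks path above boils down to: pick nodes according to the fragment's PlanDistribution, then create and start one remote task per node (each start ends up as an HTTP TaskUpdateRequest). A toy sketch of that branch, with stand-in types for the real Presto classes; SOURCE scheduling is really driven by split locations, which this version glosses over:

```java
import java.util.List;

// All types here are simplified stand-ins for the real Presto classes.
enum PlanDistribution { SINGLE, FIXED, SOURCE, COORDINATOR_ONLY }

interface Node {}
interface RemoteTask { void start(); }

interface NodeSelector {
    List<Node> selectRandomNodes(int limit);
    Node selectCurrentNode();
}

interface RemoteTaskFactory {
    // in the real code the task also carries the initial splits and output buffers
    RemoteTask createRemoteTask(Node node);
}

class StageScheduler {
    private final NodeSelector nodeSelector;
    private final RemoteTaskFactory taskFactory;
    private final int initialHashPartitions;

    StageScheduler(NodeSelector nodeSelector, RemoteTaskFactory taskFactory, int initialHashPartitions) {
        this.nodeSelector = nodeSelector;
        this.taskFactory = taskFactory;
        this.initialHashPartitions = initialHashPartitions;
    }

    void startTasks(PlanDistribution distribution) {
        List<Node> nodes = switch (distribution) {
            case SINGLE -> nodeSelector.selectRandomNodes(1);                    // one task on one node
            case FIXED -> nodeSelector.selectRandomNodes(initialHashPartitions); // fixed number of tasks
            case SOURCE -> nodeSelector.selectRandomNodes(Integer.MAX_VALUE);    // really driven by split locations
            case COORDINATOR_ONLY -> List.of(nodeSelector.selectCurrentNode());  // run on the coordinator itself
        };
        for (Node node : nodes) {
            taskFactory.createRemoteTask(node).start();  // each start() becomes an HTTP TaskUpdateRequest
        }
    }
}
```
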

server.TaskResource#createOrUpdateTask

  • server.TaskResource#createOrUpdateTask

    • execution.SqlTaskManager#updateTask with session, fragment, sources and outputIds
      • execution.SqlTask#updateTask
        • execution.SqlTaskExecutionFactory#create with fragment and sharedBuffer
        • create operator.TaskContext with state, executor, operatorPreAllocatedMemory
        • execution.SqlTaskExecution#createSqlTaskExecution with taskContext and taskExecutor
          • execution.SqlTaskExecution#start
          • (foreach unpartitionedDriverFactories)
            • (foreach the driverInstances)
              • execution.SqlTaskExecution.DriverSplitRunnerFactory#createDriverRunner
              • execution.SqlTaskExecution#enqueueDrivers
                • execution.TaskExecutor#enqueueSplits
                  • (foreach taskSplits)
                  • create PrioritizedSplitRunner with TaskHandle and SplitRunner
                  • execution.TaskExecutor.TaskHandle#enqueueSplit
                    • // add this to the work queue for the task
                    • queuedSplits.add(split);
                  • execution.TaskExecutor#scheduleTaskIfNecessary
                    • // if task is under the limit for guaranteed splits, start one
                    • execution.TaskExecutor.TaskHandle#pollNextSplit
                    • pendingSplits.put(split) (This split is handled by execution.TaskExecutor.Runner)
                  • execution.TaskExecutor#addNewEntrants
                    • // if globally we have more resources, start more
                    • execution.TaskExecutor#pollNextSplitWorker
                    • pendingSplits.put(split) (This split is handled by execution.TaskExecutor.Runner)
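
A condensed sketch of the split bookkeeping above, under the assumptions visible in the trace: each TaskHandle keeps its own queuedSplits, and scheduleTaskIfNecessary only moves a split to the global pending queue while the task is under its guaranteed-splits limit (the addNewEntrants path for spare global capacity is left out). The limit value and all types are illustrative, not Presto's:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

class MiniTaskExecutor {
    private static final int GUARANTEED_SPLITS_PER_TASK = 3;   // illustrative value

    static class TaskHandle {
        private final Queue<Runnable> queuedSplits = new ArrayDeque<>();
        private int runningSplits;

        synchronized void enqueueSplit(Runnable split) {
            queuedSplits.add(split);                 // "add this to the work queue for the task"
        }

        synchronized Runnable pollNextSplit() {
            Runnable split = queuedSplits.poll();
            if (split != null) {
                runningSplits++;
            }
            return split;
        }

        synchronized boolean isUnderGuaranteedLimit() {
            return runningSplits < GUARANTEED_SPLITS_PER_TASK;
        }
    }

    // splits placed here are picked up and run by Runner threads
    final LinkedBlockingQueue<Runnable> pendingSplits = new LinkedBlockingQueue<>();

    void enqueueSplit(TaskHandle task, Runnable split) {
        task.enqueueSplit(split);
        scheduleTaskIfNecessary(task);
    }

    private void scheduleTaskIfNecessary(TaskHandle task) {
        // "if task is under the limit for guaranteed splits, start one"
        if (task.isUnderGuaranteedLimit()) {
            Runnable next = task.pollNextSplit();
            if (next != null) {
                pendingSplits.add(next);             // handled by a Runner thread
            }
        }
    }
}
```
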
  • execution.TaskExecutor.Runner#run

    • execution.TaskExecutor.PrioritizedSplitRunner#process
      • execution.SqlTaskExecution.DriverSplitRunner#processFor
        • execution.SqlTaskExecution.DriverSplitRunnerFactory#createDriver
          • operator.DriverFactory#createDriver
            • (foreach operatorFactories)
              • // TableScanOperator requires partitioned split to be added before the first call to process
              • create TaskSource(partitionedSourceId, partitionedSplit, true)
              • operator.Driver#updateSource(TaskSource:planNodeId, splits, noMoreSplits)
                • update operator.Driver#newSources w/o lock
                • tryLockAndProcessPendingStateChanges
                  • presto.operator.Driver#processNewSources
                    • (foreach sources) operator.Driver#processNewSource
                    • (for new ScheduledSplit(s))
                    • sourceOperators.get(source.getPlanNodeId()).addSplit(newSplit.getSplit())
        • (foreach unpartitionedSources)
          • operator.Driver#updateSource (see above)
      • com.facebook.presto.operator.Driver#processFor
        • (up to the assigned time slice) com.facebook.presto.operator.Driver#processInternal
          • operator.Driver#processNewSources (see above)
          • (foreach alive operators)
            • return the blocked future if either operator is blocked
            • if the current operator is not finished and the next operator needs input
              • if we got an output page, add it to the next operator
            • if the current operator is finished, let the next operator know there will be no more data
      • update the priority level based on the total thread usage of the task
      • if the split finished
        • execution.TaskExecutor#splitFinished
          • let TaskExecutor re-schedule tasks
      • else if the blocked future is done
        • put the split in pendingSplits
      • else put the split in blockedSplits
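
A minimal sketch of the Runner loop above: take a pending split, drive it for one time slice, then either drop it (finished), requeue it (quanta exhausted), or park it until its blocked future completes. SplitRunner here is a simplified stand-in for Presto's interface:

```java
import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;

class MiniRunner implements Runnable {
    interface SplitRunner {
        // process for at most the given duration; the returned future stays
        // incomplete while the split is blocked (e.g. waiting on an exchange)
        CompletableFuture<?> processFor(Duration duration);
        boolean isFinished();
    }

    private static final Duration SPLIT_RUN_QUANTA = Duration.ofSeconds(1);

    private final BlockingQueue<SplitRunner> pendingSplits;

    MiniRunner(BlockingQueue<SplitRunner> pendingSplits) {
        this.pendingSplits = pendingSplits;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                SplitRunner split = pendingSplits.take();
                CompletableFuture<?> blocked = split.processFor(SPLIT_RUN_QUANTA);
                if (split.isFinished()) {
                    continue;                       // splitFinished: move on to the next split
                }
                if (blocked.isDone()) {
                    pendingSplits.add(split);       // used up its quanta: requeue
                }
                else {
                    // blocked: requeue only once the blocking future completes
                    blocked.whenComplete((result, failure) -> pendingSplits.add(split));
                }
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```
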

sql.planner.DistributedLogicalPlanner

sql.planner.PlanFragmenter

sql.planner.RelationPlanner

execution.NodeScheduler

presto.execution.SharedBuffer

See also

Debug with tpch(sf1)

select count(name) from part

  • analysis
    • outputDescriptors
"QuerySpecification{select=Select{distinct=false, selectItems=["count"("name")]}, from=Optional[Table{part}], where=null, groupBy=[], having=null, orderBy=[], limit=null}" -> "[<anonymous>:bigint]"
"Table{part}" -> "[part.partkey:bigint, part.name:varchar, part.mfgr:varchar, part.brand:varchar, part.type:varchar, part.size:bigint, part.container:varchar, part.retailprice:double, part.comment:varchar, part.row_number:bigint]"
"Query{queryBody=QuerySpecification{select=Select{distinct=false, selectItems=["count"("name")]}, from=Optional[Table{part}], where=null, groupBy=[], having=null, orderBy=[], limit=null}, orderBy=[]}" -> "[<anonymous>:bigint]"
  • tables
"Table{part}" -> "tpch:tpch:part:sf1.0"
  • plan
    • root

      • columnNames: _col0
      • outputs: count
      • id: 7
      • source
        • id: 2
        • source
          • id: 8
          • type: GATHER
          • inputs: count_4
          • outputs: count_4
          • sources
            • source
              • id: 0
              • table:
                • connectorId: tpch
                • connectorHandle: tpch:part:sf1.0
              • outputSymbol: name
              • assignments: "name" -> "tpch:tpch:name:1"
              • summarizedPartitions
                • generatedPartitions: sql.planner.plan.TableScanNode$GeneratedPartitions
                • partitionDomainSummary: TupleDomain:ALL
        • aggregation: "count" -> ""count"("count_4")"
        • functions: "count" -> "count(varchar):bigint"
        • step: FINAL
    • symbols

"container" -> "varchar"
"name_0" -> "varchar"
"field_3" -> "bigint"
"count" -> "bigint"
"type" -> "varchar"
"partkey" -> "bigint"
"count_4" -> "bigint"
"size" -> "bigint"
"field" -> "bigint"
"count_2" -> "bigint"
"count_1" -> "bigint"
"mfgr" -> "varchar"
"name" -> "varchar"
"retailprice" -> "double"
"comment" -> "varchar"
"row_number" -> "bigint"
"brand" -> "varchar"
  • subplan

    • fragment
      • PlanFragment{id=1, distribution=SINGLE, partitionedSource=null, outputPartitioning=NONE, hash=Optional.empty}
    • children
      • fragment
        • PlanFragment{id=0, distribution=SOURCE, partitionedSource=0, outputPartitioning=NONE, hash=Optional.empty}
  • outputStageExecutionPlan

    • PlanFragment{id=1, distribution=SINGLE, partitionedSource=null, outputPartitioning=NONE, hash=Optional.empty}
    • substages
      • StageExecutionPlan{fragment=PlanFragment{id=0, distribution=SOURCE, partitionedSource=0, outputPartitioning=NONE, hash=Optional.empty}, dataSource=Optional[tpch:com.facebook.presto.spi.FixedSplitSource@fb602ad], subStages=[]}
  • outputStage

    • SqlStageExecution{stageId=20150330_161922_00005_e5dfi.0, location=http://192.168.0.2:8080/v1/stage/20150330_161922_00005_e5dfi.0, stageState=PLANNED}
    • fragment
      • PlanFragment{id=1, distribution=SINGLE, partitionedSource=null, outputPartitioning=NONE, hash=Optional.empty}
    • substages
      • "0" -> "SqlStageExecution{stageId=20150330_161922_00005_e5dfi.1, location=http://192.168.0.2:8080/v1/stage/20150330_161922_00005_e5dfi.1, stageState=PLANNED}"
      • fragment
        • PlanFragment{id=0, distribution=SOURCE, partitionedSource=0, outputPartitioning=NONE, hash=Optional.empty}
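
Reading the dump above: fragment 0 (SOURCE) scans the tpch part splits and produces partial counts (count_4), and fragment 1 (SINGLE) gathers them over the GATHER exchange and applies the step=FINAL aggregation. A toy illustration of that partial/final split (the rows are made up; only the shape of the computation mirrors the plan):

```java
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

class PartialFinalCountExample {
    // fragment 0 side: count the non-null names within one split
    static long partialCount(List<String> namesInSplit) {
        return namesInSplit.stream().filter(Objects::nonNull).count();
    }

    // fragment 1 side: FINAL step sums the partial counts arriving over the exchange
    static long finalCount(List<Long> partialCounts) {
        return partialCounts.stream().mapToLong(Long::longValue).sum();
    }

    public static void main(String[] args) {
        List<List<String>> splits = List.of(
                List.of("part a", "part b"),
                List.of("part c"));
        List<Long> partials = splits.stream()
                .map(PartialFinalCountExample::partialCount)
                .collect(Collectors.toList());
        System.out.println(finalCount(partials));   // 3, like "select count(name) from part" over these rows
    }
}
```
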