@oza
Last active December 10, 2019 22:43

# Presto source code reading #1


* This document was created for Presto Source Code Reading #1.
* http://atnd.org/events/47149
* Target: trunk code, Feb 10th, 2014
* Main Topic: Coordinator and Executor (./presto-main/src/main/java/com/facebook/presto/executions)
* http://www.slideshare.net/frsyuki/hadoop-source-code-reading-15-in-japan-presto/10
* NOTE
  1. All source code quoted here comes from https://github.com/facebook/presto and is covered by https://github.com/facebook/presto/blob/master/LICENSE.
  2. This document may contain mistakes. Pull requests to fix them are always welcome :-)

* Prerequisite
  * Google Guice (dependency injection utility)
  * Google Guava (ListenableFuture, argument-check utilities, etc.)
  * Java 7

## Overview of Coordinator's code path

* HTTP POST request to "/v1/query" (QueryResource#createQuery(); a minimal client sketch follows)
* SqlQueryManager#createQuery()
* SqlQueryExecution#start()
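
Before reading the server side, a minimal client sketch may help. The snippet below is not part of Presto: the coordinator address, user, and query are assumptions, and it assumes the `PRESTO_USER`/`PRESTO_CATALOG`/`PRESTO_SCHEMA` constants bound via `@HeaderParam` in `QueryResource` below map to `X-Presto-*` header names.

```java
// Illustrative client sketch, not part of Presto. Assumes a coordinator on
// localhost:8080 and that the @HeaderParam constants used by QueryResource
// correspond to the X-Presto-* headers set here.
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class SubmitQuerySketch
{
    public static void main(String[] args)
            throws Exception
    {
        HttpURLConnection conn = (HttpURLConnection) new URL("http://localhost:8080/v1/query").openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("X-Presto-User", "reader");      // PRESTO_USER
        conn.setRequestProperty("X-Presto-Catalog", "default");  // PRESTO_CATALOG
        conn.setRequestProperty("X-Presto-Schema", "default");   // PRESTO_SCHEMA
        conn.setDoOutput(true);

        // the request body is the SQL text itself
        OutputStream out = conn.getOutputStream();
        out.write("SELECT 1".getBytes("UTF-8"));
        out.close();

        // per QueryResource#createQuery() below, the response is "201 Created"
        // with a QueryInfo entity and a Location header of /v1/query/{queryId}
        System.out.println("HTTP " + conn.getResponseCode());
    }
}
```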


```java
@Path("/v1/query")
public class QueryResource {
    private final QueryManager queryManager;
    ...
    @POST
    @Produces(MediaType.APPLICATION_JSON)
    public Response createQuery(String query,
            @HeaderParam(PRESTO_USER) String user,
            @HeaderParam(PRESTO_SOURCE) String source,
            @HeaderParam(PRESTO_CATALOG) @DefaultValue(DEFAULT_CATALOG) String catalog,
            @HeaderParam(PRESTO_SCHEMA) @DefaultValue(DEFAULT_SCHEMA) String schema,
            @HeaderParam(USER_AGENT) String userAgent,
            @Context HttpServletRequest requestContext,
            @Context UriInfo uriInfo)
    {
        checkNotNull(query, "query is null");
        checkArgument(!query.isEmpty(), "query is empty");
        checkNotNull(catalog, "catalog is null");
        checkNotNull(schema, "schema is null");

        String remoteUserAddress = requestContext.getRemoteAddr();
        QueryInfo queryInfo = queryManager.createQuery(new Session(user, source, catalog, schema, remoteUserAddress, userAgent), query); // Dive into this code!
        URI pagesUri = uriBuilderFrom(uriInfo.getRequestUri()).appendPath(queryInfo.getQueryId().toString()).build();
        return Response.created(pagesUri).entity(queryInfo).build();
    }
    ...
}
```


```java
@ThreadSafe
public class SqlQueryManager
        implements QueryManager
{
    ...
    @Override
    public QueryInfo createQuery(Session session, String query)
    {
        checkNotNull(query, "query is null");
        Preconditions.checkArgument(!query.isEmpty(), "query must not be empty string");

        QueryId queryId = queryIdGenerator.createNextQueryId();

        Statement statement;
        // here we assume the returned Statement is a Query
        statement = SqlParser.createStatement(query);

        ...
        // executionFactories is initialized by Guice (CoordinatorModule#setup()).
        QueryExecutionFactory<?> queryExecutionFactory = executionFactories.get(statement.getClass());
        final QueryExecution queryExecution = queryExecutionFactory.createQueryExecution(queryId, query, session, statement);
        // start the query in the background
        queryExecutor.submit(new QueryStarter(queryExecution, stats));
        // -> calls queryExecution#start()
        ...
        return queryExecution.getQueryInfo();
    }
    ...
}
```

```java
@ThreadSafe
public class SqlQueryExecution
        implements QueryExecution
{
    ...
    @Override
    public void start()
    {
        try (SetThreadName setThreadName = new SetThreadName("Query-%s", stateMachine.getQueryId())) {
            try {
                // transition to planning
                if (!stateMachine.beginPlanning()) {
                    // query already started or finished
                    return;
                }

                // analyze query
                SubPlan subplan = analyzeQuery();

                // plan distribution of query: initialize outputStage
                planDistribution(subplan);

                // transition to starting
                if (!stateMachine.starting()) {
                    // query already started or finished
                    return;
                }

                // if query is not finished, start the stage, otherwise cancel it
                SqlStageExecution stage = outputStage.get();

                if (!stateMachine.isDone()) {
                    stage.start();
                }
                else {
                    stage.cancel(true);
                }
            }
            catch (Throwable e) {
                fail(e);
                Throwables.propagateIfInstanceOf(e, Error.class);
            }
        }
    }
    ...
}
```

## SQL Transformation overview

SQL (String)
-- [SqlParser#createStatement()] --> Query
-- [SqlQueryExecution#analyzeQuery()] --> SubPlan
-- [SqlQueryExecution#planDistribution()] --> SqlStageExecution


* SubPlan: the logical plan (sketched below)
  * Tree structure
    * fragment: body of the node
    * children: child nodes of the node

* SqlStageExecution:
  1. Selects the nodes that will run RemoteTasks, via NodeScheduler
  2. Wraps an ExecutorService, which runs the RemoteTasks
  3. The ExecutorService launches HttpRemoteTask via createRemoteTask()
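
To make the SubPlan shape concrete, here is a minimal sketch of a fragment-plus-children tree and a planDistribution()-style walk that creates one stage per fragment. The class and field names are illustrative stand-ins, not the actual Presto types.

```java
// Illustrative stand-ins only, not the real SubPlan/SqlStageExecution classes.
import java.util.ArrayList;
import java.util.List;

public class SubPlanSketch
{
    final String fragment;                // body of the node: what one stage executes
    final List<SubPlanSketch> children;   // child nodes whose output feeds this fragment

    public SubPlanSketch(String fragment, List<SubPlanSketch> children)
    {
        this.fragment = fragment;
        this.children = children;
    }

    // planDistribution()-style walk: one "stage" per fragment, visiting the
    // children before the parent fragment that consumes their output
    public static List<String> toStages(SubPlanSketch plan)
    {
        List<String> stages = new ArrayList<>();
        for (SubPlanSketch child : plan.children) {
            stages.addAll(toStages(child));
        }
        stages.add("stage for " + plan.fragment);
        return stages;
    }
}
```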

## Executor Overview


* HttpRemoteTask#start() - HTTP request -> worker node
* TaskResource#createOrUpdateTask() -> SqlTaskManager#updateTask()
* SqlTaskExecution.createSqlTaskExecution()


1. SqlTaskExecution: manages execution of the task's drivers on the worker
2. LocalExecutionPlanner#plan(): transforms the logical plan (PlanFragment) into the physical plan (DriverFactories / operator pipelines)
3. Driver#process(): runs the operator pipeline over the task's splits (a sketch follows)
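
Driver#process() itself is not quoted in this document. The sketch below only illustrates the idea of a time-sliced driver loop; the interface, names, and quantum handling are assumptions, not the actual Driver API. Each call advances the pipeline for at most one quantum and then returns, which is what lets the TaskExecutor multiplex many splits over a few threads (compare SPLIT_RUN_QUANTA in PrioritizedSplitRunner#process() below).

```java
// Illustrative only: the shape of a time-sliced driver loop. This is NOT the
// actual com.facebook.presto.operator.Driver; Operator here is a hypothetical
// stand-in for the physical operator pipeline.
import java.util.concurrent.TimeUnit;

public class DriverLoopSketch
{
    public interface Operator
    {
        boolean isFinished();

        void advance(); // move at most one page from input toward output
    }

    // Run the pipeline for at most `quantum`, then yield to the caller so the
    // TaskExecutor can schedule other splits on the same thread.
    public static boolean processFor(Operator pipeline, long quantum, TimeUnit unit)
    {
        long deadline = System.nanoTime() + unit.toNanos(quantum);
        while (!pipeline.isFinished() && System.nanoTime() < deadline) {
            pipeline.advance();
        }
        return pipeline.isFinished();
    }
}
```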

```java
/**
 * Manages tasks on this worker node
 */
@Path("/v1/task")
public class TaskResource
{
    private static final DataSize DEFAULT_MAX_SIZE = new DataSize(10, Unit.MEGABYTE);
    private static final Duration DEFAULT_MAX_WAIT_TIME = new Duration(1, SECONDS);

    private final TaskManager taskManager;
    ...

    @POST
    @Path("{taskId}")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo)
    {
        checkNotNull(taskUpdateRequest, "taskUpdateRequest is null");

        TaskInfo taskInfo = taskManager.updateTask(taskUpdateRequest.getSession(),
                taskId,
                taskUpdateRequest.getFragment(),
                taskUpdateRequest.getSources(),
                taskUpdateRequest.getOutputIds());

        return Response.ok().entity(taskInfo).build();
    }
    ...
}
```

```java
public class SqlTaskManager
        implements TaskManager
{
    private final TaskExecutor taskExecutor;
    ...
    @Override
    public TaskInfo updateTask(Session session, TaskId taskId, PlanFragment fragment, List<TaskSource> sources, OutputBuffers outputBuffers)
    {
        URI location = locationFactory.createLocalTaskLocation(taskId);

        TaskExecution taskExecution;
        synchronized (this) {
            taskExecution = tasks.get(taskId);
            if (taskExecution == null) {
                // is task already complete?
                TaskInfo taskInfo = taskInfos.get(taskId);
                if (taskInfo != null) {
                    return taskInfo;
                }

                taskExecution = SqlTaskExecution.createSqlTaskExecution(session,
                        taskId,
                        location,
                        fragment,
                        sources,
                        outputBuffers,
                        planner,
                        maxBufferSize,
                        taskExecutor,
                        taskNotificationExecutor,
                        maxTaskMemoryUsage,
                        operatorPreAllocatedMemory,
                        queryMonitor,
                        cpuTimerEnabled);
                tasks.put(taskId, taskExecution);
            }
        }

        taskExecution.recordHeartbeat();
        taskExecution.addSources(sources);
        taskExecution.addResultQueue(outputBuffers);

        return getTaskInfo(taskExecution, false);
    }
    ...
}
```


```java
public class SqlTaskExecution
        implements TaskExecution
{
    ...
    private SqlTaskExecution(Session session,
            TaskId taskId,
            URI location,
            PlanFragment fragment,
            OutputBuffers outputBuffers,
            LocalExecutionPlanner planner,
            DataSize maxBufferSize,
            TaskExecutor taskExecutor,
            DataSize maxTaskMemoryUsage,
            DataSize operatorPreAllocatedMemory,
            QueryMonitor queryMonitor,
            Executor notificationExecutor,
            boolean cpuTimerEnabled)
    {
        ...

        LocalExecutionPlan localExecutionPlan = planner.plan(session, fragment.getRoot(), fragment.getSymbols(), new TaskOutputFactory(sharedBuffer));
        List<DriverFactory> driverFactories = localExecutionPlan.getDriverFactories();
    }
    ...

    private synchronized void enqueueDrivers(boolean forceRunSplit, List<DriverSplitRunner> runners)
    {
        // schedule driver to be executed
        List<ListenableFuture<?>> finishedFutures = taskExecutor.enqueueSplits(taskHandle, forceRunSplit, runners);
        checkState(finishedFutures.size() == runners.size(), "Expected %s futures but got %s", runners.size(), finishedFutures.size());

        // record new driver
        remainingDrivers.addAndGet(finishedFutures.size());

        // when driver completes, update state and fire events
        for (int i = 0; i < finishedFutures.size(); i++) {
            ListenableFuture<?> finishedFuture = finishedFutures.get(i);
            final DriverSplitRunner splitRunner = runners.get(i);
            Futures.addCallback(finishedFuture, new FutureCallback<Object>()
            {
                @Override
                public void onSuccess(Object result)
                {
                    try (SetThreadName setThreadName = new SetThreadName("Task-%s", taskId)) {
                        // record driver is finished
                        remainingDrivers.decrementAndGet();

                        checkTaskCompletion();

                        queryMonitor.splitCompletionEvent(taskId, splitRunner.getDriverContext().getDriverStats());
                    }
                }

                @Override
                public void onFailure(Throwable cause)
                {
                    try (SetThreadName setThreadName = new SetThreadName("Task-%s", taskId)) {
                        taskStateMachine.failed(cause);

                        // record driver is finished
                        remainingDrivers.decrementAndGet();

                        DriverContext driverContext = splitRunner.getDriverContext();
                        DriverStats driverStats;
                        if (driverContext != null) {
                            driverStats = driverContext.getDriverStats();
                        }
                        else {
                            // split runner did not start successfully
                            driverStats = new DriverStats();
                        }

                        // fire failed event with cause
                        queryMonitor.splitFailedEvent(taskId, driverStats, cause);
                    }
                }
            }, notificationExecutor);
        }
    }
    ...

    // This code registers a callback with access to this class, and the
    // callback is accessed from another thread, so this code cannot be placed
    // in the constructor.
    private void start()
    {
        // start unpartitioned drivers
        List<DriverSplitRunner> runners = new ArrayList<>();
        for (DriverSplitRunnerFactory driverFactory : unpartitionedDriverFactories) {
            runners.add(driverFactory.createDriverRunner(null));
            driverFactory.setNoMoreSplits();
        }
        enqueueDrivers(true, runners);
    }
    ...
}
```




```java
@ThreadSafe
public class TaskExecutor
{
    ...

    private class Runner
            implements Runnable
    {
        private final long runnerId = NEXT_RUNNER_ID.getAndIncrement();

        @Override
        public void run()
        {
            try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) {
                while (!closed && !Thread.currentThread().isInterrupted()) {
                    // select next worker
                    final PrioritizedSplitRunner split;
                    try {
                        split = pendingSplits.take();
                        if (split.updatePriorityLevel()) {
                            // priority level changed, return split to queue for re-prioritization
                            pendingSplits.put(split);
                            continue;
                        }
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }

                    try (SetThreadName splitName = new SetThreadName(split.getTaskHandle().getTaskId() + "-" + split.getSplitId())) {
                        runningSplits.add(split);

                        boolean finished;
                        ListenableFuture<?> blocked;
                        try {
                            blocked = split.process();
                            finished = split.isFinished();
                        }
                        finally {
                            runningSplits.remove(split);
                        }

                        if (finished) {
                            log.debug("%s is finished", split.getInfo());
                            splitFinished(split);
                        }
                        else {
                            if (blocked.isDone()) {
                                pendingSplits.put(split);
                            }
                            else {
                                blockedSplits.add(split);
                                blocked.addListener(new Runnable()
                                {
                                    @Override
                                    public void run()
                                    {
                                        blockedSplits.remove(split);
                                        split.updatePriorityLevel();
                                        pendingSplits.put(split);
                                    }
                                }, executor);
                            }
                        }
                    }
                    catch (Throwable t) {
                        log.error(t, "Error processing %s", split.getInfo());
                        splitFinished(split);
                    }
                }
            }
            finally {
                // unless we have been closed, we need to replace this thread
                if (!closed) {
                    addRunnerThread();
                }
            }
        }
    }
    ...
}
```

### MultiTenant feature

* Multi-tenancy is enforced by the priority queue of pending splits: PrioritizedSplitRunner#process() charges each run's wall time to the owning task and recomputes the split's priority level, so tasks that have already consumed a lot of thread time are scheduled behind newer ones (an illustrative sketch of the priority calculation follows the quoted code).

```java
private static class PrioritizedSplitRunner
        implements Comparable<PrioritizedSplitRunner>
{
    ...

    public ListenableFuture<?> process()
            throws Exception
    {
        try {
            start.compareAndSet(0, System.currentTimeMillis());

            processCalls.incrementAndGet();
            CpuTimer timer = new CpuTimer();
            ListenableFuture<?> blocked = split.processFor(SPLIT_RUN_QUANTA);

            CpuTimer.CpuDuration elapsed = timer.elapsedTime();

            // update priority level based on total thread usage of task
            long durationNanos = elapsed.getWall().roundTo(TimeUnit.NANOSECONDS);
            long threadUsageNanos = taskHandle.addThreadUsageNanos(durationNanos);
            this.threadUsageNanos.set(threadUsageNanos);
            priorityLevel.set(calculatePriorityLevel(threadUsageNanos));

            // record last run for prioritization within a level
            lastRun.set(ticker.read());

            cpuTime.addAndGet(elapsed.getCpu().roundTo(TimeUnit.NANOSECONDS));
            return blocked;
        }
        catch (Throwable e) {
            finishedFuture.setException(e);
            throw e;
        }
    }
    ...
}
```
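
calculatePriorityLevel() and the Comparable implementation are not quoted above. The sketch below only illustrates the mechanism; the thresholds and tie-breaking rules are assumptions for illustration, not the actual Presto values. The idea matches the quoted code: more accumulated thread time means a higher (worse) level, and within a level the split that ran least recently goes first.

```java
// Illustrative sketch only, not the actual TaskExecutor.PrioritizedSplitRunner.
// The level thresholds below are invented to show the bucketing idea.
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class PrioritySketch
        implements Comparable<PrioritySketch>
{
    final AtomicLong threadUsageNanos = new AtomicLong();
    final AtomicLong priorityLevel = new AtomicLong();
    final AtomicLong lastRun = new AtomicLong();

    // bucket total thread usage into coarse levels: tasks that have already
    // consumed a lot of CPU drop behind tasks that have consumed little
    static int calculatePriorityLevel(long threadUsageNanos)
    {
        long seconds = TimeUnit.NANOSECONDS.toSeconds(threadUsageNanos);
        if (seconds < 1) {
            return 0;
        }
        if (seconds < 10) {
            return 1;
        }
        if (seconds < 60) {
            return 2;
        }
        return 3;
    }

    @Override
    public int compareTo(PrioritySketch other)
    {
        // lower level runs first; within a level, the split that ran least
        // recently runs first ("prioritization within a level" above)
        int result = Long.compare(priorityLevel.get(), other.priorityLevel.get());
        if (result != 0) {
            return result;
        }
        return Long.compare(lastRun.get(), other.lastRun.get());
    }
}
```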