Last active
December 10, 2019 22:43
Revisions
-
oza revised this gist
Feb 10, 2014 . 1 changed file with 8 additions and 7 deletions.There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -1,16 +1,17 @@ # Presto source code reading #1 * This document was created for Presto Source Code Reading #1. * http://atnd.org/events/47149 * Target: trunk code, Feb 10th, 2014 * Main Topic: Coordinator and Executor ./presto-main/src/main/java/com/facebook/presto/execution * http://www.slideshare.net/frsyuki/hadoop-source-code-reading-15-in-japan-presto/10 * NOTE 1. All source code noted here is quoted from https://github.com/facebook/presto. The license is based on https://github.com/facebook/presto/blob/master/LICENSE. 2. This document may contain errors. Pull requests to fix them are always welcome :-) * Prerequisite * Google Guice (dependency injection utility) * Google Guava (ListenableFuture, argument-check utilities, etc.) * Java 7 ## Overview of Coordinator's code path -
oza revised this gist
Feb 10, 2014 . 1 changed file with 5 additions and 0 deletions.There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -8,6 +8,11 @@ * http://www.slideshare.net/frsyuki/hadoop-source-code-reading-15-in-japan-presto/10 * NOTE: All source code noted here is quoted from https://github.com/facebook/presto. License bases on https://github.com/facebook/presto/blob/master/LICENSE. * Prerequisite * Google Guice * Google Guava * Java 7 ## Overview of Coordinator's code path * HTTP Post request to "/v1/query" -
oza created this gist
Feb 10, 2014 .There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,479 @@ # Presto source code reading - Coordinator and Executor - * This document is created for Presto Source Code Reading. * http://atnd.org/events/47149 * Target: trunk code, Feb 10th, 2014 * Main Topic: ./presto-main/src/main/java/com/facebook/presto/executions * http://www.slideshare.net/frsyuki/hadoop-source-code-reading-15-in-japan-presto/10 * NOTE: All source code noted here is quoted from https://github.com/facebook/presto. License bases on https://github.com/facebook/presto/blob/master/LICENSE. ## Overview of Coordinator's code path * HTTP Post request to "/v1/query" * SqlQueryManager#createQuery() * SqlQueryExecution#start() ```java @Path("/v1/query") public class QueryResource { private final QueryManager queryManager; ... @POST @Produces(MediaType.APPLICATION_JSON) public Response createQuery(String query, @HeaderParam(PRESTO_USER) String user, @HeaderParam(PRESTO_SOURCE) String source, @HeaderParam(PRESTO_CATALOG) @DefaultValue(DEFAULT_CATALOG) String catalog, @HeaderParam(PRESTO_SCHEMA) @DefaultValue(DEFAULT_SCHEMA) String schema, @HeaderParam(USER_AGENT) String userAgent, @Context HttpServletRequest requestContext, @Context UriInfo uriInfo) { checkNotNull(query, "query is null"); checkArgument(!query.isEmpty(), "query is empty"); checkNotNull(catalog, "catalog is null"); checkNotNull(schema, "schema is null"); String remoteUserAddress = requestContext.getRemoteAddr(); QueryInfo queryInfo = queryManager.createQuery(new Session(user, source, catalog, schema, remoteUserAddress, userAgent), query); // Dive into this code! 
URI pagesUri = uriBuilderFrom(uriInfo.getRequestUri()).appendPath(queryInfo.getQueryId().toString()).build(); return Response.created(pagesUri).entity(queryInfo).build(); } ... } ``` ```java @ThreadSafe public class SqlQueryManager implements QueryManager { ... @Override public QueryInfo createQuery(Session session, String query) { checkNotNull(query, "query is null"); Preconditions.checkArgument(!query.isEmpty(), "query must not be empty string"); QueryId queryId = queryIdGenerator.createNextQueryId(); Statement statement; // assume to be returned Query class statement = SqlParser.createStatement(query); ... // executionFactories is initialized by Guice(CoordinatorModule#setup()). QueryExecutionFactory<?> queryExecutionFactory = executionFactories.get(statement.getClass()); final QueryExecution queryExecution = queryExecutionFactory.createQueryExecution(queryId, query, session, statement); // start the query in the background queryExecutor.submit(new QueryStarter(queryExecution, stats)); // -> call queryExecution#start() ... return queryExecution.getQueryInfo(); } ... } ``` ```java @ThreadSafe public class SqlQueryExecution implements QueryExecution { ... @Override public void start() { try (SetThreadName setThreadName = new SetThreadName("Query-%s", stateMachine.getQueryId())) { try { // transition to planning if (!stateMachine.beginPlanning()) { // query already started or finished return; } // analyze query SubPlan subplan = analyzeQuery(); // plan distribution of query: initialize outputStage planDistribution(subplan); // transition to starting if (!stateMachine.starting()) { // query already started or finished return; } // if query is not finished, start the stage, otherwise cancel it SqlStageExecution stage = outputStage.get(); if (!stateMachine.isDone()) { stage.start(); } else { stage.cancel(true); } } catch (Throwable e) { fail(e); Throwables.propagateIfInstanceOf(e, Error.class); } } } .... 
} ``` ## SQL Transformation overview SQL(String) -- [SqlParser] --> Query -- [SqlQueryExecution#analyzeQuery()] --> SubPlan -- [SqlQueryExecution#planDistribution()] --> SqlStageExecution * SubPlan: Logical Plan * Tree structure * fragment: body of the node * children: child nodes of the node * SqlStageExecution: 1. Selects the node on which to execute each RemoteTask, using NodeScheduler 2. Wraps an ExecutorService, which runs RemoteTasks 3. ExecutorService launches HttpRemoteTask via createRemoteTask() ## Executor Overview * HttpRemoteTask#start() - HTTP Request -> Worker * TaskResource#createOrUpdateTask() -> SqlTaskManager#updateTask() * SqlTaskExecution.createSqlTaskExecution() 1. SqlTaskExecution: 2. LocalExecutionPlanner#plan(): Transforms the logical plan into a physical plan 3. Driver#process(): ```java /** * Manages tasks on this worker node */ @Path("/v1/task") public class TaskResource { private static final DataSize DEFAULT_MAX_SIZE = new DataSize(10, Unit.MEGABYTE); private static final Duration DEFAULT_MAX_WAIT_TIME = new Duration(1, SECONDS); private final TaskManager taskManager; ... @POST @Path("{taskId}") @Consumes(MediaType.APPLICATION_JSON) @Produces(MediaType.APPLICATION_JSON) public Response createOrUpdateTask(@PathParam("taskId") TaskId taskId, TaskUpdateRequest taskUpdateRequest, @Context UriInfo uriInfo) { checkNotNull(taskUpdateRequest, "taskUpdateRequest is null"); TaskInfo taskInfo = taskManager.updateTask(taskUpdateRequest.getSession(), taskId, taskUpdateRequest.getFragment(), taskUpdateRequest.getSources(), taskUpdateRequest.getOutputIds()); return Response.ok().entity(taskInfo).build(); } ... } ``` ```java public class SqlTaskManager implements TaskManager { private final TaskExecutor taskExecutor; ... 
@Override public TaskInfo updateTask(Session session, TaskId taskId, PlanFragment fragment, List<TaskSource> sources, OutputBuffers outputBuffers) { URI location = locationFactory.createLocalTaskLocation(taskId); TaskExecution taskExecution; synchronized (this) { taskExecution = tasks.get(taskId); if (taskExecution == null) { // is task already complete? TaskInfo taskInfo = taskInfos.get(taskId); if (taskInfo != null) { return taskInfo; } taskExecution = SqlTaskExecution.createSqlTaskExecution(session, taskId, location, fragment, sources, outputBuffers, planner, maxBufferSize, taskExecutor, taskNotificationExecutor, maxTaskMemoryUsage, operatorPreAllocatedMemory, queryMonitor, cpuTimerEnabled ); tasks.put(taskId, taskExecution); } } taskExecution.recordHeartbeat(); taskExecution.addSources(sources); taskExecution.addResultQueue(outputBuffers); return getTaskInfo(taskExecution, false); } ... } ``` ```java public class SqlTaskExecution implements TaskExecution { ... private SqlTaskExecution(Session session, TaskId taskId, URI location, PlanFragment fragment, OutputBuffers outputBuffers, LocalExecutionPlanner planner, DataSize maxBufferSize, TaskExecutor taskExecutor, DataSize maxTaskMemoryUsage, DataSize operatorPreAllocatedMemory, QueryMonitor queryMonitor, Executor notificationExecutor, boolean cpuTimerEnabled) { ... LocalExecutionPlan localExecutionPlan = planner.plan(session, fragment.getRoot(), fragment.getSymbols(), new TaskOutputFactory(sharedBuffer)); List<DriverFactory> driverFactories = localExecutionPlan.getDriverFactories(); } .... 
private synchronized void enqueueDrivers(boolean forceRunSplit, List<DriverSplitRunner> runners) { // schedule driver to be executed List<ListenableFuture<?>> finishedFutures = taskExecutor.enqueueSplits(taskHandle, forceRunSplit, runners); checkState(finishedFutures.size() == runners.size(), "Expected %s futures but got %s", runners.size(), finishedFutures.size()); // record new driver remainingDrivers.addAndGet(finishedFutures.size()); // when driver completes, update state and fire events for (int i = 0; i < finishedFutures.size(); i++) { ListenableFuture<?> finishedFuture = finishedFutures.get(i); final DriverSplitRunner splitRunner = runners.get(i); Futures.addCallback(finishedFuture, new FutureCallback<Object>() { @Override public void onSuccess(Object result) { try (SetThreadName setThreadName = new SetThreadName("Task-%s", taskId)) { // record driver is finished remainingDrivers.decrementAndGet(); checkTaskCompletion(); queryMonitor.splitCompletionEvent(taskId, splitRunner.getDriverContext().getDriverStats()); } } @Override public void onFailure(Throwable cause) { try (SetThreadName setThreadName = new SetThreadName("Task-%s", taskId)) { taskStateMachine.failed(cause); // record driver is finished remainingDrivers.decrementAndGet(); DriverContext driverContext = splitRunner.getDriverContext(); DriverStats driverStats; if (driverContext != null) { driverStats = driverContext.getDriverStats(); } else { // split runner did not start successfully driverStats = new DriverStats(); } // fire failed event with cause queryMonitor.splitFailedEvent(taskId, driverStats, cause); } } }, notificationExecutor); } } ... 
// // This code starts registers a callback with access to this class, and this // call back is access from another thread, so this code can not be placed in the constructor private void start() { // start unpartitioned drivers List<DriverSplitRunner> runners = new ArrayList<>(); for (DriverSplitRunnerFactory driverFactory : unpartitionedDriverFactories) { runners.add(driverFactory.createDriverRunner(null)); driverFactory.setNoMoreSplits(); } enqueueDrivers(true, runners); } ... } ``` ```java @ThreadSafe public class TaskExecutor { ... private class Runner implements Runnable { private final long runnerId = NEXT_RUNNER_ID.getAndIncrement(); @Override public void run() { try (SetThreadName runnerName = new SetThreadName("SplitRunner-%s", runnerId)) { while (!closed && !Thread.currentThread().isInterrupted()) { // select next worker final PrioritizedSplitRunner split; try { split = pendingSplits.take(); if (split.updatePriorityLevel()) { // priority level changed, return split to queue for re-prioritization pendingSplits.put(split); continue; } } catch (InterruptedException e) { Thread.currentThread().interrupt(); return; } try (SetThreadName splitName = new SetThreadName(split.getTaskHandle().getTaskId() + "-" + split.getSplitId())) { runningSplits.add(split); boolean finished; ListenableFuture<?> blocked; try { blocked = split.process(); finished = split.isFinished(); } finally { runningSplits.remove(split); } if (finished) { log.debug("%s is finished", split.getInfo()); splitFinished(split); } else { if (blocked.isDone()) { pendingSplits.put(split); } else { blockedSplits.add(split); blocked.addListener(new Runnable() { @Override public void run() { blockedSplits.remove(split); split.updatePriorityLevel(); pendingSplits.put(split); } }, executor); } } } catch (Throwable t) { log.error(t, "Error processing %s", split.getInfo()); splitFinished(split); } } } finally { // unless we have been closed, we need to replace this thread if (!closed) { addRunnerThread(); } } 
} } ... } ``` ### Multi-tenancy feature * Multi-tenancy is ensured by a priority queue: after each run, a split's priority level is recalculated from its task's accumulated thread usage ```java private static class PrioritizedSplitRunner implements Comparable<PrioritizedSplitRunner> { ... public ListenableFuture<?> process() throws Exception { try { start.compareAndSet(0, System.currentTimeMillis()); processCalls.incrementAndGet(); CpuTimer timer = new CpuTimer(); ListenableFuture<?> blocked = split.processFor(SPLIT_RUN_QUANTA); CpuTimer.CpuDuration elapsed = timer.elapsedTime(); // update priority level base on total thread usage of task long durationNanos = elapsed.getWall().roundTo(TimeUnit.NANOSECONDS); long threadUsageNanos = taskHandle.addThreadUsageNanos(durationNanos); this.threadUsageNanos.set(threadUsageNanos); priorityLevel.set(calculatePriorityLevel(threadUsageNanos)); // record last run for prioritization within a level lastRun.set(ticker.read()); cpuTime.addAndGet(elapsed.getCpu().roundTo(TimeUnit.NANOSECONDS)); return blocked; } catch (Throwable e) { finishedFuture.setException(e); throw e; } } ... } ```