Inside Impala Coordinator


Impala flow

Impala Execution Flow

  1. User submits a HiveQL query to Impalad using the Thrift API
  2. The Frontend Planner generates a Query Plan Tree using metadata
  3. The Backend Coordinator sends an execution request to each query execution engine
  4. Each Backend query execution engine executes a query fragment, e.g. HDFS Scan, Aggregation, Merge, etc.

a. The State Store notifies Impalad when the cluster state changes

This presentation focuses on steps 3 and a.
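Assuming hypothetical stand-in types (the real code uses Thrift-generated structs such as TExecRequest and TPlanFragment over RPC), the numbered steps can be sketched roughly as:

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical sketch of the flow; real Impala uses Thrift-generated
// types and RPC between the FE, coordinator, and backends.
struct QueryPlanTree { std::vector<std::string> fragments; };

// Step 2: the frontend planner turns HiveQL into a plan tree of fragments.
QueryPlanTree Plan(const std::string& /*hiveql*/) {
  // e.g. scan and aggregation fragments for the data nodes,
  // plus a merge fragment for the coordinator
  return {{"HDFS Scan", "Aggregation", "Merge"}};
}

// Steps 3-4: the coordinator dispatches one execution request per fragment.
int Dispatch(const QueryPlanTree& plan) {
  int requests_sent = 0;
  for (const auto& fragment : plan.fragments) {
    (void)fragment;  // would be an RPC to a backend query execution engine
    ++requests_sent;
  }
  return requests_sent;
}
```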

Coordinator execution flow

  • ImpalaServer#query (Async API)

  • ImpalaServer#Execute with QueryExecState

  • ImpalaServer#ExecuteInternal

    • Calls ImpalaServer#GetExecRequest to create a TExecRequest via the FE
    • Registers exec_state under the query id
    • After calling QueryExecState#Exec, stores the query locations of the running hosts (map<Host, QueryId>)
  • QueryExecState#Exec

  • Coordinator#Exec

    • Call Coordinator#ComputeFragmentExecParams
      • Uses Coordinator#ComputeFragmentHosts to compute which host executes each fragment. This function considers data location (SimpleScheduler#GetHosts), input fragment location (exchange node), and coordinator location (single node).
    • Call Coordinator#ComputeScanRangeAssignment
      • Generates vector<map<host, map<node_id, vector<scan_range_params>>>>. The vector's index is the fragment id.
    • Call PlanFragmentExecutor#Prepare (ask oza_x86 for more details).
    • Set up profiles and create a BackendExecState for each fragment.
    • Execute Coordinator#ExecRemoteFragment using ParallelExecutor::Exec
      • ExecRemoteFragment calls ImpalaServer#ExecPlanFragment using the BE client
        • See ImpalaServer#ExecPlanFragment
      • Invokes a thread for each fragment
      • If a fragment execution fails, the Coordinator cancels the other fragments. No recovery.
  • SimpleScheduler#GetHosts

    • All running impalads are listed in a map<host, list<port>>. This function checks ScanNode locations against this map for data locality.
    • Uses the State Store, via SubscriptionManager, to share the set of running Impalads
  • ParallelExecutor#Exec

    • Executes multiple functions in parallel, one thread each
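A rough sketch of the assignment structure built in Coordinator#ComputeScanRangeAssignment; ScanRangeParams and the Assign helper below are hypothetical simplifications of the Thrift types Impala actually uses:

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins; Impala's real code uses Thrift-generated types.
struct ScanRangeParams { std::string path; long offset; long len; };
using PlanNodeId = int;
using Host = std::string;

// Per-fragment assignment: host -> (scan node id -> its scan ranges).
// The outer vector is indexed by fragment id, as described above.
using ScanRangeAssignment =
    std::vector<std::map<Host, std::map<PlanNodeId, std::vector<ScanRangeParams>>>>;

// Toy version of the assignment for a single scan fragment: each scan range
// is grouped under the host chosen to read it.
ScanRangeAssignment Assign(
    const std::vector<std::pair<Host, ScanRangeParams>>& ranges) {
  ScanRangeAssignment assignment(1);  // one fragment
  const PlanNodeId kScanNodeId = 0;   // a single HDFS scan node
  for (const auto& [host, params] : ranges) {
    assignment[0][host][kScanNodeId].push_back(params);
  }
  return assignment;
}
```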


$ be/src/service/impala-server.{h,cc}
  • Provide Async API
  • Construct a TExecRequest via FE
    • TExecRequest has Query Plan Tree
  • Store query id and its state
    • Fetch, Cancel and other APIs use those states
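The query-id-to-state bookkeeping described above might look like the following sketch (QueryRegistry and its methods are hypothetical; the real ImpalaServer stores QueryExecState objects and guards the map with locks):

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical sketch of ImpalaServer's query-id -> exec-state registry.
struct QueryExecState { std::string query; bool cancelled = false; };
using QueryId = std::string;

class QueryRegistry {
 public:
  void Register(const QueryId& id, std::shared_ptr<QueryExecState> state) {
    exec_states_[id] = std::move(state);
  }
  // Fetch/Cancel look the state up by id; returns nullptr if unknown.
  std::shared_ptr<QueryExecState> Get(const QueryId& id) const {
    auto it = exec_states_.find(id);
    return it == exec_states_.end() ? nullptr : it->second;
  }
  bool Cancel(const QueryId& id) {
    auto state = Get(id);
    if (!state) return false;
    state->cancelled = true;
    return true;
  }
 private:
  std::map<QueryId, std::shared_ptr<QueryExecState>> exec_states_;
};
```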


  • One instance per request
    • Captures all state for request execution
  • Converts the Coordinator’s result (RowBatch) to the client result


$ be/src/runtime/coordinator.{h,cc}
  • Used by ImpalaServer for query execution
    • Called in QueryExecState#Exec
  • Use SimpleScheduler for data locality
    • Call GetHosts in Coordinator#Exec
  • In a distributed environment, ImpalaServer gets query results via the Coordinator


$ fe/src/main/java/com/cloudera/impala/planner/
  • A fragment of UNPARTITIONED type is a single-node fragment (Merge, Sort, and others); single-node fragments run on the coordinator host
  • Non-UNPARTITIONED fragments (ScanNode, partitioned AggregationNode, and others) are executed on each host
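The placement rule can be illustrated with a minimal sketch (the enum and FragmentHosts function below are hypothetical; the real logic lives in Coordinator#ComputeFragmentHosts):

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical sketch of the fragment placement rule.
enum class PartitionType { UNPARTITIONED, PARTITIONED };

std::vector<std::string> FragmentHosts(
    PartitionType type,
    const std::string& coordinator,
    const std::vector<std::string>& data_hosts) {
  // UNPARTITIONED fragments (Merge, Sort, ...) run only on the coordinator;
  // partitioned fragments (Scan, partitioned Aggregation, ...) run on each host.
  if (type == PartitionType::UNPARTITIONED) return {coordinator};
  return data_hosts;
}
```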


$ be/src/sparrow/simple-scheduler.{h, cc} 
  • Data locality and round robin approaches
    • If the data location matches a host running Impalad, that host is used. Otherwise, round robin
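A minimal sketch of this policy (the class and method names are hypothetical, not the actual SimpleScheduler API):

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical sketch of SimpleScheduler's policy: prefer the host holding
// the data if an impalad runs there, otherwise fall back to round robin.
class SimpleSchedulerSketch {
 public:
  explicit SimpleSchedulerSketch(std::vector<std::string> backends)
      : backends_(std::move(backends)) {}

  std::string GetHost(const std::string& data_host) {
    // Data locality: the scan range's host also runs an impalad.
    for (const auto& b : backends_) {
      if (b == data_host) return b;
    }
    // Otherwise round robin over all running backends.
    std::string chosen = backends_[next_ % backends_.size()];
    ++next_;
    return chosen;
  }

 private:
  std::vector<std::string> backends_;
  std::size_t next_ = 0;
};
```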

Cluster Management

$ be/src/sparrow/state-store.{h, cc} 
$ be/src/sparrow/subscription-manager.{h, cc} 
$ be/src/sparrow/failure-detector.{h, cc} 
  • Uses a State Store Server and Subscribers
    • The StateStore uses heartbeats to detect failures
    • ImpalaServer and SimpleScheduler each register their own Subscriber
  • The SubscriptionManager notifies Impalads via Subscriber callbacks
    • If the state store is disabled, SimpleScheduler uses a fixed backend list instead of a dynamic one
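A sketch of heartbeat-based failure detection in this style (names are hypothetical; the real logic is in failure-detector.{h, cc}):

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical sketch: the state store marks a subscriber as failed once it
// misses too many consecutive heartbeats; a fresh heartbeat resets the count.
class HeartbeatFailureDetector {
 public:
  explicit HeartbeatFailureDetector(int max_missed) : max_missed_(max_missed) {}

  void OnHeartbeat(const std::string& subscriber) { missed_[subscriber] = 0; }
  void OnHeartbeatMissed(const std::string& subscriber) { ++missed_[subscriber]; }

  bool IsFailed(const std::string& subscriber) const {
    auto it = missed_.find(subscriber);
    return it != missed_.end() && it->second >= max_missed_;
  }

 private:
  int max_missed_;
  std::map<std::string, int> missed_;
};
```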


  • The Impala Coordinator doesn't consider cluster load
  • Impala's Sparrow scheduler is simple and doesn't depend on the original Sparrow.
  • If a fragment execution fails, the query itself fails. Please retry!

Omake: Sparrow

The implementor of Impala's Sparrow is probably one of the original Sparrow developers.


  • A distributed cluster scheduler from AMPLab
  • For high throughput and low latency
    • compared to YARN, Mesos, and others
    • The original implementation is in Java


  • Decentralized scheduler
    • No shared state about cluster load
    • Uses instantaneous info from slaves for task distribution
    • Scheduler, Launcher, Placer, Monitor
  • Supported algorithms
    • Round Robin, FIFO, Batch Sampling, etc.
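Batch sampling can be sketched as follows, assuming the scheduler has already probed d·m randomly chosen workers for their queue lengths (the function and types below are hypothetical, not Sparrow's actual API):

```cpp
#include <algorithm>
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical sketch of batch sampling: to place m tasks, probe d*m random
// workers and assign the tasks to the m least-loaded probes. The probe
// results are passed in directly to keep the example deterministic.
std::vector<int> BatchSample(
    std::vector<std::pair<int, int>> probes,  // (worker_id, queue_len)
    int m) {
  // Sort the probed workers by queue length and take the m shortest queues.
  std::sort(probes.begin(), probes.end(),
            [](const auto& a, const auto& b) { return a.second < b.second; });
  std::vector<int> chosen;
  for (int i = 0; i < m && i < static_cast<int>(probes.size()); ++i) {
    chosen.push_back(probes[i].first);
  }
  return chosen;
}
```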

@repeatedly repeatedly commented Nov 28, 2012