Inside Impala Coordinator

http://hadoop-scr13th.eventbrite.com/

Impala

Impala flow

Impala Execution Flow

  1. The user submits a HiveQL query to Impalad using the Thrift API
  2. The Frontend Planner generates a Query Plan Tree using meta information
  3. The Backend Coordinator sends an execution request to each query execution engine
  4. Each Backend query execution engine executes a query fragment, e.g. HDFS Scan, Aggregation, Merge, etc.

a. The State Store notifies Impalad when the cluster state changes

This presentation focuses on steps 3 and a.
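
As a rough illustration of steps 1-4, here is a minimal C++ sketch of the flow. The function names (PlanQuery, Coordinate, ExecFragment) are hypothetical stand-ins, not Impala's real APIs:

```cpp
#include <iostream>
#include <string>
#include <vector>

struct PlanFragment { std::string node; };           // step 2: one piece of the plan tree
struct QueryPlan { std::vector<PlanFragment> fragments; };

QueryPlan PlanQuery(const std::string& hiveql) {     // step 2: FE planner (stand-in)
  (void)hiveql;
  return {{{"HDFS Scan"}, {"Aggregation"}, {"Merge"}}};
}

void ExecFragment(const PlanFragment& f) {           // step 4: BE query execution engine (stand-in)
  std::cout << "executing fragment: " << f.node << "\n";
}

void Coordinate(const QueryPlan& plan) {             // step 3: BE coordinator (stand-in)
  for (const auto& f : plan.fragments) ExecFragment(f);
}

int main() {
  Coordinate(PlanQuery("SELECT ..."));               // step 1: query arrives via the Thrift API
}
```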

Coordinator execution flow

  • ImpalaServer#query (Async API)

  • ImpalaServer#Execute with QueryExecState

  • ImpalaServer#ExecuteInternal

    • Call ImpalaServer#GetExecRequest for creating TExecRequest via FE
    • Register exec_state with query id
    • After calling QueryExecState#Exec, stores the query location for the running hosts (map<Host, QueryId>)
  • QueryExecState#Exec

  • Coordinator#Exec

    • Call Coordinator#ComputeFragmentExecParams
      • Use Coordinator#ComputeFragmentHosts for computing the host that executes a fragment. This function considers data location (SimpleScheduler#GetHosts), input fragment location (exchange node) and coordinator location (single node).
    • Call Coordinator#ComputeScanRangeAssignment
      • Generates vector<map<host, map<node_id, vector<scan_range_params>>>>. The vector's index is the fragment id (see the C++ sketch after this list).
    • Call PlanFragmentExecutor#Prepare(ask oza_x86 for more details).
    • Setup profiles and create BackendExecState for each fragment.
    • Execute Coordinator#ExecRemoteFragment using ParallelExecutor::Exec
        • ExecRemoteFragment calls ImpalaServer#ExecPlanFragment using the BE client
        • See ImpalaServer#ExecPlanFragment
      • Invoke a thread for each fragment
      • If a fragment execution fails, the Coordinator cancels the other fragments. No recovery.
  • SimpleScheduler#GetHosts

    • All running impalads are listed in a map<host, list<port>>. This function checks the ScanNode location against this map for data locality.
    • The State Store shares the set of running Impalads via the SubscriptionManager
  • ParallelExecutor#Exec

    • Executes multiple functions in parallel, one thread per function
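
The following is a minimal C++ sketch of the scan range assignment shape from Coordinator#ComputeScanRangeAssignment and of the per-fragment thread launch done via ParallelExecutor::Exec. The types and functions are simplified stand-ins, not Impala's real ones:

```cpp
#include <map>
#include <string>
#include <thread>
#include <vector>

struct ScanRangeParams { long offset; long length; };
using Host = std::string;
using NodeId = int;
// vector index = fragment id; per fragment: host -> plan node id -> its scan ranges
using ScanRangeAssignment =
    std::vector<std::map<Host, std::map<NodeId, std::vector<ScanRangeParams>>>>;

// Stand-in for Coordinator#ExecRemoteFragment: sends one fragment to one backend.
void ExecRemoteFragment(const Host& host, int fragment_id) {
  // the real code calls ImpalaServer#ExecPlanFragment on 'host' via the BE Thrift client
  (void)host; (void)fragment_id;
}

void ExecAllFragments(const ScanRangeAssignment& assignment) {
  std::vector<std::thread> workers;
  for (int fragment_id = 0; fragment_id < (int)assignment.size(); ++fragment_id) {
    for (const auto& per_host : assignment[fragment_id]) {
      // one thread per (fragment, host), mirroring ParallelExecutor::Exec
      workers.emplace_back(ExecRemoteFragment, per_host.first, fragment_id);
    }
  }
  for (auto& w : workers) w.join();
  // on any failure the real Coordinator cancels the remaining fragments; there is no recovery
}
```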

ImpalaServer

$ be/src/service/impala-server.{h,cc}
  • Provide Async API
  • Construct a TExecRequest via FE
    • TExecRequest has Query Plan Tree
  • Store query id and its state
    • Fetch, Cancel and other APIs use those states
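
A minimal sketch of the query-id-to-state registry described above, assuming a lock-protected map keyed by query id (the real server keys on TUniqueId; names here are illustrative):

```cpp
#include <map>
#include <memory>
#include <mutex>
#include <string>

struct QueryExecState { /* per-request state (see the QueryExecState section below) */ };
using QueryId = std::string;  // stand-in for TUniqueId

class QueryRegistry {
 public:
  void Register(const QueryId& id, std::shared_ptr<QueryExecState> state) {
    std::lock_guard<std::mutex> l(lock_);
    states_[id] = std::move(state);
  }
  std::shared_ptr<QueryExecState> Get(const QueryId& id) {
    std::lock_guard<std::mutex> l(lock_);   // Fetch/Cancel look the state up here
    auto it = states_.find(id);
    return it == states_.end() ? nullptr : it->second;
  }
 private:
  std::mutex lock_;
  std::map<QueryId, std::shared_ptr<QueryExecState>> states_;
};
```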

QueryExecState

  • One instance per request
    • Capture all states for request execution
  • Converts the Coordinator's result (RowBatch) to the client result
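
As a rough illustration of that conversion, a sketch with stand-in types (the real conversion goes through Thrift result structs):

```cpp
#include <string>
#include <vector>

struct RowBatch { std::vector<std::vector<std::string>> rows; };  // stand-in for the BE RowBatch

// Flatten each row into a delimited line the client can display.
std::vector<std::string> ToClientResult(const RowBatch& batch, char delim = '\t') {
  std::vector<std::string> out;
  for (const auto& row : batch.rows) {
    std::string line;
    for (size_t i = 0; i < row.size(); ++i) {
      if (i) line += delim;
      line += row[i];
    }
    out.push_back(line);
  }
  return out;
}
```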

Coordinator

$ be/src/runtime/coordinator.{h,cc}
  • Used by ImpalaServer for query execution
    • Called in QueryExecState#Exec
  • Use SimpleScheduler for data locality
    • Call GetHosts in Coordinator#Exec
  • ImpalaServer gets query results via the Coordinator in a distributed environment

PlanFragment

$ fe/src/main/java/com/cloudera/impala/planner/PlanFragment.java
$ fe/src/main/java/com/cloudera/impala/planner/Planner.java
  • The UNPARTITIONED type is a single-node fragment (Merge, Sort and others), and a single-node fragment runs on the coordinator host
  • Non-UNPARTITIONED fragments (ScanNode, partitioned AggregationNode and others) are executed on each host
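
A sketch of that placement rule, assuming a simple enum for the partition type (illustrative only, not the Planner's real code):

```cpp
#include <string>
#include <vector>

enum class PartitionType { UNPARTITIONED, PARTITIONED };

// UNPARTITIONED fragments run only on the coordinator host; partitioned fragments
// run on every host that holds relevant data.
std::vector<std::string> FragmentHosts(PartitionType type,
                                       const std::string& coordinator_host,
                                       const std::vector<std::string>& data_hosts) {
  if (type == PartitionType::UNPARTITIONED) return {coordinator_host};  // Merge, Sort, ...
  return data_hosts;  // ScanNode, partitioned AggregationNode, ...
}
```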

SimpleScheduler

$ be/src/sparrow/simple-scheduler.{h, cc} 
  • Data locality and round-robin approaches
    • If the data location matches a host running Impalad, that host is used. Otherwise, hosts are assigned round robin
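
A minimal sketch of that policy, assuming the scheduler holds a list of live backends fed by the state store (illustrative, not the real SimpleScheduler API):

```cpp
#include <string>
#include <vector>

using Host = std::string;

class SimpleSchedulerSketch {
 public:
  // assumes at least one live backend
  explicit SimpleSchedulerSketch(std::vector<Host> backends)
      : backends_(std::move(backends)) {}

  Host GetHost(const Host& data_host) {
    for (const auto& b : backends_) {
      if (b == data_host) return b;                  // data locality hit
    }
    Host h = backends_[next_ % backends_.size()];    // round-robin fallback
    ++next_;
    return h;
  }

 private:
  std::vector<Host> backends_;   // in Impala this list comes from the state store
  size_t next_ = 0;
};
```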

Cluster Management

$ be/src/sparrow/state-store.{h, cc} 
$ be/src/sparrow/subscription-manager.{h, cc} 
$ be/src/sparrow/failure-detector.{h, cc} 
  • Uses the State Store Server and Subscribers
    • The StateStore uses heartbeats to detect failures
    • ImpalaServer and SimpleScheduler register their own Subscribers
  • The SubscriptionManager notifies Impalads using Subscriber callbacks (sketched below)
    • If the state store is disabled, SimpleScheduler uses a fixed backend list instead of a dynamic one
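
A sketch of the subscriber pattern described above, with hypothetical callback shapes rather than the real SubscriptionManager API:

```cpp
#include <functional>
#include <string>
#include <vector>

using Host = std::string;
using MembershipCallback = std::function<void(const std::vector<Host>& live_backends)>;

class SubscriptionManagerSketch {
 public:
  // e.g. ImpalaServer and SimpleScheduler each register a callback here
  void Register(MembershipCallback cb) { callbacks_.push_back(std::move(cb)); }

  // Called when heartbeats show the membership changed, e.g. a backend missed
  // enough heartbeats and is declared failed.
  void NotifyAll(const std::vector<Host>& live_backends) {
    for (auto& cb : callbacks_) cb(live_backends);
  }

 private:
  std::vector<MembershipCallback> callbacks_;
};
```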

Conclusion

  • The Impala Coordinator doesn't consider cluster load
  • The Sparrow scheduler in Impala is simple and doesn't depend on the original Sparrow.
  • If a fragment execution fails, the whole query fails. Please retry!

Omake: Sparrow

The implementor of Impala's Sparrow code is probably one of the Sparrow developers.

Overview

  • Distributed cluster scheduler by AMPLab
  • For high throughput and low latency
    • compared to YARN, Mesos and others
  • https://github.com/radlab/sparrow
    • The original implementation is in Java

Architecture

  • Decentralized scheduler
    • No shared state about cluster load
    • Uses instantaneous info from slaves for task distribution
    • Scheduler, Launcher, Placer, Monitor
  • Supported algorithms
    • Round Robin, FIFO, Batch Sampling, etc. (batch sampling sketched below)
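
A rough sketch of batch sampling as described in the Sparrow work: to place m tasks, probe roughly d*m random workers and put the tasks on the least-loaded ones. This is simplified, with no asynchronous probes or late binding:

```cpp
#include <algorithm>
#include <random>
#include <vector>

// queue_lengths[i] = pending tasks on worker i; returns the workers chosen for m tasks.
std::vector<int> BatchSample(const std::vector<int>& queue_lengths, int m, int d = 2) {
  std::vector<int> workers(queue_lengths.size());
  for (size_t i = 0; i < workers.size(); ++i) workers[i] = (int)i;

  // probe a random sample of d*m workers
  std::mt19937 rng(std::random_device{}());
  std::shuffle(workers.begin(), workers.end(), rng);
  workers.resize(std::min<int>((int)workers.size(), d * m));

  // place the m tasks on the least-loaded probed workers
  std::sort(workers.begin(), workers.end(), [&](int a, int b) {
    return queue_lengths[a] < queue_lengths[b];
  });
  workers.resize(std::min<int>(m, (int)workers.size()));
  return workers;
}
```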