Skip to content

Instantly share code, notes, and snippets.

@shagunsodhani
Created January 18, 2016 06:42
Show Gist options
  • Star 6 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shagunsodhani/b0651ade0dc39aeb7cfd to your computer and use it in GitHub Desktop.
Save shagunsodhani/b0651ade0dc39aeb7cfd to your computer and use it in GitHub Desktop.
Notes on Hive Paper

Hive is an open-source data warehousing solution built on top of Hadoop. It supports an SQL-like query language called HiveQL. These queries are compiled into MapReduce jobs that are executed on Hadoop. While Hive uses Hadoop for execution of queries, it reduces the effort that goes into writing and maintaining MapReduce jobs.

Data Model

Hive supports database concepts like tables, columns, rows and partitions. Both primitive (integer, float, string) and complex data-types(map, list, struct) are supported. Moreover, these types can be composed to support structures of arbitrary complexity. The tables are serialized/deserialized using default serializers/deserializer. Any new data format and type can be supported by implementing SerDe and ObjectInspector java interface.

Query Language

Hive query language (HiveQL) consists of a subset of SQL along with some extensions. The language is very SQL-like and supports features like subqueries, joins, cartesian product, group by, aggregation, describe and more. MapReduce programs can also be used in Hive queries. A sample query using MapReduce would look like this:

FROM ( 
    MAP inputdata USING 'python mapper.py' AS (word, count)
    FROM inputtable
    CLUSTER BY word
    )
    REDUCE word, count USING 'python reduce.py';

This query uses mapper.py for transforming inputdata into (word, count) pair, distributes data to reducers by hashing on word column (given by CLUSTER) and uses reduce.py.

Notice that Hive allows the order of FROM and SELECT/MAP/REDUCE to be changed within a sub-query. This allows insertion of different transformation results into different tables/partitions/hdfs/local directory as part of the same query and reduces the number of scans on the input data.

Limitations

  • Only equi-joins are supported.
  • Data can not be inserted into existing table/partitions and all inserts overwrite the data.

INSERT INTO, UPDATE, and DELETE are not supported which makes it easier to handle reader and writer concurrency.

Data Storage

While a table is the logical data unit in Hive, the data is actually stored into hdfs directories. A table is stored as a directory in hdfs, partition of a table as a subdirectory within a directory and bucket as a file within the table/partition directory. Partitions can be created either when creating tables or by using INSERT/ALTER statement. The partitioning information is used to prune data when running queries. For example, suppose we create partition for day=monday using the query

ALTER TABLE dummytable ADD PARTITION (day='monday')

Next, we run the query -

SELECT * FROM dummytable WHERE day='monday'

Suppose the data in dummytable is stored in /user/hive/data/dummytable directory. This query will only scan the subdirectory /user/hive/data/dummytable/day=monday within the dummytable directory.

A bucket is a file within the leaf directory of a table or a partition. It can be used to prune data when the user runs a SAMPLE query.

Any data stored in hdfs can be queried using the EXTERNAL TABLE clause by specifying its location with the LOCATION clause. When dropping an external table, only its metadata is deleted and not the data itself.

Serialization/Deserialization

Hive implements the LazySerDe as the default SerDe. It deserializes rows into internal objects lazily so that the cost of Deserialization of a column is incurred only when it is needed. Hive also provides a RegexSerDe which allows the use of regular expressions to parse columns out from a row. Hive also supports various formats like TextInputFormat, SequenceFileInputFormat and RCFileInputFormat. Other formats can also be implemented and specified in the query itself. For example,

CREATE TABLE dummytable(key INT, value STRING)
    STORED AS 
        INPUTFORMAT
            org.apache.hadoop.mapred.SequenceFileInputFormat
        OUTPUTFORMAT
            org.apache.hadoop.mapred.SequenceFileOutputFormat

System Architecture and Component

Components

  • Metastore - Stores system catalog and metadata about tables, columns and partitions.
  • Driver - Manages life cycle of a HiveQL statement as it moves through Hive.
  • Query Compiler - Compiles query into a directed acyclic graph of MapReduce tasks.
  • Execution Engine - Execute tasks produced by the compiler in proper dependency order.
  • Hive Server - Provides a thrift interface and a JDBC/ODBC server.
  • Client components - CLI, web UI, and JDBC/ODBC driver.
  • Extensibility Interfaces - Interfaces for SerDe, ObjectInspector, UDF (User Defined Function) and UDAF (User-Defined Aggregate Function).

Life Cycle of a query

The query is submitted via CLI/web UI/any other interface. This query goes to the compiler and undergoes parse, type-check and semantic analysis phases using the metadata from Metastore. The compiler generates a logical plan which is optimized by the rule-based optimizer and an optimized plan in the form of DAG of MapReduce and hdfs tasks is generated. The execution engine executes these tasks in the correct order using Hadoop.

Metastore

It stores all information about the tables, their partitions, schemas, columns and their types, etc. Metastore runs on traditional RDBMS (so that latency for metadata query is very small) and uses an open source ORM layer called DataNuclues. Matastore is backed up regularly. To make sure that the system scales with the number of queries, no metadata queries are made the mapper/reducer of a job. Any metadata needed by the mapper or the reducer is passed through XML plan files that are generated by the compiler.

Query Compiler

Hive Query Compiler works similar to traditional database compilers.

  • Antlr is used to generate the Abstract Syntax Tree (AST) of the query.
  • A logical plan is created using information from the metastore. An intermediate representation called query block (QB) tree is used when transforming AST to operator DAG. Nested queries define the parent-child relationship in QB tree.
  • Optimization logic consists of a chain of transformation operations such that output from one operation is input to next operation. Each transformation comprises of a walk on operator DAG. Each visited node in the DAG is tested for different rules. If any rule is satisfied, its corresponding processor is invoked. Dispatcher maintains a mapping fo different rules and their processors and does rule matching. GraphWalker manages the overall traversal process.
  • Logical plan generated in the previous step is split into multiple MapReduce and hdfs tasks. Nodes in the plan correspond to physical operators and edges represent the flow of data between operators.

Optimisations

  • Column Pruning - Only the columns needed in the query processing are projected.
  • Predicate Pushdown - Predicates are pushed down to the scan so that rows are filtered as early as possible.
  • Partition Pruning - Predicates on partitioned columns are used to prune out files of partitions that do not satisfy the predicate.
  • Map Side Joins - In case the tables involved in the join are very small, the tables are replicated in all the mappers and the reducers.
  • Join Reordering - Large tables are streamed and not materialized in-memory in the reducer to reduce memory requirements.

Some optimizations are not enabled by default but can be activated by setting certain flags. These include:

  • Repartitioning data to handle skew in GROUP BY processing.This is achieved by performing GROUP BY in two MapReduce stages - first where data is distributed randomly to the reducers and partial aggregation is performed. In the second stage, these partial aggregations are distributed on GROUP BY columns to different reducers.
  • Hash bases partial aggregations in the mappers to reduce the data that is sent by the mappers to the reducers which help in reducing the amount of time spent in sorting and merging the resulting data.

Execution Engine

Execution Engine executes the tasks in order of their dependencies. A MapReduce task first serializes its part of the plan into a plan.xml file. This file is then added to the job cache and mappers and reducers are spawned to execute relevant sections of the operator DAG. The final results are stored to a temporary location and then moved to the final destination (in the case of say INSERT INTO query).

Future Work

The paper mentions the following areas for improvements:

  • HiveQL should subsume SQL syntax.
  • Implementing a cost-based optimizer and using adaptive optimization techniques.
  • Columnar storage to improve scan performance.
  • Enhancing JDBS/ODBC drivers for Hive to integrate with other BI tools.
  • Multi-query optimization and generic n-way join in a single MapReduce job.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment