Hive query optimization techniques
https://github.com/Thomas-George-T/Movies-Analytics-in-Spark-and-Scala
Change the execution engine to Tez or Spark (set the Tez/Spark client jars on HADOOP_CLASSPATH)
Partitioning - the PARTITIONED BY clause is used to divide the table into partitions based on partition column values.
Bucketing - the CLUSTERED BY clause is used to divide the table into buckets.
Map-Side join, Bucket-Map-Side join, Sorted Bucket-Map-Side join
Usage of a suitable file format = ORC (Optimized Row Columnar) file format
Indexing
Vectorization along with ORC
CBO
Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine
(i.e. spark, tez)
hive -f hive-sample.hql --hivevar database=mydb
SET hive.execution.engine=tez;
SET hive.execution.engine=spark;
Partitioning
-------------
static ::
FROM page_view_stg pvs
INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country='US')
SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip WHERE pvs.country = 'US';
Dynamic :: strict mode (at least one partition key - here dt - is static)
FROM page_view_stg pvs
INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country)
SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip, pvs.country
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
FROM page_view_stg pvs
INSERT OVERWRITE TABLE page_view PARTITION(dt, country)
SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip, from_unixtime(pvs.viewTime, 'yyyy-MM-dd') dt, pvs.country;
CREATE TABLE page_view(viewTime INT, userid BIGINT,
page_url STRING, referrer_url STRING,
ip STRING COMMENT 'IP Address of the User')
COMMENT 'This is the page view table'
PARTITIONED BY(dt STRING, country STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
CREATE EXTERNAL TABLE page_view_stg(viewTime INT, userid BIGINT,
page_url STRING, referrer_url STRING,
ip STRING COMMENT 'IP Address of the User',
country STRING COMMENT 'country of origination')
COMMENT 'This is the staging page view table'
ROW FORMAT DELIMITED FIELDS TERMINATED BY '44' LINES TERMINATED BY '12'
STORED AS TEXTFILE
LOCATION '/user/data/staging/page_view';
hadoop dfs -put /tmp/pv_2008-06-08.txt /user/data/staging/page_view
Bucketing
-----------
Bucketing can also be done even without partitioning on Hive tables.
Bucketing => based on Hash function on bucketing column
Sampling ::
SELECT * FROM source TABLESAMPLE(BUCKET 3 OUT OF 32 ON rand()) s;
Input pruning: Typically, TABLESAMPLE will scan the entire table and fetch the sample. But, that is not very efficient.
Instead, the table can be created with a CLUSTERED BY clause which indicates the set of columns on which the table is hash-partitioned/clustered on. If the columns specified in the TABLESAMPLE clause match the columns in the CLUSTERED BY clause, TABLESAMPLE scans only the required hash-partitions of the table.
SELECT * FROM source TABLESAMPLE(0.1 PERCENT) s;
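Input pruning example - a minimal sketch, assuming a table CLUSTERED BY (id) INTO 4 BUCKETS (such as buck_emp defined below); only one of the four buckets is scanned:
SELECT * FROM buck_emp TABLESAMPLE(BUCKET 1 OUT OF 4 ON id) s;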
set hive.enforce.bucketing = true;
This property sets the number of reduce tasks equal to the number of buckets mentioned in the table definition.
create table buck_emp(
id int,
name string,
salary int)
CLUSTERED BY (id)
SORTED BY (id)
INTO 4 BUCKETS;
We need to use a regular INSERT ... SELECT statement to load a bucketed table (LOAD DATA does not enforce bucketing).
INSERT OVERWRITE TABLE buck_emp
SELECT * FROM emp;
Map-Side Join ::
---------------------
join two tables where one of them has a file size < 25 MB (hive.mapjoin.smalltable.filesize):
a local MapReduce task reads the small table and builds an in-memory hash table, which is serialized to a file;
the hash table file is pushed to the Hadoop Distributed Cache;
when the actual MR job runs, the Distributed Cache ships the hash table file to every mapper node, so the join is done map-side without a shuffle/reduce phase.
CREATE TABLE IF NOT EXISTS dataset1 ( eid int, first_name String, last_name String, email String, gender String, ip_address String) row format delimited fields terminated BY ',' tblproperties("skip.header.line.count"="1");
CREATE TABLE IF NOT EXISTS dataset2 ( eid int, first_name String, last_name String) row format delimited fields terminated BY ',' tblproperties("skip.header.line.count"="1");
1. By specifying the keyword hint, /*+ MAPJOIN(b) */ in the join statement.
2. By setting the following property to true.
set hive.auto.convert.join=true
set hive.mapjoin.smalltable.filesize=(default it will be 25MB)
SELECT /*+ MAPJOIN(dataset2) */ dataset1.first_name, dataset1.eid,dataset2.eid FROM dataset1 JOIN dataset2 ON dataset1.first_name = dataset2.first_name;
As it is a Map-side join, the number of reducers will be set to 0 automatically.
Bucket-Map join ::
If tables being joined are bucketed on the join columns, and the number of buckets in one table is a multiple of the number of buckets in the other table, the buckets can be joined with each other.
CREATE TABLE IF NOT EXISTS dataset1_bucketed ( eid int,first_name String, last_name String, email String, gender String, ip_address String) clustered by(first_name) into 4 buckets row format delimited fields terminated BY ',';
CREATE TABLE IF NOT EXISTS dataset2_bucketed (eid int,first_name String, last_name String) clustered by(first_name) into 8 buckets row format delimited fields terminated BY ',' ;
insert into dataset1_bucketed select * from dataset1;
insert into dataset2_bucketed select * from dataset2;
set hive.optimize.bucketmapjoin = true
SELECT /*+ MAPJOIN(dataset2_bucketed) */ dataset1_bucketed.first_name,dataset1_bucketed.eid, dataset2_bucketed.eid FROM dataset1_bucketed JOIN dataset2_bucketed ON dataset1_bucketed.first_name = dataset2_bucketed.first_name ;
Sort Merge Bucket(SMB) Map Join ::
If the tables being joined are sorted and bucketized on the join columns and have the same number of buckets, a sort-merge join can be performed. The corresponding buckets are joined with each other at the mapper.
Set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
CREATE TABLE IF NOT EXISTS dataset2_bucketed1 (eid int,first_name String, last_name String) clustered by(first_name) into 4 buckets row format delimited fields terminated BY ',' ;
insert overwrite table dataset2_bucketed1 select * from dataset2 sort by first_name;
Now, we have two tables with 4 buckets and the joined column sorted. Let us perform the join query again.
SELECT /*+ MAPJOIN(dataset2_bucketed1) */ dataset1_bucketed.first_name, dataset1_bucketed.eid, dataset2_bucketed1.eid FROM dataset1_bucketed JOIN dataset2_bucketed1 ON dataset1_bucketed.first_name = dataset2_bucketed1.first_name ;
hive>set hive.enforce.sortmergebucketmapjoin=false;
hive>set hive.auto.convert.sortmerge.join=true;
hive>set hive.optimize.bucketmapjoin = true;
hive>set hive.optimize.bucketmapjoin.sortedmerge = true;
hive>set hive.auto.convert.join=false; // if we do not do this, automatically Map-Side Join will happen
SELECT u.name,u.salary FROM buck_dept d INNER JOIN buck_emp u ON d.id = u.id;
File Format:
TEXTFILE ::
supports CSV (comma-separated values), data delimited by tabs or spaces, and JSON data.
By default, if we use TEXTFILE format then each line is considered as a record.
org.apache.hadoop.mapred.TextInputFormat
org.apache.hadoop.mapred.TextOutputFormat
create table olympic(athelete STRING,age INT,country STRING,year STRING,closing STRING,sport STRING,gold INT,silver INT,bronze INT,total INT) row format delimited fields terminated by '\t' stored as textfile;
org.apache.hadoop.mapred.SequenceFileInputFormat
org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat
create table olympic_sequencefile(athelete STRING,age INT,country STRING,year STRING,closing STRING,sport STRING,gold INT,silver INT,bronze INT,total INT) row format delimited fields terminated by '\t' stored as sequencefile;
ORCFILE:
ORC stands for Optimized Row Columnar, which means it can store data in a more optimized way than the other file formats. ORC can reduce the size of the original data by up to 75% (e.g. a 100 GB file can become 25 GB).
org.apache.hadoop.hive.ql.io.orc
create table olympic_orcfile(athelete STRING,age INT,country STRING,year STRING,closing STRING,sport STRING,gold INT,silver INT,bronze INT,total INT) row format delimited fields terminated by '\t' stored as orcfile;
ORC contains indexes on file level, stripe level and row level (for 10000 rows, configurable).
These indexes are useful for filtering sortable sequential values and range queries.
For the indexes to be efficient, you should sort the data by the index keys when inserting. An unsorted index is not very selective, because every stripe can then contain almost the full key range.
Sorting during insert can be expensive.
For sortable sequential values like an integer id, the min/max values stored in the (sorted) ORC indexes work well.
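A minimal sketch of a sorted insert so the ORC min/max indexes stay selective (customer_stg is a hypothetical staging table; CUSTOMER is the ORC table defined below):
INSERT OVERWRITE TABLE CUSTOMER
SELECT * FROM customer_stg
SORT BY customerId;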
Bloom filters are structures that can tell with certainty that a key is NOT present in the dataset (no false negatives); a "maybe present" answer can be a false positive.
Bloom filters are efficient for equality queries, especially for non-sequential, unsorted values like GUIDs.
By default you should use a bloom filter instead of a Bitmap Index or a Compact Index if you have select queries using where together with the = operator or when (equi-)joining large tables.
Bitmap Indexes can be replaced by a bloom filter
Compact Indexes can be replaced by a bloom filter and storage indexes
CREATE TABLE CUSTOMER (
customerId int,
gender tinyint,
age tinyint,
revenue decimal(10,2),
name varchar(100),
customerCategory int)
STORED AS ORC
TBLPROPERTIES
('orc.compress'='SNAPPY',
'orc.create.index'='true',
'orc.bloom.filter.columns'='gender',
'orc.bloom.filter.fpp'='0.05',
'orc.stripe.size'='268435456',
'orc.row.index.stride'='10000');
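A query of the following shape should benefit from the bloom filter and the row-group min/max indexes - a sketch, assuming ORC predicate pushdown via indexes is enabled:
set hive.optimize.index.filter=true;
SELECT count(*) FROM CUSTOMER WHERE gender = 1;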
Notes:
If your data is delimited by some parameters then you can use TEXTFILE format.
If your data is in small files whose size is less than the block size then you can use SEQUENCEFILE format.
If you want to perform analytics on your data and you want to store your data efficiently for that then you can use RCFILE format.
If you want to store your data in an optimized way which lessens your storage and increases your performance then you can use ORCFILE format.
AVRO vs PARQUET vs ORC
Avro is a row-based storage/serialization binary format
Avro stores the schema in JSON format (readable)
Avro supports schema evolution. Avro handles schema changes like missing fields, added fields and changed fields.
ORC - Optimized Row Columnar file
- as it stores data in columnar format, which leads to effective compression
- a single file as the output of each task, which reduces the NameNode's load
- light-weight indexes stored within the file
- ORC came out of Hortonworks (Hive is often bundled with Hortonworks distributions) - why does ORC perform better with Hive? - the vectorized ORC reader
- ORC defines 2 things - OrcInputFormat & OrcSerde
ORC file contains 3 components:
1 Stripe - An ORC file contains groups of row data called stripes
2 File footer
- list of stripe locations
- number of rows per stripe
- each column's data type
- column-level aggregations - count, min, max, sum, etc.
3 Postscript - compression parameters
tbleproperties("orc.compress"="NONE")
set hive,exec.orc.default.compress=NONE;
Stripe Structure
1. Index Data - offsets to jump to start of row group
2. Row Data - the actual column data (indexed per row group of 10,000 rows by default)
3. Stripe Footer
default stripe size - 256 MB
concurrent reads of the same file using separate RecordReaders
bloom filter
ORC supports ACID properties.
CREATE TABLE ... STORED AS ORC
ALTER TABLE ... [PARTITION partition_spec] SET FILEFORMAT ORC
SET hive.default.fileformat=Orc
SET hive.exec.orc.default.compress=ZLIB
AVRO vs PARQUET
AVRO is a row-based storage format whereas PARQUET is a columnar based storage format.
Write operations in AVRO are better than in PARQUET
PARQUET is much better for analytical querying i.e. reads and querying are much more efficient than writing.
schema evolution - PARQUET only supports schema append, whereas AVRO supports a much richer schema evolution, i.e. adding or modifying columns.
PARQUET is ideal for querying a subset of columns in a multi-column table. AVRO is ideal in case of ETL operations where we need to query all the columns.
ORC vs PARQUET
Both are columnar formats
But,
Parquet might be better if you have highly nested data, because it stores its elements as a tree-like structure.
Apache ORC might be better if your file-structure is flattened.
Spark performs best with Parquet, Hive performs best with ORC. Why?
Hive has a vectorized ORC reader but (until recently) no vectorized Parquet reader.
Spark has a vectorized Parquet reader but (until recently) no vectorized ORC reader.
Vectorization means that rows are decoded in batches, dramatically improving memory locality and cache utilization.
Hive 2.3.0 has vectorized Parquet reader - issues.apache.org/jira/browse/HIVE-14815
Since Spark 2.3, Spark supports a vectorized ORC reader spark.apache.org/docs/latest/sql-data-sources-orc.html
Spark 2.3.0, spark does have a vectorized ORC reader: issues.apache.org/jira/browse/SPARK-16060
Vectorized Query Execution :
A standard query execution system processes one row at a time.
A single row of data goes through all the operators before the next row can be processed. This mode of processing is very inefficient in terms of CPU usage.
Vectorized query execution streamlines operations by processing a block of 1024 rows at a time. Within the block, each column is stored as a vector (an array of a primitive data type). The inner loop of execution scans these vectors very fast, avoiding method calls, deserialization, unnecessary if-then-else, etc.
set hive.vectorized.execution.enabled = true; (Vectorized execution is off by default)
class VectorizedRowBatch {
boolean selectedInUse;
int [] selected;
int size;
ColumnVector [] columns;
}
class LongColumnVector extends ColumnVector {
long [] vector;
}
Note:
In general,
If the data is wide, has a large number of attributes, and is write-heavy, then a row-based approach may be best - AVRO
(but add indexes to get better read performance).
If the data is narrower, has fewer attributes, and is read-heavy, then a column-based approach may be best - ORC/Parquet.
Indexing:
CREATE INDEX index_name
ON TABLE table_name (columns,....)
AS 'org.apache.hadoop.hive.ql.index.compact.CompactIndexHandler'
WITH DEFERRED REBUILD;
- we are creating a compact index for the table.
CREATE INDEX olympic_index_bitmap
ON TABLE olympic (age)
AS 'BITMAP'
WITH DEFERRED REBUILD;
ALTER INDEX olympic_index_bitmap on olympic REBUILD;
Indexes are advised to be built on the columns on which you frequently perform filter/lookup operations.
Building too many indexes also degrades performance, since every index has to be built and stored.
Indexing has been removed in version 3.0
Materialized views with automatic rewriting can result in very similar results.
Hive 2.3.0 adds support for materialized views.
Using columnar file formats (Parquet, ORC) – they can do selective scanning; they may even skip entire files/blocks.
- The goal of Hive indexing is to improve the speed of query lookup on certain columns of a table. Without an index, queries with predicates like 'WHERE tab1.col1 = 10' load the entire table or partition and process all the rows.
- But if an index exists for col1, then only a portion of the file needs to be loaded and processed.
- The improvement in query speed that an index can provide comes at the cost of additional processing to create the index and disk space to store the index.
Hive indexing was added in version 0.7.0, and bitmap indexing was added in version 0.8.0.
CREATE INDEX table01_index ON TABLE table01 (column2) AS 'COMPACT';
SHOW INDEX ON table01;
DROP INDEX table01_index ON table01;
CREATE INDEX table05_index ON TABLE table05 (column6) AS 'COMPACT' STORED AS ORCFILE;
CREATE INDEX table03_index ON TABLE table03 (column4) AS 'BITMAP'
-Compact indexing stores the pair of indexed column’s value and its block id.
-Bitmap indexing stores the combination of indexed column value and list of rows as a bitmap.
COMPACT:
<ayodhya,block-1>
<ayodhya,block-1>
<midhila,block-2>
BITMAP
<ayodhya, [(1,ram,ayodhya),(2,laki,ayodhya)]>
<midhila, [3,laki,midhila]>
View :: A view is just a named query; it doesn't store anything. When a view is queried, the query from the view definition is run and the actual data comes from the underlying table.
Materialized View :: Stores data physically and gets refreshed periodically. When a materialized view is queried, the data is served from the materialized view itself.
CREATE TABLE emps (
empid INT,
deptno INT,
name VARCHAR(256),
salary FLOAT,
hire_date TIMESTAMP)
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
CREATE TABLE depts (
deptno INT,
deptname VARCHAR(256),
locationid INT)
STORED AS ORC
TBLPROPERTIES ('transactional'='true');
CREATE MATERIALIZED VIEW mv1
AS
SELECT empid, deptname, hire_date
FROM emps JOIN depts
ON (emps.deptno = depts.deptno)
WHERE hire_date >= '2016-01-01';
CBO::
Hive Cost-Based Optimizer (CBO) is a core component in Hive query processing engine. Powered by Apache Calcite, the CBO optimizes and calculates the cost of various plans for a query.
The goal of CBO is to generate efficient execution plans by examining the tables and conditions specified in the query.
After parsing, a query gets converted to a logical tree (Abstract Syntax Tree) that represents the operations that the query must perform, such as reading a particular table or performing an inner JOIN.
org.apache.hadoop.hive.ql.ppd.PredicatePushDown
set hive.cbo.enable=true (Enables cost-based query optimization.)
set hive.stats.autogather=true
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
EXPLAIN :: - spark SQL or hive
hive> explain select * from people;
spark.sql("select * from people").explain()
ANALYZE TABLE
ANALYZE TABLE CUSTOMER PARTITION(customerCategory) COMPUTE STATISTICS
ANALYZE TABLE CUSTOMER PARTITION(customerCategory) COMPUTE STATISTICS FOR COLUMNS name
ANALYZE TABLE CUSTOMER CACHE METADATA
Skew Join - not recommended
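If a skew join is enabled anyway, the relevant settings look roughly like this (the key threshold shown is the default/illustrative value):
set hive.optimize.skewjoin=true;
set hive.skewjoin.key=100000;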
Aggregate Index
SELECT gender, count(gender) FROM CUSTOMER GROUP BY gender;
CREATE INDEX idx_GENDER ON TABLE CUSTOMER(GENDER) AS 'org.apache.hadoop.hive.ql.index.AggregateIndexHandler'
WITH DEFERRED REBUILD
IDXPROPERTIES('AGGREGATES'='count(gender)')
STORED AS ORC TBLPROPERTIES
('orc.compress'='SNAPPY',
'orc.create.index'='true');
Denormalization::
Think about pre-joining tables into a flat table. Storage indexes and bloom filters work efficiently for flat tables instead of joining every time when needed. This is a paradigm shift from traditional databases.
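A minimal sketch of pre-joining into a flat ORC table (emps_flat is a hypothetical name; emps/depts are the tables defined above):
CREATE TABLE emps_flat STORED AS ORC AS
SELECT e.empid, e.name, e.salary, d.deptname, d.locationid
FROM emps e JOIN depts d ON (e.deptno = d.deptno);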
- Understand how the Yarn Scheduler (Fair or Capacity) works for optimal concurrency and high performance queries
- hive.execution.engine: set it to Tez (version > 0.8) for most of your scenarios. Alternatively you may set it to Spark, but then some optimizations do not work.
Prewarm TEZ containers (see the settings sketch after this list).
- prefer bulk insert using CREATE-TABLE-AS-SELECT
- Use the Mapside join hint to load (small) dimension tables in-memory
- Increase replication factor of selected tables,partitions,buckets,indexes: The more copies you have on different nodes the more performance you gain with the disadvantage of wasting more space. You can use the hadoop dfs -setrep -R -w X /path/to/hive/tableinhdfs (X is the replication count, 3 is the default)
- Use partitions and sorted bucketing on the columns used in WHERE conditions and JOIN ON clauses
- Use compression: Snappy is fast, but ZLIB compresses better;
for faster/frequent reads - Snappy;
for older data in different partitions you may use ZLIB instead of Snappy, because it is accessed less often.
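Prewarm settings sketch (referenced in the "Prewarm TEZ containers" item above; the container count is illustrative):
set hive.prewarm.enabled=true;
set hive.prewarm.numcontainers=10;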
---------------------------------------------------------------------------------------------------------------------------------------------
HIVE DML::
-------------
Loading files into tables:
LOAD DATA [LOCAL] INPATH 'people' [OVERWRITE] INTO TABLE people PARTITION (year='2020', month='12')
path - hdfs://namenode:9000/user/hive/project/people (file or directory or addresses a set of files)
CREATE TABLE tab1 (col1 int, col2 int) PARTITIONED BY (col3 int) STORED AS ORC;
LOAD DATA [LOCAL] INPATH 'filepath' INTO TABLE tab1;
CREATE TABLE tab1 STORED AS ORC AS select_statement FROM from_statement;
Note: the created table automatically takes the column names and types from the SELECT statement.
INSERT INTO TABLE tablename PARTITION (partcol1[=val1], partcol2[=val2] ...) select_statement FROM from_statement;
INSERT OVERWRITE will overwrite any existing data in the table or partition
unless IF NOT EXISTS is provided for a partition (as of Hive 0.9.0)
INSERT INTO will append to the table or partition, keeping the existing data intact. from hive 0.8.0
As of Hive 0.13.0, a table can be made immutable by creating it with TBLPROPERTIES ("immutable"="true"). The default is "immutable"="false".
INSERT INTO behavior into an immutable table is disallowed if any data is already present, although INSERT INTO still works if the immutable table is empty.
The behavior of INSERT OVERWRITE is not affected by the "immutable" table property.
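A minimal sketch of an immutable table (t_imm is a hypothetical name):
CREATE TABLE t_imm (col1 int) TBLPROPERTIES ("immutable"="true");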
As of Hive 1.1.0 the TABLE keyword is optional.
As of Hive 1.2.0 each INSERT INTO T can take a column list like INSERT INTO T (z, x, c1).
Dynamic Partition Inserts:
hive.exec.dynamic.partition=true
Needs to be set to true to enable dynamic partition inserts
hive.exec.dynamic.partition.mode=strict
In strict mode, the user must specify at least one static partition in case the user accidentally overwrites all partitions, in nonstrict mode all partitions are allowed to be dynamic
hive.exec.max.dynamic.partitions.pernode=100
Maximum number of dynamic partitions allowed to be created in each mapper/reducer node
FROM page_view_stg pvs
INSERT OVERWRITE TABLE page_view PARTITION(dt='2008-06-08', country)
SELECT pvs.viewTime, pvs.userid, pvs.page_url, pvs.referrer_url, null, null, pvs.ip, pvs.cnt
Here the country partition will be dynamically created by the last column from the SELECT clause (i.e. pvs.cnt).
CREATE TABLE students (name VARCHAR(64), age INT, gpa DECIMAL(3, 2)) CLUSTERED BY (age) INTO 2 BUCKETS STORED AS ORC;
CREATE TABLE pageviews (userid VARCHAR(64), link STRING, came_from STRING) PARTITIONED BY (datestamp STRING) CLUSTERED BY (userid) INTO 256 BUCKETS STORED AS ORC;
INSERT INTO TABLE pageviews PARTITION (datestamp = '2014-09-23') VALUES ('jsmith', 'mail.com', 'sports.com'), ('jdoe', 'mail.com', 'gmail.com');
INSERT INTO TABLE pageviews PARTITION (datestamp) VALUES ('tjohnson', 'sports.com', 'finance.com', '2014-09-23'), ('tlee', 'finance.com', null, '2014-09-21');
UPDATE is available starting in Hive 0.14.
DELETE is available starting in Hive 0.14.
Deletes can only be performed on tables that support ACID.
Conditional functions such as IF, CASE, COALESCE, NVL, DECODE
if
select IF(1=1,'TRUE','FALSE') as IF_CONDITION_TEST;
isnull( a )
select isnull( NULL );
isnotnull ( a )
select isnotnull( NULL );
CASE WHEN a THEN b [WHEN c THEN d] … [ELSE e] END
select course, dept,
case when sum(case when status in ( 'fail','detained') then 1 else 0 end) > 0 then 'fail'
when sum(case when status in ('absent') then 1 else 0 end) > 0 then 'absent'
when sum(case when status in ('pass') then 1 else 0 end) > 0 then 'pass'
else 'no_result'
end as final_status
from college
group by course,dept;
select case x
when 1 then 'one'
when 2 then 'two'
when 0 then 'zero'
else 'out of range'
end from t1;
NVL(arg1, arg2)
This function tests whether arg1 is null: it returns arg1 if it is not null, otherwise the second argument is returned.
select nvl(null,'value is null');
coalesce(value1, value2)
Returns the first non-null value for list of values provided as arguments.
select coalesce(null,'a',null,'b');
DECODE
SELECT event, decode(day_of_week, 1, "Monday", 2, "Tuesday", 3, "Wednesday", 4, "Thursday", 5, "Friday", 6, "Saturday", 7, "Sunday", "Unknown day") FROM calendar;
All Hive keywords are case-insensitive, including the names of Hive operators and functions.
SHOW FUNCTIONS;
hive> DESCRIBE FUNCTION year;
OK
year(param) - Returns the year component of the date/timestamp/interval
Time taken: 0.029 seconds, Fetched: 1 row(s)
hive> DESCRIBE FUNCTION EXTENDED year;
OK
year(param) - Returns the year component of the date/timestamp/interval
param can be one of:
1. A string in the format of 'yyyy-MM-dd HH:mm:ss' or 'yyyy-MM-dd'.
2. A date value
3. A timestamp value
4. A year-month interval valueExample:
> SELECT year('2009-07-30') FROM src LIMIT 1;
2009
Function class:org.apache.hadoop.hive.ql.udf.UDFYear
Function type:BUILTIN
Time taken: 0.032 seconds, Fetched: 10 row(s)
hive> DESCRIBE FUNCTION EXTENDED version;
OK
version() - Returns the Hive build version string - includes base version and revision.
Function class:org.apache.hadoop.hive.ql.udf.UDFVersion
Function type:BUILTIN
Time taken: 0.026 seconds, Fetched: 3 row(s)
hive> select key || value from mydb.src;
A <=> B
Returns the same result as the EQUAL (=) operator for non-null operands, but returns TRUE if both are NULL and FALSE if only one of them is NULL.
A <> B or A != B
NULL if A or B is NULL, TRUE if expression A is NOT equal to expression B, otherwise FALSE.
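For example (illustrating the NULL semantics above; should return true, NULL, NULL):
select NULL <=> NULL, NULL = NULL, 1 <> NULL;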
A RLIKE B
A REGEXP B
'foobar' RLIKE 'foo' evaluates to TRUE and so does 'foobar' RLIKE '^f.*r$'
cast('1' as BIGINT)
hive> select year('2020-12-12');
OK
2020
hive> select month('2020-12-12');
OK
12
hive> select day('2020-12-12');
OK
12
hive> select extract(month from "2016-10-20");
10
hive> select current_date;
OK
2020-12-16
hive> select current_timestamp;
OK
2020-12-16 15:35:46.17
hive> select last_day(current_date);
OK
2020-12-31
hive> select next_day('2015-01-14', 'TU');
OK
2015-01-20
hive> select explode(array('A','B','C'));
OK
A
B
C
hive> select explode(map('A',10,'B',20,'C',30)) as (key,value);
OK
A 10
B 20
C 30
package com.example.hive.udf;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;
public final class Lower extends UDF {
public Text evaluate(final Text s) {
if (s == null) { return null; }
return new Text(s.toString().toLowerCase());
}
}
Note: evaluate() method can be overloaded
Deploying Jars for User Defined Functions and User Defined SerDes
hive> add jar my_jar.jar;
Added my_jar.jar to class path
(By default, it will look in the current directory. )
hive> add jar /tmp/my_jar.jar;
Added /tmp/my_jar.jar to class path
hive> list jars;
my_jar.jar
create temporary function my_lower as 'com.example.hive.udf.Lower';
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DML
----------------------------------------------------------------------------------------
Speculative Execution in Hadoop
First, all the tasks for the job are launched in Hadoop MapReduce. Speculative tasks are then launched for those tasks that have been running for some time (at least one minute) and have not made much progress, on average, compared with the other tasks of the job.
The speculative task is killed if the original task completes before the speculative task, on the other hand, the original task is killed if the speculative task finishes before it.
Pros:
beneficial in some cases, because in a Hadoop cluster with hundreds of nodes problems like hardware failure or network congestion are common, so running a parallel/duplicate task can be better than waiting on a slow one.
mapred-site.xml
mapred.map.tasks.speculative.execution=true (default)
mapred.reduce.tasks.speculative.execution=true (default)
Cons:
Cluster efficiency is affected by the duplicate tasks: since redundant tasks are executed, overall throughput can be reduced.
We can disable speculative execution by setting those two properties to false.
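A sketch of turning both off for a session from the Hive CLI (same property names as in mapred-site.xml above):
set mapred.map.tasks.speculative.execution=false;
set mapred.reduce.tasks.speculative.execution=false;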
---------------------------------------------------------------------------------------------
FROM & JOINs determine & filter rows
WHERE more filters on the rows
GROUP BY combines those rows into groups
HAVING filters groups
ORDER BY arranges the remaining rows/groups
LIMIT filters on the remaining rows/groups
1. FROM
2. ON
3. JOIN
4. WHERE
5. GROUP BY
6. WITH CUBE or WITH ROLLUP
7. HAVING
8. SELECT
9. DISTINCT
10. ORDER BY
11. TOP
SELECT DISTINCT column, AGG_FUNC(column_or_expression), …
FROM mytable
JOIN another_table
ON mytable.column = another_table.column
WHERE constraint_expression
GROUP BY column
HAVING constraint_expression
ORDER BY column ASC/DESC
LIMIT count OFFSET COUNT;
https://jaceklaskowski.gitbooks.io/mastering-spark-sql/content/spark-sql-QueryExecution.html
Defining SQL order of execution
1. from
2. where
3. group by
4. having
5. select
6. order by
7. limit
---------------------------------------------------------------------------------------------
Difference between Sort By and Order By
ORDER BY syntax in Hive QL is similar to the syntax of ORDER BY in SQL language.
There are some limitations in the "order by" clause:
- In the strict mode (i.e., hive.mapred.mode=strict), the order by clause has to be followed by a "limit" clause.
- The limit clause is not necessary if you set hive.mapred.mode to nonstrict.
The reason: ORDER BY imposes a total order on all results,
so there has to be a single reducer to sort the final output.
If the number of rows in the output is too large, that single reducer can take a very long time to finish.
Hive supports SORT BY, which sorts the data per reducer: the SORT BY columns are used to sort the rows before feeding them to each reducer.
ORDER BY guarantees total order in the output, while SORT BY only guarantees ordering of the rows within a reducer.
If there is more than one reducer, "sort by" may give a partially ordered final result.
That is why we can combine DISTRIBUTE BY with SORT BY, as: DISTRIBUTE BY <colName> SORT BY <anotherColName>
DISTRIBUTE BY internally partitions the data according to the given column value, and each partition's data gets assigned to a reducer;
here, SORT BY applies sorting within each reducer.
(CLUSTER BY <colName> is shorthand for DISTRIBUTE BY <colName> SORT BY <colName> on the same column, which is how bucketing distributes data.)
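A minimal sketch combining them (assuming the buck_emp table from above):
SELECT id, name, salary
FROM buck_emp
DISTRIBUTE BY id
SORT BY salary DESC;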
Note:
- In Hive 2.1.0 and later,
The default null sorting order for ASC order is NULLS FIRST, while the default null sorting order for DESC order is NULLS LAST.
- In strict mode, some risky queries are not allowed to run. For example, full table scans are prevented (see HIVE-10454) and ORDER BY requires a LIMIT clause.
--------------------------------------------------------------------------------------------------
Establishing an SFTP connection
-------------------------------
$ sftp remote_username@server_ip_or_hostname
Connected to remote_username@server_ip_or_hostname.
sftp> pwd
Remote working directory: /home/remote_username
sftp> lpwd
Local working directory: /home/local_username
sftp> get filename.zip
Fetching /home/remote_username/filename.zip to filename.zip
/home/remote_username/filename.zip 100% 24MB 1.8MB/s 00:13
Uploading Files with the SFTP Command
sftp> put filename.zip
Uploading filename.zip to /home/remote_username/filename.zip
filename.zip 100% 12MB 1.7MB/s 00:06
sftp> df
sftp> mkdir directory_name
sftp> rename file_name new_file_name
sftp> rm file_name
sftp> rmdir directory_name
sftp> chmod 644 file_name
---------------------------------------------------------------------------------------------------
huser@thanoojWin10Home:~/data/hive/countries$ hdfs dfs -put ~/data/hive/countries/ /huser/data/
huser@thanoojWin10Home:~/data/hive/countries$ hdfs dfs -ls /huser/data/countries
Found 1 items
-rw-r--r-- 1 huser supergroup 162 2022-03-28 13:33 /huser/data/countries/countries.csv
huser@thanoojWin10Home:~/data/hive/countries$ hdfs dfs -cat /huser/data/countries/countries.csv
1,USA,Washington,328,America
2,France,Paris,67,Europe
3,Spain,Madrid,47,Europe
4,Russia,Moscow,145,Europe
5,Indonesia,Jakarta,267,Asia
6,Nigeria,Abuja,196,Africa
huser@thanoojWin10Home:~/data/hive/countries$
hive> CREATE EXTERNAL TABLE IF NOT EXISTS countries (ID int, name string, capital string, population bigint, continent string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION '/huser/data/countries';
OK
Time taken: 1.022 seconds
hive> select * from countries;
OK
1 USA Washington 328 America
2 France Paris 67 Europe
3 Spain Madrid 47 Europe
4 Russia Moscow 145 Europe
5 Indonesia Jakarta 267 Asia
6 Nigeria Abuja 196 Africa
Time taken: 3.142 seconds, Fetched: 6 row(s)
hive> select c.*, row_number () over (partition by continent order by name) as rm from default.countries_3 c;
OK
+-------+------------+-------------+---------------+--------------+-----+
| c.id | c.name | c.capital | c.population | c.continent | rm |
+-------+------------+-------------+---------------+--------------+-----+
| 6 | Nigeria | Abuja | 196 | Africa | 1 |
| 6 | Nigeria | Abuja | 196 | Africa | 2 |
| 1 | USA | Washington | 328 | America | 1 |
| 1 | USA | Washington | 328 | America | 2 |
| 5 | Indonesia | Jakarta | 267 | Asia | 1 |
| 5 | Indonesia | Jakarta | 267 | Asia | 2 |
| 3 | Spain | Madrid | 47 | Europe | 1 |
| 3 | Spain | Madrid | 47 | Europe | 2 |
| 2 | France | Paris | 67 | Europe | 3 |
| 2 | France | Paris | 67 | Europe | 4 |
| 4 | Russia | Moscow | 145 | Europe | 5 |
| 4 | Russia | Moscow | 145 | Europe | 6 |
+-------+------------+-------------+---------------+--------------+-----+
12 rows selected (31.298 seconds)
select * from (select c.*, row_number () over (partition by continent order by population desc) as rn from default.countries_3 c) as c where c.rn=1;
OK
+-------+------------+-------------+---------------+--------------+-------+
| c.id | c.name | c.capital | c.population | c.continent | c.rn |
+-------+------------+-------------+---------------+--------------+-------+
| 6 | Nigeria | Abuja | 196 | Africa | 1 |
| 1 | USA | Washington | 328 | America | 1 |
| 5 | Indonesia | Jakarta | 267 | Asia | 1 |
| 4 | Russia | Moscow | 145 | Europe | 1 |
+-------+------------+-------------+---------------+--------------+-------+
4 rows selected (31.107 seconds)
select * from (select c.*, row_number () over (partition by id) as rn from default.countries_3 c) as c where c.rn=1;
OK
+-------+------------+-------------+---------------+--------------+-------+
| c.id | c.name | c.capital | c.population | c.continent | c.rn |
+-------+------------+-------------+---------------+--------------+-------+
| 1 | USA | Washington | 328 | America | 1 |
| 2 | France | Paris | 67 | Europe | 1 |
| 3 | Spain | Madrid | 47 | Europe | 1 |
| 4 | Russia | Moscow | 145 | Europe | 1 |
| 5 | Indonesia | Jakarta | 267 | Asia | 1 |
| 6 | Nigeria | Abuja | 196 | Africa | 1 |
+-------+------------+-------------+---------------+--------------+-------+
6 rows selected (34.8 seconds)
id,name,trans_amount,trans_dt
1,ram,220,2022-03-29 08:09:04
1,ram,320,2022-03-29 07:09:04
1,ram,420,2022-03-29 06:09:04
2,sita,210,2022-03-29 07:09:04
2,sita,310,2022-03-29 06:09:04
2,sita,410,2022-03-29 05:09:04
1,ram,520,2022-03-28 07:09:04
1,ram,620,2022-03-28 06:09:04
2,sita,510,2022-03-28 07:09:04
2,sita,610,2022-03-28 06:09:04
1,ram,720,2022-03-27 05:09:04
2,sita,710,2022-03-27 05:09:04
CREATE EXTERNAL TABLE IF NOT EXISTS transactions (ID int, name string, trans_amount decimal(10,2), trans_dt timestamp) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS TEXTFILE LOCATION '/huser/data/transactions' TBLPROPERTIES ("skip.header.line.count"="1");
# or
ALTER TABLE transactions SET TBLPROPERTIES ("skip.header.line.count"="1");
OK
+------------------+--------------------+----------------------------+------------------------+
| transactions.id | transactions.name | transactions.trans_amount | transactions.trans_dt |
+------------------+--------------------+----------------------------+------------------------+
| 1 | ram | 220.00 | 2022-03-29 08:09:04.0 |
| 1 | ram | 320.00 | 2022-03-29 07:09:04.0 |
| 1 | ram | 420.00 | 2022-03-29 06:09:04.0 |
| 2 | sita | 210.00 | 2022-03-29 07:09:04.0 |
| 2 | sita | 310.00 | 2022-03-29 06:09:04.0 |
| 2 | sita | 410.00 | 2022-03-29 05:09:04.0 |
| 1 | ram | 520.00 | 2022-03-28 07:09:04.0 |
| 1 | ram | 620.00 | 2022-03-28 06:09:04.0 |
| 2 | sita | 510.00 | 2022-03-28 07:09:04.0 |
| 2 | sita | 610.00 | 2022-03-28 06:09:04.0 |
| 1 | ram | 720.00 | 2022-03-27 05:09:04.0 |
| 2 | sita | 710.00 | 2022-03-27 05:09:04.0 |
+------------------+--------------------+----------------------------+------------------------+
12 rows selected (0.325 seconds)
select * from (select c.*, row_number () over (partition by id, cast(trans_dt as date) order by trans_dt desc) as rn from default.transactions c) as c where c.rn=1 order by trans_dt;
OK
+-------+---------+-----------------+------------------------+-------+
| c.id | c.name | c.trans_amount | c.trans_dt | c.rn |
+-------+---------+-----------------+------------------------+-------+
| 2 | sita | 710.00 | 2022-03-27 05:09:04.0 | 1 |
| 1 | ram | 720.00 | 2022-03-27 05:09:04.0 | 1 |
| 2 | sita | 510.00 | 2022-03-28 07:09:04.0 | 1 |
| 1 | ram | 520.00 | 2022-03-28 07:09:04.0 | 1 |
| 2 | sita | 210.00 | 2022-03-29 07:09:04.0 | 1 |
| 1 | ram | 220.00 | 2022-03-29 08:09:04.0 | 1 |
+-------+---------+-----------------+------------------------+-------+
6 rows selected (70.429 seconds)
select * from (select c.*, row_number () over (partition by id, cast(trans_dt as date) order by trans_amount desc) as rn from default.transactions c) as c where c.rn=1 order by trans_dt;
OK
+-------+---------+-----------------+------------------------+-------+
| c.id | c.name | c.trans_amount | c.trans_dt | c.rn |
+-------+---------+-----------------+------------------------+-------+
| 2 | sita | 710.00 | 2022-03-27 05:09:04.0 | 1 |
| 1 | ram | 720.00 | 2022-03-27 05:09:04.0 | 1 |
| 2 | sita | 610.00 | 2022-03-28 06:09:04.0 | 1 |
| 1 | ram | 620.00 | 2022-03-28 06:09:04.0 | 1 |
| 2 | sita | 410.00 | 2022-03-29 05:09:04.0 | 1 |
| 1 | ram | 420.00 | 2022-03-29 06:09:04.0 | 1 |
+-------+---------+-----------------+------------------------+-------+
6 rows selected (66.352 seconds)