Getting Tez enabled on CDH5.4+

So Hive in CDH is horribly, painfully slow. Cloudera ships Hive 1.1, which is actually moderately modern. It is, however, very badly configured out of the box and patched with custom code from Cloudera. With a bit of effort, we managed to improve Hive performance considerably. We really shouldn't have to do this, but Cloudera is actively working against supporting a performant Hive.

First, building Tez was fairly straightforward. Following the instructions at https://github.com/apache/tez/blob/master/docs/src/site/markdown/install.md, the only change was to build against the Hadoop version string "2.6.0" (I believe that is the default). Don't use the CDH version string (e.g. 2.6.0-cdh5.4.1); it won't work.
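For reference, the Tez build reads the Hadoop version from the hadoop.version property in its top-level pom.xml (it can also be overridden on the Maven command line with -Dhadoop.version). A sketch of that property for this setup, assuming an otherwise unmodified Tez 0.7.0 source tree:

```xml
<!-- Top-level Tez pom.xml: only hadoop.version matters here; the other
     entries in the existing properties element stay as they are. -->
<properties>
  <!-- Plain Apache Hadoop version string. A CDH string such as
       2.6.0-cdh5.4.1 did not work for this build. -->
  <hadoop.version>2.6.0</hadoop.version>
</properties>
```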

The bottom of the installation instructions mentions that to use the cluster's local Hadoop jars (rather than those packaged with Tez), you must unpack the jars into HDFS rather than pointing at the tarball. In this case, unpack the tez-minimal tarball and upload its contents to /apps/tez-0.7.0 (or wherever you prefer). Don't forget to include the lib/ directory as well.

Unpack the tez-minimal tarball built in tez-dist/target onto the machine running your HiveServer2 instance. Create a tez-site.xml and place it in the same directory. Here's my tez-site.xml. The first two properties are all you need to actually get things running. I should call out, however, that I have NOT enabled the YARN Timeline Service; if you want to get the Tez UI working, you'll need to do this. Let me know how it goes and we can update this guide.

<!-- tez-site.xml -->
<configuration>
  <property>
    <name>tez.lib.uris</name>
    <value>${fs.defaultFS}/apps/tez-0.7.0/,${fs.defaultFS}/apps/tez-0.7.0/lib</value>
  </property>

  <property>
    <name>tez.use.cluster.hadoop-libs</name>
    <value>true</value>
  </property>
  
  <property>
    <name>tez.staging-dir</name>
    <value>/tmp/${user.name}/staging</value>
    <description>The staging dir used while submitting DAGs</description>
  </property>
  <property>
    <name>tez.am.resource.memory.mb</name>
    <value>2048</value>
    <description>The amount of memory to be used by the AppMaster</description>
  </property>
  <property>
    <name>tez.am.java.opts</name>
    <value>-server -Xmx1800m -Djava.net.preferIPv4Stack=true -XX:+UseNUMA -XX:+UseParallelGC</value>
    <description>Java options for the Tez AppMaster process. The -Xmx parameter value is generally 0.8 times tez.am.resource.memory.mb config.</description>
  </property>
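  <!-- Sizing check: 0.8 x 2048 MB is about 1638 MB of heap. The 1800m used
       above is about 0.88 of the 2048 MB container, leaving roughly 248 MB
       for non-heap JVM memory. -->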
  <property>
    <name>tez.am.shuffle-vertex-manager.min-src-fraction</name>
    <value>0.2</value>
    <description>In case of a ScatterGather connection, the fraction of source tasks which should
      complete before tasks for the current vertex are scheduled
    </description>
  </property>
  <property>
    <name>tez.am.shuffle-vertex-manager.max-src-fraction</name>
    <value>0.4</value>
    <description>In case of a ScatterGather connection, once this fraction of source tasks have
      completed, all tasks on the current vertex can be scheduled. Number of tasks ready for
      scheduling on the current vertex scales linearly between min-fraction and max-fraction
    </description>
  </property>
  <property>
    <name>tez.am.am-rm.heartbeat.interval-ms.max</name>
    <value>250</value>
    <description>The maximum heartbeat interval between the AM and RM in milliseconds</description>
  </property>
  <property>
    <name>tez.am.grouping.split-waves</name>
    <value>1.4</value>
    <description>The multiplier for available queue capacity when determining number of tasks for
      a Vertex. 1.4 with 100% queue available implies generating a number of tasks roughly equal
      to 140% of the available containers on the queue
    </description>
  </property>
  <property>
    <name>tez.am.grouping.min-size</name>
    <value>16777216</value>
    <description>Lower bound on the size (in bytes) of a grouped split, to avoid generating
      too many splits
    </description>
  </property>
  <property>
    <name>tez.am.grouping.max-size</name>
    <value>1073741824</value>
    <description>Upper bound on the size (in bytes) of a grouped split, to avoid generating
      excessively large splits
    </description>
  </property>
  <property>
    <name>tez.am.container.reuse.enabled</name>
    <value>true</value>
    <description>Configuration to specify whether container should be reused</description>
  </property>
  <property>
    <name>tez.am.container.reuse.rack-fallback.enabled</name>
    <value>true</value>
    <description>Whether to reuse containers for rack local tasks. Active only if reuse is enabled
    </description>
  </property>
  <property>
    <name>tez.am.container.reuse.non-local-fallback.enabled</name>
    <value>true</value>
    <description>Whether to reuse containers for non-local tasks. Active only if reuse is enabled
    </description>
  </property>
  <property>
    <name>tez.am.container.session.delay-allocation-millis</name>
    <value>10000</value>
    <!-- TODO This value may change -->
    <description>The amount of time to hold on to a container if no task can be assigned to
      it immediately. Only active when reuse is enabled. Set to -1 to never release a container
      in a session
    </description>
  </property>
  <property>
    <name>tez.am.container.reuse.locality.delay-allocation-millis</name>
    <value>250</value>
    <description>The amount of time to wait before assigning a container to the next level of
      locality. NODE -> RACK -> NON_LOCAL
    </description>
  </property>
  <property>
    <name>tez.task.get-task.sleep.interval-ms.max</name>
    <value>200</value>
    <description>The maximum amount of time, in milliseconds, to wait before a task asks an AM for
      another task
    </description>
  </property>
  <!-- Client Submission timeout value when submitting DAGs to a session -->
  <property>
    <name>tez.session.client.timeout.secs</name>
    <value>180</value>
    <description>Time (in seconds) to wait for AM to come up when trying to submit a DAG from
      the client
    </description>
  </property>
  <property>
    <name>tez.session.am.dag.submit.timeout.secs</name>
    <value>300</value>
    <description>Time (in seconds) for which the Tez AM should wait for a DAG to be submitted
      before shutting down
    </description>
  </property>
  <!-- Configuration for runtime components -->
  <!-- These properties can be set on a per edge basis by configuring the payload for each edge independently. -->
  <property>
    <name>tez.runtime.intermediate-output.should-compress</name>
    <value>false</value>
    <description>Whether intermediate output should be compressed or not</description>
  </property>
  <property>
    <name>tez.runtime.intermediate-output.compress.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
    <description>The codec to be used if compressing intermediate output. Only
      applicable if tez.runtime.intermediate-output.should-compress is enabled.
    </description>
  </property>
  <property>
    <name>tez.runtime.intermediate-input.is-compressed</name>
    <value>false</value>
    <description>Whether intermediate input is compressed</description>
  </property>
  <property>
    <name>tez.runtime.intermediate-input.compress.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
    <description>The codec to be used when reading intermediate compressed input.
    Only applicable if tez.runtime.intermediate-input.is-compressed is enabled.</description>
  </property>
  <!-- Configuration for ATS integration -->
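  <!-- Not enabled in this setup. An untested sketch, assuming the YARN
       Timeline Service is running (yarn.timeline-service.enabled set to true
       in yarn-site.xml); uncomment to send Tez DAG history to ATS:
  <property>
    <name>tez.history.logging.service.class</name>
    <value>org.apache.tez.dag.history.logging.ats.ATSHistoryLoggingService</value>
  </property>
  -->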

</configuration>
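Since only the first two properties are needed to get things running, a stripped-down tez-site.xml would look roughly like this. It is a sketch assuming the same /apps/tez-0.7.0 HDFS layout; everything else falls back to Tez defaults, which will not match the tuning in the full file.

```xml
<!-- Minimal tez-site.xml sketch: only the two properties required to run
     Tez against the cluster's own Hadoop jars. -->
<configuration>
  <!-- Unpacked tez-minimal contents (jars plus lib/) uploaded to HDFS -->
  <property>
    <name>tez.lib.uris</name>
    <value>${fs.defaultFS}/apps/tez-0.7.0/,${fs.defaultFS}/apps/tez-0.7.0/lib</value>
  </property>
  <!-- Use the Hadoop jars already on the cluster instead of bundled ones -->
  <property>
    <name>tez.use.cluster.hadoop-libs</name>
    <value>true</value>
  </property>
</configuration>
```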

Next, we need to add one jar to satisfy Cloudera's hacked-up version of HiveServer2. Upload the hadoop-mapreduce-client-common jar from Cloudera's parcel directory. The path should look something like /usr/cloudera/parcels/CDH-5.4.1-1.cdh5.4.1.p0.6/jars/hadoop-mapreduce-client-common-2.6.0-cdh5.4.1.jar, depending on your install (the default parcel directory is /opt/cloudera/parcels). Place it in the HDFS directory alongside your Tez jars in /apps/tez-0.7.0 (or wherever you put them).

Now we need to focus on our Hive configuration. This is one area where we can get some improvements even without Tez in the picture: Cloudera's default configuration leaves a lot of features off.

First, we've got to add the Tez jars to the classpath for HiveServer2. This assumes you're using Cloudera Manager, but it's easy to adapt these instructions to a manual configuration. In the "HiveServer2 Environment Advanced Configuration Snippet (Safety Valve)", add two environment variables:

---- HiveServer2 Safety Valve ----
TEZ_CONF_DIR=/usr/lib/tez/
HADOOP_CLASSPATH=/usr/lib/tez/*:/usr/lib/tez/lib/*:/usr/lib/tez
---- END Hiveserver2 Safety Valve ----

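Note that HADOOP_CLASSPATH has three entries: the tez/* and tez/lib/* wildcards pick up the jars, and the entry without a wildcard (/usr/lib/tez) lets HiveServer2 locate tez-site.xml. This assumes the tez-minimal tarball and tez-site.xml were unpacked into /usr/lib/tez on the HiveServer2 machine.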

Next, we're into hive-site.xml. I've added a LOT of stuff to my hive-site.xml; much of it is unnecessary for Tez specifically, and much of it is just set to the default, but I'll share everything I placed in the hive-site.xml safety valve.

Note that at the top I've disabled Tez by default (hive.execution.engine is set to mr). Users can switch to Tez by running set hive.execution.engine=tez; before their query. While everything in here could be tuned and tweaked, I added most of these primarily for reference rather than to change behavior. A few exceptions: parallel execution, sessions, resource management and Java opts are all tweaked here. Join behaviors in Hive 1.1 are not the most intelligent yet (they improve significantly in Hive 1.2), so for some queries I've had to turn off MapJoin optimizations. We're currently doing this on a case-by-case basis rather than turning off MapJoin for everything. If you hit heap space errors on your queries, try running set hive.auto.convert.join=false; before the query.
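The resource-management and Java-opts tuning for Hive's Tez tasks is done with properties like the ones below. This is an illustrative sketch using Hive 1.1 property names; the sizes are assumptions to adapt to your cluster, not the values used in this guide. As with the Tez AM in tez-site.xml, the -Xmx is kept at roughly 0.8 of the container size.

```xml
<!-- Illustrative Tez task sizing for the hive-site.xml safety valve
     (values are assumptions, not the ones used in this guide). -->
<!-- YARN container size (MB) for Tez tasks; -1 falls back to the
     MapReduce map memory setting -->
<property>
  <name>hive.tez.container.size</name>
  <value>4096</value>
</property>
<!-- About 0.8 x 4096 MB of heap -->
<property>
  <name>hive.tez.java.opts</name>
  <value>-Xmx3276m</value>
</property>
```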

After this, restart HiveServer2 and give it a shot by manually turning on Tez before your query.
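Once Tez has proven itself on manually switched queries, making it the default is a one-property change in the same safety valve. This is a sketch of that later step; the configuration shared here deliberately keeps mr, and users could still fall back per query with set hive.execution.engine=mr;.

```xml
<!-- Sketch: make Tez the default engine instead of mr -->
<property>
  <name>hive.execution.engine</name>
  <value>tez</value>
</property>
```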

Enjoy, and please submit any feedback and corrections! -Aaron

<!-- hiveserver2 hive-site.xml safety valve -->
  <property>
    <name>hive.execution.engine</name>
    <value>mr</value>
  </property>

  <property>
    <name>hive.exec.scratchdir</name>
    <value>/tmp/hive</value>
  </property>
  <property>
    <name>hive.exec.local.scratchdir</name>
    <value>/tmp/hive</value>
  </property>
  <property>
    <name>hive.downloaded.resources.dir</name>
    <value>/tmp/${hive.session.id}_resources</value>
  </property>
  <property>
    <name>hive.scratch.dir.permission</name>
    <value>700</value>
  </property>
  <property>
    <name>hive.exec.submitviachild</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.exec.submit.local.task.via.child</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.ignore.mapjoin.hint</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.map.aggr</name>
    <value>true</value>
    <description>Whether to use map-side aggregation in Hive Group By queries</description>
  </property>
  <property>
    <name>hive.groupby.skewindata</name>
    <value>true</value>
    <description>Whether there is skew in data to optimize group by queries</description>
  </property>
  <property>
    <name>hive.join.emit.interval</name>
    <value>1000</value>
    <description>How many rows in the right-most join operand Hive should buffer before emitting the join result.</description>
  </property>
  <property>
    <name>hive.join.cache.size</name>
    <value>25000</value>
    <description>How many rows in the joining tables (except the streaming table) should be cached in memory.</description>
  </property>
  <property>
    <name>hive.cbo.enable</name>
    <value>true</value>
    <description>Flag to control enabling Cost Based Optimizations using Calcite framework.</description>
  </property>
  <property>
    <name>hive.mapjoin.bucket.cache.size</name>
    <value>100</value>
    <description/>
  </property>
  <property>
    <name>hive.mapjoin.optimized.hashtable</name>
    <value>true</value>
    <description>
      Whether Hive should use memory-optimized hash table for MapJoin. Only works on Tez,
      because memory-optimized hashtable cannot be serialized.
    </description>
  </property>
  <property>
    <name>hive.mapjoin.optimized.hashtable.wbsize</name>
    <value>10485760</value>
    <description>
      Optimized hashtable (see hive.mapjoin.optimized.hashtable) uses a chain of buffers to
      store data. This is one buffer size. HT may be slightly faster if this is larger, but for small
      joins unnecessary memory will be allocated and then trimmed.
    </description>
  </property>
  <property>
    <name>hive.smbjoin.cache.rows</name>
    <value>10000</value>
    <description>How many rows with the same key value should be cached in memory per smb joined table.</description>
  </property>
  <property>
    <name>hive.groupby.mapaggr.checkinterval</name>
    <value>100000</value>
    <description>Number of rows after which a size check of the grouping keys/aggregation classes is performed</description>
  </property>
  <property>
    <name>hive.map.aggr.hash.percentmemory</name>
    <value>0.5</value>
    <description>Portion of total memory to be used by map-side group aggregation hash table</description>
  </property>
  <property>
    <name>hive.mapjoin.followby.map.aggr.hash.percentmemory</name>
    <value>0.3</value>
    <description>Portion of total memory to be used by map-side group aggregation hash table, when this group by is followed by map join</description>
  </property>
  <property>
    <name>hive.map.aggr.hash.force.flush.memory.threshold</name>
    <value>0.9</value>
    <description>
      The max memory to be used by map-side group aggregation hash table.
      If the memory usage is higher than this number, force to flush data
    </description>
  </property>
  <property>
    <name>hive.map.aggr.hash.min.reduction</name>
    <value>0.5</value>
    <description>
      Hash aggregation will be turned off if the ratio between hash table size and input rows is bigger than this number.
      Set to 1 to make sure hash aggregation is never turned off.
    </description>
  </property>

  <property>
    <name>hive.multigroupby.singlereducer</name>
    <value>true</value>
    <description>
      Whether to optimize multi group by query to generate a single M/R job plan. If the multi group by query has
      common group by keys, it will be optimized to generate single M/R job.
    </description>
  </property>
  <property>
    <name>hive.map.groupby.sorted</name>
    <value>true</value>
    <description>
      If the bucketing/sorting properties of the table exactly match the grouping key, whether to perform 
      the group by in the mapper by using BucketizedHiveInputFormat. The only downside to this
      is that it limits the number of mappers to the number of files.
    </description>
  </property>
  <property>
    <name>hive.map.groupby.sorted.testmode</name>
    <value>false</value>
    <description>
      If the bucketing/sorting properties of the table exactly match the grouping key, whether to perform 
      the group by in the mapper by using BucketizedHiveInputFormat. If the test mode is set, the plan
      is not converted, but a query property is set to denote the same.
    </description>
  </property>
  <property>
    <name>hive.groupby.orderby.position.alias</name>
    <value>true</value>
    <description>Whether to enable using Column Position Alias in Group By or Order By</description>
  </property>
  <property>
    <name>hive.new.job.grouping.set.cardinality</name>
    <value>30</value>
    <description>
      Whether a new map-reduce job should be launched for grouping sets/rollups/cubes.
      For a query like: select a, b, c, count(1) from T group by a, b, c with rollup;
      4 rows are created per row: (a, b, c), (a, b, null), (a, null, null), (null, null, null).
      This can lead to explosion across map-reduce boundary if the cardinality of T is very high,
      and map-side aggregation does not do a very good job. 
      
      This parameter decides if Hive should add an additional map-reduce job. If the grouping set
      cardinality (4 in the example above), is more than this value, a new MR job is added under the
      assumption that the original group by will reduce the data size.
    </description>
  </property>
  <property>
    <name>hive.exec.copyfile.maxsize</name>
    <value>33554432</value>
    <description>Maximum file size (in bytes; 33554432 is 32 MB) that Hive uses to do single HDFS copies between directories. Distributed copies (distcp) will be used instead for bigger files so that copies can be done faster.</description>
  </property>

  <property>
    <name>hive.default.fileformat</name>
    <value>TextFile</value>
    <description>
      Expects one of [textfile, sequencefile, rcfile, orc].
      Default file format for CREATE TABLE statement. Users can explicitly override it by CREATE TABLE ... STORED AS [FORMAT]
    </description>
  </property>

  <property>
    <name>hive.exec.orc.memory.pool</name>
    <value>0.5</value>
    <description>Maximum fraction of heap that can be used by ORC file writers</description>
  </property>
  <property>
    <name>hive.exec.orc.write.format</name>
    <value/>
    <description>
      Define the version of the file to write. Possible values are 0.11 and 0.12.
      If this parameter is not defined, ORC will use the run length encoding (RLE)
      introduced in Hive 0.12. Any value other than 0.11 results in the 0.12 encoding.
    </description>
  </property>
  <property>
    <name>hive.exec.orc.default.stripe.size</name>
    <value>67108864</value>
    <description>Define the default ORC stripe size, in bytes.</description>
  </property>
  <property>
    <name>hive.exec.orc.default.block.size</name>
    <value>268435456</value>
    <description>Define the default file system block size for ORC files.</description>
  </property>
  <property>
    <name>hive.exec.orc.dictionary.key.size.threshold</name>
    <value>0.8</value>
    <description>
      If the number of keys in a dictionary is greater than this fraction of the total number of
      non-null rows, turn off dictionary encoding.  Use 1 to always use dictionary encoding.
    </description>
  </property>
  <property>
    <name>hive.exec.orc.default.row.index.stride</name>
    <value>10000</value>
    <description>
      Define the default ORC index stride in number of rows. (Stride is the number of rows
      an index entry represents.)
    </description>
  </property>
  <property>
    <name>hive.orc.row.index.stride.dictionary.check</name>
    <value>true</value>
    <description>
      If enabled dictionary check will happen after first row index stride (default 10000 rows)
      else dictionary check will happen before writing first stripe. In both cases, the decision
      to use dictionary or not will be retained thereafter.
    </description>
  </property>
  <property>
    <name>hive.exec.orc.default.buffer.size</name>
    <value>262144</value>
    <description>Define the default ORC buffer size, in bytes.</description>
  </property>
  <property>
    <name>hive.exec.orc.default.block.padding</name>
    <value>true</value>
    <description>Define the default block padding, which pads stripes to the HDFS block boundaries.</description>
  </property>
  <property>
    <name>hive.exec.orc.block.padding.tolerance</name>
    <value>0.05</value>
    <description>
      Define the tolerance for block padding as a decimal fraction of stripe size (for
      example, the default value 0.05 is 5% of the stripe size). For the defaults of 64 MB
      ORC stripe and 256 MB HDFS blocks, the default block padding tolerance of 5% will
      reserve a maximum of 3.2 MB for padding within the 256 MB block. In that case, if the
      available size within the block is more than 3.2 MB, a new smaller stripe will be
      inserted to fit within that space. This will make sure that no stripe written will
      cross block boundaries and cause remote reads within a node local task.
    </description>
  </property>
  <property>
    <name>hive.exec.orc.default.compress</name>
    <value>ZLIB</value>
    <description>Define the default compression codec for ORC file</description>
  </property>
  <property>
    <name>hive.exec.orc.encoding.strategy</name>
    <value>SPEED</value>
    <description>
      Expects one of [speed, compression].
      Define the encoding strategy to use while writing data. Changing this will
      only affect the lightweight encoding for integers. This flag will not
      change the compression level of higher level compression codec (like ZLIB).
    </description>
  </property>
  <property>
    <name>hive.exec.orc.compression.strategy</name>
    <value>SPEED</value>
    <description>
      Expects one of [speed, compression].
      Define the compression strategy to use while writing data. 
      This changes the compression level of higher level compression codec (like ZLIB).
    </description>
  </property>
  <property>
    <name>hive.orc.splits.include.file.footer</name>
    <value>false</value>
    <description>
      If turned on, splits generated by ORC will include metadata about the stripes in the file. This
      data is read remotely (from the client or HS2 machine) and sent to all the tasks.
    </description>
  </property>
  <property>
    <name>hive.orc.cache.stripe.details.size</name>
    <value>10000</value>
    <description>Cache size for keeping meta info about orc splits cached in the client.</description>
  </property>
  <property>
    <name>hive.orc.compute.splits.num.threads</name>
    <value>10</value>
    <description>How many threads orc should use to create splits in parallel.</description>
  </property>
  <property>
    <name>hive.exec.orc.skip.corrupt.data</name>
    <value>false</value>
    <description>
      If ORC reader encounters corrupt data, this value will be used to determine
      whether to skip the corrupt data or throw exception. The default behavior is to throw exception.
    </description>
  </property>
  <property>
    <name>hive.exec.orc.zerocopy</name>
    <value>true</value>
    <description>Use zerocopy reads with ORC. (This requires Hadoop 2.3 or later.)</description>
  </property>

  <property>
    <name>hive.optimize.skewjoin</name>
    <value>true</value>
    <description>
      Whether to enable skew join optimization. 
      The algorithm is as follows: At runtime, detect the keys with a large skew. Instead of
      processing those keys, store them temporarily in an HDFS directory. In a follow-up map-reduce
      job, process those skewed keys. The same key need not be skewed for all the tables, and so,
      the follow-up map-reduce job (for the skewed keys) would be much faster, since it would be a
      map-join.
    </description>
  </property>
  <property>
    <name>hive.auto.convert.join</name>
    <value>true</value>
    <description>Whether Hive enables the optimization about converting common join into mapjoin based on the input file size</description>
  </property>
  <property>
    <name>hive.auto.convert.join.noconditionaltask</name>
    <value>true</value>
    <description>
      Whether Hive enables the optimization about converting common join into mapjoin based on the input file size. 
      If this parameter is on, and the sum of size for n-1 of the tables/partitions for an n-way join is smaller than the
      specified size, the join is directly converted to a mapjoin (there is no conditional task).
    </description>
  </property>
  <property>
    <name>hive.auto.convert.join.noconditionaltask.size</name>
    <value>536870912</value>
    <description>
      If hive.auto.convert.join.noconditionaltask is off, this parameter does not take effect.
      However, if it is on, and the sum of size for n-1 of the tables/partitions for an n-way join is smaller than this size,
      the join is directly converted to a mapjoin (there is no conditional task). The Hive default is 10MB; the value
      here (536870912 bytes) raises it to 512MB.
    </description>
  </property>
  <property>
    <name>hive.auto.convert.join.use.nonstaged</name>
    <value>false</value>
    <description>
      For conditional joins, if input stream from a small alias can be directly applied to join operator without 
      filtering or projection, the alias need not be pre-staged in distributed cache via mapred local task.
      Currently, this is not working with vectorization or tez execution engine.
    </description>
  </property>
  <property>
    <name>hive.skewjoin.key</name>
    <value>100000</value>
    <description>
      Determine if we get a skew key in join. If we see more than the specified number of rows with the same key in join operator,
      we consider it a skew join key.
    </description>
  </property>
  <property>
    <name>hive.skewjoin.mapjoin.map.tasks</name>
    <value>10000</value>
    <description>
      Determine the number of map tasks used in the follow-up map join job for a skew join.
      It should be used together with hive.skewjoin.mapjoin.min.split to perform a fine grained control.
    </description>
  </property>
  <property>
    <name>hive.skewjoin.mapjoin.min.split</name>
    <value>33554432</value>
    <description>
      Determine the maximum number of map tasks used in the follow-up map join job for a skew join by specifying
      the minimum split size. It should be used together with hive.skewjoin.mapjoin.map.tasks to perform a fine grained control.
    </description>
  </property>
  <property>
    <name>hive.hashtable.key.count.adjustment</name>
    <value>1.0</value>
    <description>Adjustment to mapjoin hashtable size derived from table and column statistics; the estimate of the number of keys is divided by this value. If the value is 0, statistics are not used and hive.hashtable.initialCapacity is used instead.</description>
  </property>
  <property>
    <name>hive.hashtable.initialCapacity</name>
    <value>100000</value>
    <description>Initial capacity of mapjoin hashtable if statistics are absent, or if hive.hashtable.key.count.adjustment is set to 0</description>
  </property>
  <property>
    <name>hive.hashtable.loadfactor</name>
    <value>0.75</value>
    <description/>
  </property>
  <property>
    <name>hive.mapjoin.followby.gby.localtask.max.memory.usage</name>
    <value>0.55</value>
    <description>
      This number means how much memory the local task can take to hold the key/value into an in-memory hash table 
      when this map join is followed by a group by. If the local task's memory usage is more than this number, 
      the local task will abort by itself. It means the data of the small table is too large to be held in memory.
    </description>
  </property>
  <property>
    <name>hive.mapjoin.localtask.max.memory.usage</name>
    <value>0.9</value>
    <description>
      This number means how much memory the local task can take to hold the key/value into an in-memory hash table. 
      If the local task's memory usage is more than this number, the local task will abort by itself. 
      It means the data of the small table is too large to be held in memory.
    </description>
  </property>
  <property>
    <name>hive.mapjoin.check.memory.rows</name>
    <value>100000</value>
    <description>Number of rows processed between checks of the local task's memory usage</description>
  </property>

  <property>
    <name>hive.auto.convert.sortmerge.join</name>
    <value>true</value>
    <description>Will the join be automatically converted to a sort-merge join, if the joined tables pass the criteria for sort-merge join.</description>
  </property>
  <property>
    <name>hive.auto.convert.sortmerge.join.bigtable.selection.policy</name>
    <value>org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ</value>
    <description>
      The policy to choose the big table for automatic conversion to sort-merge join. 
      By default, the table with the largest partitions is assigned the big table. All policies are:
      . based on position of the table - the leftmost table is selected
      org.apache.hadoop.hive.ql.optimizer.LeftmostBigTableSMJ.
      . based on total size (all the partitions selected in the query) of the table 
      org.apache.hadoop.hive.ql.optimizer.TableSizeBasedBigTableSelectorForAutoSMJ.
      . based on average size (all the partitions selected in the query) of the table 
      org.apache.hadoop.hive.ql.optimizer.AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.
      New policies can be added in future.
    </description>
  </property>
  <property>
    <name>hive.auto.convert.sortmerge.join.to.mapjoin</name>
    <value>true</value>
    <description>
      If hive.auto.convert.sortmerge.join is set to true, and a join was converted to a sort-merge join, 
      this parameter decides whether each table should be tried as a big table, and effectively a map-join should be
      tried. That would create a conditional task with n+1 children for an n-way join (1 child for each table as the
      big table), and the backup task will be the sort-merge join. In some cases, a map-join would be faster than a
      sort-merge join, if there is no advantage of having the output bucketed and sorted. For example, if a very big sorted
      and bucketed table with few files (say 10 files) is being joined with a very small sorted and bucketed table
      with few files (10 files), the sort-merge join will only use 10 mappers, and a simple map-only join might be faster
      if the complete small table can fit in memory, and a map-join can be performed.
    </description>
  </property>

  <property>
    <name>hive.server2.thrift.min.worker.threads</name>
    <value>5</value>
    <description>Minimum number of Thrift worker threads</description>
  </property>
  <property>
    <name>hive.server2.thrift.max.worker.threads</name>
    <value>500</value>
    <description>Maximum number of Thrift worker threads</description>
  </property>
  <property>
    <name>hive.server2.thrift.exponential.backoff.slot.length</name>
    <value>100ms</value>
    <description>
      Expects a time value with unit (d/day, h/hour, m/min, s/sec, ms/msec, us/usec, ns/nsec), which is msec if not specified.
      Binary exponential backoff slot time for Thrift clients during login to HiveServer2,
      for retries until hitting Thrift client timeout
    </description>
  </property>
  <property>
    <name>hive.server2.thrift.login.timeout</name>
    <value>20s</value>
    <description>
      Expects a time value with unit (d/day, h/hour, m/min, s/sec, ms/msec, us/usec, ns/nsec), which is sec if not specified.
      Timeout for Thrift clients during login to HiveServer2
    </description>
  </property>
  <property>
    <name>hive.server2.thrift.worker.keepalive.time</name>
    <value>60s</value>
    <description>
      Expects a time value with unit (d/day, h/hour, m/min, s/sec, ms/msec, us/usec, ns/nsec), which is sec if not specified.
      Keepalive time (in seconds) for an idle worker thread. When the number of workers exceeds min workers, excessive threads are killed after this time interval.
    </description>
  </property>
  <property>
    <name>hive.server2.async.exec.threads</name>
    <value>100</value>
    <description>Number of threads in the async thread pool for HiveServer2</description>
  </property>
  <property>
    <name>hive.server2.async.exec.shutdown.timeout</name>
    <value>10s</value>
    <description>
      Expects a time value with unit (d/day, h/hour, m/min, s/sec, ms/msec, us/usec, ns/nsec), which is sec if not specified.
      How long HiveServer2 shutdown will wait for async threads to terminate.
    </description>
  </property>
  <property>
    <name>hive.server2.async.exec.wait.queue.size</name>
    <value>100</value>
    <description>
      Size of the wait queue for async thread pool in HiveServer2.
      After hitting this limit, the async thread pool will reject new requests.
    </description>
  </property>
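  <!--
    Capacity sketch (assumption: the HiveServer2 async pool behaves like a
    fixed-size java.util.concurrent.ThreadPoolExecutor backed by a bounded
    queue): with hive.server2.async.exec.threads = 100 and
    hive.server2.async.exec.wait.queue.size = 100, about 100 operations can
    run at once and 100 more can wait, so roughly the 201st concurrent
    asynchronous request would be rejected.
  -->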
  <property>
    <name>hive.server2.async.exec.keepalive.time</name>
    <value>10s</value>
    <description>
      Expects a time value with unit (d/day, h/hour, m/min, s/sec, ms/msec, us/usec, ns/nsec), which is sec if not specified.
      Time that an idle HiveServer2 async thread (from the thread pool) will wait for a new task
      to arrive before terminating
    </description>
  </property>
  <property>
    <name>hive.server2.long.polling.timeout</name>
    <value>5000ms</value>
    <description>
      Expects a time value with unit (d/day, h/hour, m/min, s/sec, ms/msec, us/usec, ns/nsec), which is msec if not specified.
      Time that HiveServer2 will wait before responding to asynchronous calls that use long polling
    </description>
  </property>
  <property>
    <name>hive.server2.authentication</name>
    <value>NONE</value>
    <description>
      Expects one of [nosasl, none, ldap, kerberos, pam, custom].
      Client authentication types.
        NONE: no authentication check
        LDAP: LDAP/AD based authentication
        KERBEROS: Kerberos/GSSAPI authentication
        CUSTOM: Custom authentication provider
                (Use with property hive.server2.custom.authentication.class)
        PAM: Pluggable authentication module
        NOSASL:  Raw transport
    </description>
  </property>
  <property>
    <name>hive.server2.allow.user.substitution</name>
    <value>true</value>
    <description>Allow alternate user to be specified as part of HiveServer2 open connection request.</description>
  </property>

  <property>
    <name>hive.user.install.directory</name>
    <value>hdfs:///user/</value>
    <description>
      If hive (in tez mode only) cannot find a usable hive jar in "hive.jar.directory", 
      it will upload the hive jar to "hive.user.install.directory/user.name"
      and use it to run queries.
    </description>
  </property>

  <property>
    <name>hive.vectorized.execution.enabled</name>
    <value>true</value>
    <description>
      This flag should be set to true to enable vectorized mode of query execution.
      The default value is false.
    </description>
  </property>
  <property>
    <name>hive.vectorized.execution.reduce.enabled</name>
    <value>true</value>
    <description>
      This flag should be set to true to enable vectorized mode of the reduce-side of query execution.
      The default value is true.
    </description>
  </property>
  <property>
    <name>hive.vectorized.execution.reduce.groupby.enabled</name>
    <value>true</value>
    <description>
      This flag should be set to true to enable vectorized mode of the reduce-side GROUP BY query execution.
      The default value is true.
    </description>
  </property>
  <property>
    <name>hive.vectorized.groupby.checkinterval</name>
    <value>100000</value>
    <description>Number of entries added to the group by aggregation hash before a recomputation of average entry size is performed.</description>
  </property>
  <property>
    <name>hive.vectorized.groupby.maxentries</name>
    <value>1000000</value>
    <description>
      Max number of entries in the vector group by aggregation hashtables.
      Exceeding this will trigger a flush regardless of memory pressure.
    </description>
  </property>
  <property>
    <name>hive.vectorized.groupby.flush.percent</name>
    <value>0.1</value>
    <description>Percent of entries in the group by aggregation hash flushed when the memory threshold is exceeded.</description>
  </property>
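  <!--
    Worked example for the three vectorized GROUP BY settings above: the
    average entry size is re-estimated every 100,000 new entries
    (checkinterval), a flush is triggered once the table holds 1,000,000
    entries (maxentries), and a flush with flush.percent = 0.1 removes about
    10% of the current entries. Assuming the same percentage is applied when
    the maxentries limit triggers the flush, a full table would drop roughly
    100,000 entries per flush.
  -->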

  <property>
    <name>hive.rpc.query.plan</name>
    <value>true</value>
    <description>Whether to send the query plan via local resource or RPC</description>
  </property>
  <property>
    <name>hive.compute.splits.in.am</name>
    <value>true</value>
    <description>Whether to generate the splits locally or in the AM (tez only)</description>
  </property>
  <property>
    <name>hive.prewarm.enabled</name>
    <value>false</value>
    <description>Enables container prewarm for Tez (Hadoop 2 only)</description>
  </property>
  <property>
    <name>hive.prewarm.numcontainers</name>
    <value>10</value>
    <description>Controls the number of containers to prewarm for Tez (Hadoop 2 only)</description>
  </property>
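  <!--
    Sizing note (an estimate, assuming each prewarmed container is requested
    at hive.tez.container.size): prewarm is disabled here, but if
    hive.prewarm.enabled were set to true, 10 containers of 4096 MB would
    hold about 10 x 4096 MB = 40960 MB (40 GB) of YARN memory per Tez
    session before any query runs.
  -->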


  <property>
    <name>hive.exec.parallel</name>
    <value>true</value>
    <description>Whether to execute jobs in parallel</description>
  </property>
  <property>
    <name>hive.exec.parallel.thread.number</name>
    <value>16</value>
    <description>How many jobs at most can be executed in parallel</description>
  </property>
  <property>
    <name>hive.mapred.reduce.tasks.speculative.execution</name>
    <value>true</value>
    <description>Whether speculative execution for reducers should be turned on. </description>
  </property>
  <property>
    <name>hive.exec.counters.pull.interval</name>
    <value>1000</value>
    <description>
      The interval at which to poll the JobTracker for the counters of the running job.
      The smaller it is, the more load there will be on the JobTracker; the higher it is, the less granular the captured counters will be.
    </description>
  </property>
  <property>
    <name>hive.exec.dynamic.partition</name>
    <value>true</value>
    <description>Whether or not to allow dynamic partitions in DML/DDL.</description>
  </property>
  <property>
    <name>hive.exec.dynamic.partition.mode</name>
    <value>strict</value>
    <description>
      In strict mode, the user must specify at least one static partition
      to guard against accidentally overwriting all partitions.
      In nonstrict mode all partitions are allowed to be dynamic.
    </description>
  </property>
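  <!--
    Illustration (table and column names are hypothetical): in strict mode an
    INSERT must fix at least one partition column to a static value, and static
    partition columns come before dynamic ones, for example
      INSERT OVERWRITE TABLE sales PARTITION (country='US', dt)
      SELECT item, amount, dt FROM staging_sales;
    To allow every partition column to be dynamic, this property would instead be
      <property>
        <name>hive.exec.dynamic.partition.mode</name>
        <value>nonstrict</value>
      </property>
  -->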

  <property>
    <name>hive.localize.resource.wait.interval</name>
    <value>5000ms</value>
    <description>
      Expects a time value with unit (d/day, h/hour, m/min, s/sec, ms/msec, us/usec, ns/nsec), which is msec if not specified.
      Time to wait for another thread to localize the same resource for hive-tez.
    </description>
  </property>
  <property>
    <name>hive.localize.resource.num.wait.attempts</name>
    <value>5</value>
    <description>The number of attempts waiting for localizing a resource in hive-tez.</description>
  </property>
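  <!--
    Worked example: with a 5000ms wait interval and 5 attempts, a Hive-on-Tez
    session would wait up to about 5 x 5 s = 25 s for another thread to finish
    localizing the same resource before giving up (assuming each attempt waits
    the full interval).
  -->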
  
  <property>
    <name>hive.merge.tezfiles</name>
    <value>false</value>
    <description>Merge small files at the end of a Tez DAG</description>
  </property>
  <property>
    <name>hive.tez.input.format</name>
    <value>org.apache.hadoop.hive.ql.io.HiveInputFormat</value>
    <description>The default input format for tez. Tez groups splits in the AM.</description>
  </property>
  <property>
    <name>hive.tez.container.size</name>
    <value>4096</value>
  </property>
  <property>
    <name>hive.tez.cpu.vcores</name>
    <value>-1</value>
  </property>
  <property>
    <name>hive.tez.java.opts</name>
    <value>-Xmx3766m -XX:NewRatio=8 -XX:+UseNUMA -XX:+UseParallelGC</value>
  </property>
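  <!--
    Worked example: -Xmx3766m inside a 4096 MB container gives the JVM heap
    3766 / 4096, or about 92%, of the container, leaving roughly 330 MB for
    non-heap memory such as thread stacks, class metadata and direct buffers.
    If YARN kills Tez containers for exceeding their memory limit, one option
    (a judgment call, not a tested recommendation) is a smaller heap such as
    80% of the container:
      <value>-Xmx3276m -XX:NewRatio=8 -XX:+UseNUMA -XX:+UseParallelGC</value>
  -->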
  <property>
    <name>hive.tez.log.level</name>
    <value>INFO</value>
  </property>
  <property>
    <name>hive.server2.tez.default.queues</name>
    <value/>
  </property>
  <property>
    <name>hive.server2.tez.sessions.per.default.queue</name>
    <value>1</value>
  </property>
  <property>
    <name>hive.server2.tez.initialize.default.sessions</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.convert.join.bucket.mapjoin.tez</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.tez.auto.reducer.parallelism</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.tez.max.partition.factor</name>
    <value>2.0</value>
    <description>When auto reducer parallelism is enabled this factor will be used to over-partition data in shuffle edges.</description>
  </property>
  <property>
    <name>hive.tez.min.partition.factor</name>
    <value>0.25</value>
  </property>
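  <!--
    Worked example (a sketch of how the two factors are commonly described, not
    verified against the CDH build): if Hive estimates 40 reducers for a shuffle
    edge, a max factor of 2.0 lets it start with up to 40 x 2.0 = 80 reducers
    (still capped by hive.exec.reducers.max), and Tez may then shrink that at
    runtime to no fewer than 40 x 0.25 = 10 reducers.
  -->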
  <property>
    <name>hive.tez.dynamic.partition.pruning</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.tez.dynamic.partition.pruning.max.event.size</name>
    <value>1048576</value>
    <description>Maximum size of events sent by processors in dynamic pruning. If this size is exceeded, no pruning will take place.</description>
  </property>
  <property>
    <name>hive.tez.dynamic.partition.pruning.max.data.size</name>
    <value>104857600</value>
    <description>Maximum total data size of events in dynamic pruning.</description>
  </property>
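  <!--
    Unit check: 1048576 bytes = 1 MiB per pruning event, and
    104857600 bytes = 100 MiB for all pruning events together.
  -->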
  <property>
    <name>hive.tez.smb.number.waves</name>
    <value>0.5</value>
    <description>The number of waves in which to run the SMB join. Account for cluster being occupied. Ideally should be 1 wave.</description>
  </property>
  <property>
    <name>hive.tez.exec.print.summary</name>
    <value>false</value>
    <description>Display a breakdown of execution steps for every query executed by the shell.</description>
  </property>
  <property>
    <name>hive.tez.exec.inplace.progress</name>
    <value>true</value>
    <description>Updates tez job execution progress in-place in the terminal.</description>
  </property>
</configuration>

alphalzh commented Mar 16, 2016

Thanks a lot. This saved me from the endless headache of configuring Tez on CDH. I had searched for days without finding a proper solution until I saw this. Great work!

mawolf commented Oct 1, 2016

I must admit that this is also my impression of Cloudera, and it worries me quite a bit. Thanks for your solution, which gave some performance improvements. However, we are still far from the performance we have seen on Hortonworks.

hadoopsters commented Apr 25, 2018

I have gotten very close, but no luck. 😢 I keep getting this error:

Caused by: java.lang.NoSuchMethodError: org.apache.hadoop.mapred.TaskID: method (Ljava/lang/String;ILorg/apache/hadoop/mapreduce/TaskType;I)V not found

That shows up in the syslog section of the application logs. I've tried to remedy this by moving the Cloudera Hadoop jars into the Tez directory, with no real luck. Any thoughts are welcome.

Here's what I do to run my query:
`[lrobinson@hddev001 lib]$ beeline -u jdbc:hive2://myclusterhostname:10000/market -n lrobinson -p r0bin$0n
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.6-1.cdh5.7.6.p0.6/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/tez-0.8.4/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.6-1.cdh5.7.6.p0.6/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/tez-0.8.4/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.7.6-1.cdh5.7.6.p0.6/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/tez-0.8.4/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
scan complete in 4ms
Connecting to jdbc:hive2://myclusterhostname:10000/market
Connected to: Apache Hive (version 1.1.0-cdh5.7.6)
Driver: Hive JDBC (version 1.1.0-cdh5.7.6)
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.1.0-cdh5.7.6 by Apache Hive
0: jdbc:hive2://myclusterhostname:10000/m> set hive.execution.engine=tez;
No rows affected (0.069 seconds)
0: jdbc:hive2://myclusterhostname:10000/m> select count(*) from impression;
DEBUG : Acquire a monitor for compiling query
INFO : Compiling command(queryId=cloudera-scm_20180425222828_08010429-8407-4fd2-aa32-1c54769ad7fe): select count(*) from impression
DEBUG : Encoding valid txns info 9223372036854775807:
INFO : Semantic Analysis Completed
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:_c0, type:bigint, comment:null)], properties:null)
INFO : Completed compiling command(queryId=cloudera-scm_20180425222828_08010429-8407-4fd2-aa32-1c54769ad7fe); Time taken: 0.203 seconds
INFO : Executing command(queryId=cloudera-scm_20180425222828_08010429-8407-4fd2-aa32-1c54769ad7fe): select count(*) from impression
INFO : Query ID = cloudera-scm_20180425222828_08010429-8407-4fd2-aa32-1c54769ad7fe
INFO : Total jobs = 1
INFO : Launching Job 1 out of 1
INFO : Starting task [Stage-1:MAPRED] in serial mode
INFO : Tez session hasn't been created yet. Opening session
DEBUG : Adding local resource: scheme: "hdfs" host: "nameservice1" port: -1 file: "/tmp/hive/lrobinson/_tez_session_dir/956785af-5e98-4c77-a25f-46ed4c27014e/hive-exec-1.1.0-cdh5.7.6-core.jar"
DEBUG : Adding local resource: scheme: "hdfs" host: "nameservice1" port: -1 file: "/tmp/hive/lrobinson/_tez_session_dir/956785af-5e98-4c77-a25f-46ed4c27014e/mysql-connector-java.jar"
INFO :

INFO : Status: Running (Executing on YARN cluster with App id application_1524174574848_0069)

INFO : Map 1: -/- Reducer 2: 0/1
INFO : Map 1: 0/214 Reducer 2: 0/1
INFO : Map 1: 0/214 Reducer 2: 0/1
INFO : Map 1: 0(+1)/214 Reducer 2: 0/1
INFO : Map 1: 0(+6)/214 Reducer 2: 0/1
INFO : Map 1: 0(+16)/214 Reducer 2: 0/1
INFO : Map 1: 0(+28)/214 Reducer 2: 0/1
INFO : Map 1: 0(+40,-2)/214 Reducer 2: 0/1
INFO : Map 1: 0(+51,-4)/214 Reducer 2: 0/1
INFO : Map 1: 0(+55,-8)/214 Reducer 2: 0/1
INFO : Map 1: 0(+62,-12)/214 Reducer 2: 0/1
INFO : Map 1: 0(+62,-15)/214 Reducer 2: 0/1
INFO : Map 1: 0(+64,-18)/214 Reducer 2: 0/1
INFO : Map 1: 0(+64,-18)/214 Reducer 2: 0/1
INFO : Map 1: 0(+30,-27)/214 Reducer 2: 0/1
INFO : Map 1: 0(+32,-110)/214 Reducer 2: 0/1
ERROR : Status: Failed
ERROR : Session stats:submittedDAGs=0, successfulDAGs=0, failedDAGs=0, killedDAGs=0

ERROR : FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask
DEBUG : Shutting down query select count(*) from impression
INFO : Completed executing command(queryId=cloudera-scm_20180425222828_08010429-8407-4fd2-aa32-1c54769ad7fe); Time taken: 23.842 seconds
DEBUG : Shutting down query select count(*) from impression
Error: Error while processing statement: FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.tez.TezTask (state=08S01,code=2)`

elonlo commented Jun 8, 2018

Thanks
