Skip to content

Instantly share code, notes, and snippets.

@rohitdholakia
Last active February 1, 2016 23:08
Show Gist options
  • Save rohitdholakia/a5f32f644859e09b6f52 to your computer and use it in GitHub Desktop.
Save rohitdholakia/a5f32f644859e09b6f52 to your computer and use it in GitHub Desktop.
FileSinkOperator documentation

What is a sink operator?

Hive runs a SQL query as a DAG of jobs or, before Hive 2.0, as MapReduce jobs as well. The final result of this job or set of jobs is always stored in HDFS. Why? Well, your cluster might have to be rebooted between the end of the job and sending of this output to client, or something else might go wrong. To protect against these, we write the final output to HDFS and when Hive gets a request for this output (typically over thrift), it sends the file across.

To write output to HDFS, a FileSinkOperator instance is used. FileSinkOperator is a TerminalOperator, meaning that it is called at the end of a job. If there are multiple tasks running, each of them will have their own instance of FileSink (FSO). Whenever you first take a look at an operator, look at 3 functions:

  • initializeOp()
  • process()
  • closeOp()

These 3 functions give you a fair amount of idea of what this operator does. In the case of FSO, during initialization, it finds out if the table is native or not, there is dynamic partitioning or not, what kind of serDe is being used, etc. In this post, we will focus on the SerDe component.

SerDe

Hive works on top of Hadoop. Hadoop started as being able to run MapReduce jobs. When you start writing MR jobs, you would see that a Map job reads records and emits key-value pairs. For e.g, it can read a line and emit <line number, all words upper cased>. This is sent to a reduce script which runs it's own functions to filter this out in a certain way. E.g, it can see these sentences and only choose those which have the word DIMSUM in them.

This is great! You have map, you have reduce and something makes them work together and handle failures and distributed nature and everything in between. But, Hive thinks in terms of rows and columns!

Consider a query of the form:

select table1.col1, table1.col2 from table1 inner join table2 where table1.col3 = table2.col3 

Hive will run an inner join on this set of 2 tables as a MR job. But, someone has to turn these rows to key-value pairs and then turn those key-value pairs back to rows and columns.

A SerDe does that for you. A SerDe typically has 3 methods:

  • initialize()
  • serialize()
  • deSerialize()

In the initialize() method, a SerDe would get several schema details. E.g, list of columns, their types, delimiter, serialization_format, etc.

serialize() takes a row as input and returns a Writable. What's a Writable? Hadoop uses writable to be able to serialize/deserialize it. You can return a Writable and any rowWriter will be able to serialize it to a stream or a SequenceFile. Carrying that thought forward, you can have your own custom type (say, a byte), which implements Writable and define how you would readFields and write it.

Back to our SerDe, so, we understood that initialize() uses schema info to find out how many columns there are, what are the types, and what is the field delimiter etc. This is important because once SerDe's serialize() method is called, it should be able to use the information from initialize() to convert this row to a Writable.

Note #1: SerDes should extend AbstractSerDe. Initially, an interface SerDe was used,but now, you should extend AbstractSerDe. Also, use the .serde2. package.

FileSinkOperator.process(Object row, int tag)

Think about the query again. We are going row by row. The select operator will basically forward the row down the tree. We come to the predicate. Let's say we are filtering an employee table where age>50. As we see a row which matches the criteria, we want to serialize it. This is when the process() function gets called. When you start reading the code of the process() function, you would see ObjectInspector at several places. What is that and how do we use that?

#ObjectInspector

Remember how in the serialize() method, the SerDe needs to take a look at the row object it has, go through each column and find out how Hive has stored the data? Hive can choose to store as Integer as a LazyInt or an Integer or an IntWritable. Similarly, you can store doubles as LazyDouble or LazyBinaryDouble or DoubleWritable. Hive also has complex types, list, map and struct. An object will have it's data and it will have an objectInspector to allow Hive to inspect that object.

public interface ObjectInspector extends Cloneable {
  /**
   * Category.
   *
   */
  public static enum Category {
    PRIMITIVE, LIST, MAP, STRUCT, UNION
  };

So, we see that ObjectInspector is an interface which fits all object types into 5 categories as pointed above. But, how did we reach here? Oh yeah, SerDes and ObjectInspectors.

#SerDe serialize()

A row has fields, fields have objects, objects have data in them. A SerDe converts that row to a Writable. It needs to know what the data is in those objects! It uses an ObjectInspector for each of those fields. This reasoning is why, when you see any SerDe's initialize() method, it does the following:

  • Find out how many columns there are
  • What are their types?
  • Based on those types, get the Object Inspectors for them.
  • Use them in the serialize() method to get the data and serialize it.

deserialize()

The inverse of serialize() happens in deserialize(). We are given a serialized object and we convert it to a Hive object. Most custom SerDes focus on this method because this is called when data is being read of off HDFS or another data storage format. When you are reading the data, you want it to be in your chosen format.

Okay, so how did we come to FSO and SerDes?

Yes, what happens now in Hive 2.0 and previous releases is that you run a join query, or a group by query, or any other query with condition(s) and it stores the final data (a resultSet) on HDFS. After this, the client calls for this data over thrift fetchResults and this data is sent to the FetchOperator. This is done by using the deserialize method of the chosen SerDe and converting the serialized objects to rows. But, the output is a resultset right? So, the FetchOperator knows that and consolidates those rows to a ResultSet. But, it doesn't end there! Remember that HiveServer2 is talking to the client over thrift, so HS2 cannot just send the ResultSet as-is. It has to be converted to the corresponding thrift object. That is done. And only after that, the data is sent back to the client.

Did you spot the redundancy in this approach? When we finish the query and are going to write the final ResultSet to HDFS, we knew that the client would ask for it. If there was a way we could have converted that ResultSet to a thrift type and written that to HDFS, wouldn't it be great?! Imagine the alternative. This is what we discussed above:

  • serialize() the rows to HDFS.
  • deserialize() them.
  • FetchOperator would consolidate the rows into a ResultSet object.
  • This would get converted to a Thrift resultSet object.
  • It would be sent to the client.

To fix this, work started on an umbrella JIRA HIVE-12427. As part of the work, we will:

  • Create a new module that will have all the thrift-generated code, HIVE-12442. This was done because with the change of creating thrift objects in SerDe, the SerDe would have a dependency on the service module (for thrift gen code). But, that was leading to a circular dependency. Also, it is much more cleaner to have a separate module for the thrift gen code. Hence, this was done first.

Fun story: This JIRA was delayed because after creating a new pom.xml, updating dependencies of hive-service and hive-jdbc, and updating all the import statements in them, it was compiling fine. But, when starting hive --service metastore, it would complain about being unable to find $HIVE_HOME/2/0/0/SNAPSHOT/jar. There are two things wrong here. First is, of course, being unable to start. But, more importantly, the path being printed, the path where it was looking for the JAR, is all wrong!

at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:274)
at org.apache.hadoop.util.RunJar.run(RunJar.java:214)
at org.apache.hadoop.util.RunJar.main(RunJar.java:136)

We looked at class loaders and maven output in debug mode (this is a LOT of output) but there wasn't anything wrong there. On a closer look at the stack trace, we see that the origin of the problem is in the RunJar class. This is a hadoop class that loads JARs. Hive uses them to load it's own JAR files. Ah ha! But, where is this being called?

Turns out, it is part of bin/ext/metastore.sh and bin/ext/hiveserver2.sh. In metastore.sh, it was adding the JAR file

JAR=${HIVE_LIB}/hive-service-*.jar

Now, this is wrong because it is not required but it was working earlier because there was only one hive-service. Now, with hive-service and hive-service-rpc, it was picking up two JAR files! And it was failing. The same problem was in hiveserver2.sh. So, we finally fixed it by updating the regex to

JAR=${HIVE_LIB}/hive-service-[0-9].*.jar

And that is how we finished HIVE-12442.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment