MultiTableHFileOutputFormat
package core.hadoop.output;
/**
* Copyright 2014 AdRoll
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* HFileOutputFormat that can partition across multiple tables.
* It assumes the mapper output key is prefixed by the table,
* and that the map output key class is ImmutableBytesWritable (same as HFileOutputFormat).
*
*/
public class MultiTableHFileOutputFormat extends HFileOutputFormat {
private static final Logger LOG = LoggerFactory.getLogger(MultiTableHFileOutputFormat.class);
public static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
public static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
public static final String DATABLOCK_ENCODING_CONF_KEY = "hbase.mapreduce.hfileoutputformat.datablock.encoding";
// -- This section contains our utility functions to map <table>:<row> to/from bytes --
/**
* Parses an ImmutableBytesWritable into a String array of two elements.
* The first element is the table and the second is the row.
*
* HBase table names cannot contain ":"
*
* @param bytes
* @return String array containing table and row
*/
protected static String[] getTableAndRow(ImmutableBytesWritable bytes) {
String keyWithPrefix = Bytes.toString(bytes.get());
int index = keyWithPrefix.indexOf(":");
if (index == -1) {
LOG.error("Output key has an invalid format {}", keyWithPrefix);
throw new IllegalArgumentException("RecordWriter key cannot be split into a table and row");
} else {
String[] tableAndRow = new String[2];
tableAndRow[0] = keyWithPrefix.substring(0, index);
tableAndRow[1] = keyWithPrefix.substring(index+1);
return tableAndRow;
}
}
public static String getTable(ImmutableBytesWritable bytes) {
String[] tableAndRow = getTableAndRow(bytes);
return tableAndRow[0];
}
public static String getRow(ImmutableBytesWritable bytes) {
String[] tableAndRow = getTableAndRow(bytes);
return tableAndRow[1];
}
/**
* Concatenates the input table and row together,
* using a colon as the separator
*
* @param table
* @param row
* @return <table>:<row> as bytes
*/
public static byte[] makeKey(String table, String row) {
return Bytes.toBytes(table + ":" + row);
}
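// Usage sketch (not part of the original class): a mapper that emits keys in
// the "<table>:<row>" form this output format expects. The mapper, table,
// family and qualifier names below are hypothetical.
//
//   public static class ExampleMapper
//       extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
//     @Override
//     protected void map(LongWritable offset, Text line, Context context)
//         throws IOException, InterruptedException {
//       String row = line.toString();
//       // Only the map output key carries the "<table>:" prefix; the KeyValue
//       // keeps the bare row, since getRecordWriter() parses the prefix off the
//       // key and writes the KeyValue unchanged into that table's directory.
//       byte[] prefixedKey = MultiTableHFileOutputFormat.makeKey("table_a", row);
//       KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("cf"),
//           Bytes.toBytes("q"), Bytes.toBytes("some value"));
//       context.write(new ImmutableBytesWritable(prefixedKey), kv);
//     }
//   }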
// -- This section contains the slightly modified functions copied from HFileOutputFormat --
/**
* This function does the same setup as
* {@link HFileOutputFormat#configureIncrementalLoad(Job, HTable)},
* except it sets up the total order partitioner to expect "<table>:<row>"
* instead of just <row>.
*
* @param job
* @param tables
* @throws IOException
*/
public static void configureIncrementalLoad(Job job, HTable... tables) throws IOException {
Configuration conf = job.getConfiguration();
Class<? extends Partitioner> topClass;
try {
topClass = getTotalOrderPartitionerClass();
} catch (ClassNotFoundException e) {
throw new IOException("Failed getting TotalOrderPartitioner", e);
}
job.setPartitionerClass(topClass);
job.setOutputKeyClass(ImmutableBytesWritable.class);
job.setOutputValueClass(KeyValue.class);
job.setOutputFormatClass(MultiTableHFileOutputFormat.class);
// Based on the configured map output class, set the correct reducer to properly
// sort the incoming values.
// TODO it would be nice to pick one or the other of these formats.
if (KeyValue.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(KeyValueSortReducer.class);
} else if (Put.class.equals(job.getMapOutputValueClass())) {
job.setReducerClass(PutSortReducer.class);
} else {
LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
LOG.warn("Make sure your reducer emits (ImmutableBytesWritable, KeyValue) in sorted order");
}
/** ADROLL CODE HERE */
// We need to get the region start keys for each table
// and generate the sequence file accordingly.
// writePartitions sorts the start keys later on, so we don't need to sort them here
int reduceTasks = 0;
List<ImmutableBytesWritable> allStartKeys = new ArrayList<ImmutableBytesWritable>();
for (HTable table : tables) {
// First get the current region splits
LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
allStartKeys.addAll(startKeys);
LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
"to match current region count");
reduceTasks += startKeys.size();
// Set compression algorithms based on column families
configureCompression(table, conf);
configureBloomType(table, conf);
}
job.setNumReduceTasks(reduceTasks);
/** END ADROLL CODE */
// Then setup the sequence files for the split start points
Path partitionsPath = new Path(job.getWorkingDirectory(),
"partitions_" + UUID.randomUUID());
LOG.info("Writing partition information to " + partitionsPath);
FileSystem fs = partitionsPath.getFileSystem(conf);
writePartitions(conf, partitionsPath, allStartKeys);
partitionsPath = fs.makeQualified(partitionsPath);
URI cacheUri;
try {
// Below we make explicit reference to the bundled TOP. It's cheating:
// we assume DEFAULT_PATH in the HBase-bundled TOP is defined the same way
// as in Hadoop (whether 0.20 or 0.22, etc.).
cacheUri = new URI(partitionsPath.toString() + "#" +
org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner.DEFAULT_PATH);
} catch (URISyntaxException e) {
throw new IOException(e);
}
DistributedCache.addCacheFile(cacheUri, conf);
DistributedCache.createSymlink(conf);
// We don't need to copy dependency jars since we make an uber jar
// TableMapReduceUtil.addDependencyJars(job);
LOG.info("Incremental table output configured.");
}
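// Driver sketch (hypothetical wiring, not from the original gist) showing how
// a job might be configured against two tables; the driver class, mapper and
// table names are illustrative only.
//
//   Configuration conf = HBaseConfiguration.create();
//   Job job = new Job(conf, "multi-table bulk load");
//   job.setJarByClass(ExampleDriver.class);
//   job.setMapperClass(ExampleMapper.class);
//   job.setMapOutputKeyClass(ImmutableBytesWritable.class);
//   job.setMapOutputValueClass(KeyValue.class);
//   FileInputFormat.addInputPath(job, new Path(args[0]));
//   FileOutputFormat.setOutputPath(job, new Path(args[1]));
//   HTable tableA = new HTable(conf, "table_a");
//   HTable tableB = new HTable(conf, "table_b");
//   MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableA, tableB);
//   job.waitForCompletion(true);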
/**
* Return the start keys of all of the regions in this table,
* with "<table>:" prepended, as a list of ImmutableBytesWritable.
*/
protected static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
throws IOException {
byte[] keyPrefix = Bytes.toBytes(Bytes.toString(table.getTableName()) + ":");
byte[][] byteKeys = table.getStartKeys();
ArrayList<ImmutableBytesWritable> ret =
new ArrayList<ImmutableBytesWritable>(byteKeys.length);
for (byte[] byteKey : byteKeys) {
// Build <table>:<row>
byte[] keyWithPrefix = new byte[keyPrefix.length + byteKey.length];
System.arraycopy(keyPrefix, 0, keyWithPrefix, 0, keyPrefix.length);
System.arraycopy(byteKey, 0, keyWithPrefix, keyPrefix.length, byteKey.length);
ret.add(new ImmutableBytesWritable(keyWithPrefix));
}
return ret;
}
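// For example (hypothetical tables and regions): if "table_a" has regions
// starting at "" and "m", and "table_b" has a single region starting at "",
// the start keys gathered across both tables would be:
//
//   "table_a:", "table_a:m", "table_b:"
//
// These prefixed keys are what writePartitions() below sorts into the
// TotalOrderPartitioner split file.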
/**
* We need to look at the context.write key and parse out the table,
* then write the KeyValue to the right file.
*/
@Override
public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
throws IOException, InterruptedException {
// Get the path of the temporary output file
final Path outputPath = FileOutputFormat.getOutputPath(context);
final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
final Configuration conf = context.getConfiguration();
final FileSystem fs = outputdir.getFileSystem(conf);
// These configs. are from hbase-*.xml
final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
HConstants.DEFAULT_MAX_FILE_SIZE);
final int blocksize = conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize",
HFile.DEFAULT_BLOCKSIZE);
// Invented config. Add to hbase-*.xml if other than default compression.
final String defaultCompression = conf.get("hfile.compression",
Compression.Algorithm.NONE.getName());
final boolean compactionExclude = conf.getBoolean(
"hbase.mapreduce.hfileoutputformat.compaction.exclude", false);
// create a map from column family to the compression algorithm
final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);
String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
final HFileDataBlockEncoder encoder;
if (dataBlockEncodingStr == null) {
encoder = NoOpDataBlockEncoder.INSTANCE;
} else {
try {
encoder = new HFileDataBlockEncoderImpl(DataBlockEncoding
.valueOf(dataBlockEncodingStr));
} catch (IllegalArgumentException ex) {
throw new RuntimeException(
"Invalid data block encoding type configured for the param "
+ DATABLOCK_ENCODING_CONF_KEY + " : "
+ dataBlockEncodingStr);
}
}
return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
// Map of families to writers and how much has been output on the writer.
private final Map<byte [], WriterLength> writers =
new TreeMap<byte [], WriterLength>(Bytes.BYTES_COMPARATOR);
private byte [] previousRow = HConstants.EMPTY_BYTE_ARRAY;
private final byte [] now = Bytes.toBytes(System.currentTimeMillis());
private boolean rollRequested = false;
public void write(ImmutableBytesWritable row, KeyValue kv)
throws IOException {
// null input == user explicitly wants to flush
if (row == null && kv == null) {
rollWriters();
return;
}
/** ADROLL CODE HERE */
// We need to extract the table and row from the input,
// since the original HFileOutputFormat code assumes it's just the row at this point
String[] tableAndKey = getTableAndRow(row);
String tableName = tableAndKey[0];
byte[] rowKey = Bytes.toBytes(tableAndKey[1]);
long length = kv.getLength();
byte [] family = kv.getFamily();
WriterLength wl = this.writers.get(family);
// If this is a new column family, make sure its per-table output directory exists
if (wl == null) {
Path tableOutputDir = new Path(outputdir, tableName);
fs.mkdirs(new Path(tableOutputDir, Bytes.toString(family)));
}
/** END ADROLL CODE */
// If any of the HFiles for the column families has reached
// maxsize, we need to roll all the writers
if (wl != null && wl.written + length >= maxsize) {
this.rollRequested = true;
}
// This can only happen once a row is finished though
if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
rollWriters();
}
// create a new HFile writer, if necessary
if (wl == null || wl.writer == null) {
/** ADROLL CODE HERE */
// We need to pass the table in addition to the family to write to the proper path
wl = getNewWriter(tableName, family, conf);
/** END ADROLL CODE */
}
// we now have the proper HFile writer. Full steam ahead
kv.updateLatestStamp(this.now);
wl.writer.append(kv);
wl.written += length;
// Copy the row so we can detect a row transition.
this.previousRow = rowKey;
}
private void rollWriters() throws IOException {
for (WriterLength wl : this.writers.values()) {
if (wl.writer != null) {
LOG.info("Writer=" + wl.writer.getPath() +
((wl.written == 0)? "": ", wrote=" + wl.written));
close(wl.writer);
}
wl.writer = null;
wl.written = 0;
}
this.rollRequested = false;
}
/* Create a new StoreFile.Writer.
* @param tableName
* @param family
* @param conf
* @return A WriterLength, containing a new StoreFile.Writer.
* @throws IOException
*/
private WriterLength getNewWriter(String tableName, byte[] family, Configuration conf)
throws IOException {
WriterLength wl = new WriterLength();
/** ADROLL CODE HERE */
// We need to demux the output per table into separate directories
Path tableOutputDir = new Path(outputdir, tableName);
Path familydir = new Path(tableOutputDir, Bytes.toString(family));
/** END ADROLL CODE */
String compression = compressionMap.get(family);
compression = compression == null ? defaultCompression : compression;
String bloomTypeStr = bloomTypeMap.get(family);
BloomType bloomType = BloomType.NONE;
if (bloomTypeStr != null) {
bloomType = BloomType.valueOf(bloomTypeStr);
}
Configuration tempConf = new Configuration(conf);
tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blocksize)
.withOutputDir(familydir)
.withCompression(AbstractHFileWriter.compressionByName(compression))
.withBloomType(bloomType)
.withComparator(KeyValue.COMPARATOR)
.withDataBlockEncoder(encoder)
.withChecksumType(Store.getChecksumType(conf))
.withBytesPerChecksum(Store.getBytesPerChecksum(conf))
.build();
this.writers.put(family, wl);
return wl;
}
private void close(final StoreFile.Writer w) throws IOException {
if (w != null) {
w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
Bytes.toBytes(System.currentTimeMillis()));
w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
Bytes.toBytes(context.getTaskAttemptID().toString()));
w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
Bytes.toBytes(true));
w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
Bytes.toBytes(compactionExclude));
w.appendTrackedTimestampsToMetadata();
w.close();
}
}
public void close(TaskAttemptContext c)
throws IOException, InterruptedException {
for (WriterLength wl: this.writers.values()) {
close(wl.writer);
}
}
};
}
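// After a successful job, the committed output directory contains one
// sub-directory per table, each laid out like a normal HFileOutputFormat
// result, e.g. (paths illustrative):
//
//   <outputPath>/table_a/<family>/<hfile>
//   <outputPath>/table_b/<family>/<hfile>
//
// A plausible follow-up step (assuming HBase's standard bulk-load tool,
// org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles) is to run it once
// per table sub-directory:
//
//   LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
//   loader.doBulkLoad(new Path(outputPath, "table_a"), new HTable(conf, "table_a"));
//   loader.doBulkLoad(new Path(outputPath, "table_b"), new HTable(conf, "table_b"));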
/**
* Write out a SequenceFile that can be read by TotalOrderPartitioner
* that contains the split points in startKeys.
* @param partitionsPath output path for SequenceFile
* @param startKeys the region start keys
*/
protected static void writePartitions(Configuration conf, Path partitionsPath,
List<ImmutableBytesWritable> startKeys) throws IOException {
if (startKeys.isEmpty()) {
throw new IllegalArgumentException("No regions passed");
}
// We're generating a list of split points, and we don't ever
// have keys < the first region (which has an empty start key)
// so we need to remove it. Otherwise we would end up with an
// empty reducer with index 0
TreeSet<ImmutableBytesWritable> sorted =
new TreeSet<ImmutableBytesWritable>(startKeys);
/** ADROLL CODE HERE */
// Since we prefix the row with the table,
// we need to check whether the row portion is empty instead of the whole key.
ImmutableBytesWritable first = sorted.first();
String rowKey = getRow(first);
if (!rowKey.isEmpty()) {
throw new IllegalArgumentException(
"First region of table should have empty start key. Instead has: "
+ Bytes.toStringBinary(first.get()));
}
/** END ADROLL CODE */
sorted.remove(first);
// Write the actual file
FileSystem fs = partitionsPath.getFileSystem(conf);
SequenceFile.Writer writer = SequenceFile.createWriter(fs,
conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class);
try {
for (ImmutableBytesWritable startKey : sorted) {
writer.append(startKey, NullWritable.get());
}
} finally {
writer.close();
}
}
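// Continuing the hypothetical example from getRegionStartKeys() above: with
// start keys "table_a:", "table_a:m" and "table_b:", the sorted set's first
// element ("table_a:") is dropped, so the partition file contains the split
// points "table_a:m" and "table_b:". Together with reduceTasks == 3 set in
// configureIncrementalLoad(), this sends each region's rows to its own reducer.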
}