MultiTableHFileOutputFormat
package core.hadoop.output;

/**
 * Copyright 2014 AdRoll
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * HFileOutputFormat that can partition across multiple tables.
 * It assumes the mapper output key is prefixed with the destination table name,
 * and that the map output key class is ImmutableBytesWritable (same as HFileOutputFormat).
 */
public class MultiTableHFileOutputFormat extends HFileOutputFormat {

  private static final Logger LOG = LoggerFactory.getLogger(MultiTableHFileOutputFormat.class);

  public static final String COMPRESSION_CONF_KEY = "hbase.hfileoutputformat.families.compression";
  public static final String BLOOM_TYPE_CONF_KEY = "hbase.hfileoutputformat.families.bloomtype";
  public static final String DATABLOCK_ENCODING_CONF_KEY = "hbase.mapreduce.hfileoutputformat.datablock.encoding";

  // -- This section contains our utility functions to map <table>:<row> to/from bytes --

  /**
   * Parses an ImmutableBytesWritable into a String array of two elements.
   * The first element is the table and the second is the row.
   *
   * HBase table names cannot contain ":", so the first colon is an unambiguous separator.
   *
   * @param bytes the prefixed key
   * @return String array containing table and row
   */
  protected static String[] getTableAndRow(ImmutableBytesWritable bytes) {
    String keyWithPrefix = Bytes.toString(bytes.get());
    int index = keyWithPrefix.indexOf(":");
    if (index == -1) {
      LOG.error("Output key has an invalid format {}", keyWithPrefix);
      throw new IllegalArgumentException("RecordWriter key cannot be split into a table and row");
    } else {
      String[] tableAndRow = new String[2];
      tableAndRow[0] = keyWithPrefix.substring(0, index);
      tableAndRow[1] = keyWithPrefix.substring(index + 1);
      return tableAndRow;
    }
  }

  public static String getTable(ImmutableBytesWritable bytes) {
    String[] tableAndRow = getTableAndRow(bytes);
    return tableAndRow[0];
  }

  public static String getRow(ImmutableBytesWritable bytes) {
    String[] tableAndRow = getTableAndRow(bytes);
    return tableAndRow[1];
  }

  /**
   * Concatenates the input table and row together,
   * using a colon as the separator.
   *
   * @param table the destination table name
   * @param row the row key
   * @return <table>:<row> as bytes
   */
  public static byte[] makeKey(String table, String row) {
    return Bytes.toBytes(table + ":" + row);
  }
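
  // Illustrative example (not part of the original gist): a mapper targeting table
  // "clicks" and row "2014-06-16#abc" would emit
  //   new ImmutableBytesWritable(makeKey("clicks", "2014-06-16#abc"))
  // as its output key; getTableAndRow() on that key returns {"clicks", "2014-06-16#abc"},
  // splitting at the first colon since table names cannot contain ":".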
  // -- This section contains the slightly modified functions copied from HFileOutputFormat --

  /**
   * This function does the same setup as
   * {@link HFileOutputFormat#configureIncrementalLoad(Job, HTable)},
   * except it sets up the total order partitioner to expect "<table>:<row>"
   * instead of just <row>.
   *
   * @param job the job to configure
   * @param tables the destination tables
   * @throws IOException
   */
  public static void configureIncrementalLoad(Job job, HTable... tables) throws IOException {
    Configuration conf = job.getConfiguration();
    Class<? extends Partitioner> topClass;
    try {
      topClass = getTotalOrderPartitionerClass();
    } catch (ClassNotFoundException e) {
      throw new IOException("Failed getting TotalOrderPartitioner", e);
    }
    job.setPartitionerClass(topClass);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(KeyValue.class);
    job.setOutputFormatClass(MultiTableHFileOutputFormat.class);

    // Based on the configured map output class, set the correct reducer to properly
    // sort the incoming values.
    // TODO it would be nice to pick one or the other of these formats.
    if (KeyValue.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(KeyValueSortReducer.class);
    } else if (Put.class.equals(job.getMapOutputValueClass())) {
      job.setReducerClass(PutSortReducer.class);
    } else {
      LOG.warn("Unknown map output value type: " + job.getMapOutputValueClass());
      LOG.warn("Make sure your reducer emits (ImmutableBytesWritable, KeyValue) in sorted order");
    }

    /** ADROLL CODE HERE */
    // We need to get the region start keys for each table
    // and generate the sequence file accordingly.
    // writePartitions sorts the start keys later on, so we needn't bother here.
    int reduceTasks = 0;
    List<ImmutableBytesWritable> allStartKeys = new ArrayList<ImmutableBytesWritable>();
    for (HTable table : tables) {
      // First get the current region splits
      LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
      List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
      allStartKeys.addAll(startKeys);
      LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
          "to match current region count");
      reduceTasks += startKeys.size();
      // Set compression algorithms and bloom types based on column families.
      // Note: each of these helpers serializes all of a table's per-family settings
      // into a single conf key, so if two tables define the same column family with
      // different settings, the last table configured wins.
      configureCompression(table, conf);
      configureBloomType(table, conf);
    }
    job.setNumReduceTasks(reduceTasks);
    /** END ADROLL CODE */

    // Then set up the sequence file for the split start points
    Path partitionsPath = new Path(job.getWorkingDirectory(),
        "partitions_" + UUID.randomUUID());
    LOG.info("Writing partition information to " + partitionsPath);
    FileSystem fs = partitionsPath.getFileSystem(conf);
    writePartitions(conf, partitionsPath, allStartKeys);
    partitionsPath.makeQualified(fs);

    URI cacheUri;
    try {
      // Below we make explicit reference to the bundled TOP. It's cheating:
      // we assume the default path defined in the HBase-bundled TotalOrderPartitioner
      // is the same as in Hadoop (whether 0.20 or 0.22, etc.).
      cacheUri = new URI(partitionsPath.toString() + "#" +
          org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner.DEFAULT_PATH);
    } catch (URISyntaxException e) {
      throw new IOException(e);
    }
    DistributedCache.addCacheFile(cacheUri, conf);
    DistributedCache.createSymlink(conf);

    // We don't need to copy dependency jars since we make an uber jar
    // TableMapReduceUtil.addDependencyJars(job);
    LOG.info("Incremental table output configured.");
  }
  /**
   * Return the start keys of all of the regions in this table,
   * with "<table>:" prepended, as a list of ImmutableBytesWritable.
   */
  protected static List<ImmutableBytesWritable> getRegionStartKeys(HTable table)
      throws IOException {
    byte[] keyPrefix = Bytes.toBytes(Bytes.toString(table.getTableName()) + ":");
    byte[][] byteKeys = table.getStartKeys();
    ArrayList<ImmutableBytesWritable> ret =
        new ArrayList<ImmutableBytesWritable>(byteKeys.length);
    for (byte[] byteKey : byteKeys) {
      // Build <table>:<row>
      byte[] keyWithPrefix = new byte[keyPrefix.length + byteKey.length];
      System.arraycopy(keyPrefix, 0, keyWithPrefix, 0, keyPrefix.length);
      System.arraycopy(byteKey, 0, keyWithPrefix, keyPrefix.length, byteKey.length);
      ret.add(new ImmutableBytesWritable(keyWithPrefix));
    }
    return ret;
  }
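
  // Illustrative example (not part of the original gist): for tables "a" (regions
  // starting at "" and "m") and "b" (a single region), the combined start keys are
  // "a:", "a:m" and "b:". writePartitions() below drops the globally-first key ("a:")
  // and uses the rest as TotalOrderPartitioner split points, giving one reducer per
  // region across all tables.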
  /**
   * We need to look at the context.write key and parse out the table,
   * then write the KeyValue to the right file.
   */
  @Override
  public RecordWriter<ImmutableBytesWritable, KeyValue> getRecordWriter(final TaskAttemptContext context)
      throws IOException, InterruptedException {
    // Get the path of the temporary output file
    final Path outputPath = FileOutputFormat.getOutputPath(context);
    final Path outputdir = new FileOutputCommitter(outputPath, context).getWorkPath();
    final Configuration conf = context.getConfiguration();
    final FileSystem fs = outputdir.getFileSystem(conf);
    // These configs. are from hbase-*.xml
    final long maxsize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
        HConstants.DEFAULT_MAX_FILE_SIZE);
    final int blocksize = conf.getInt("hbase.mapreduce.hfileoutputformat.blocksize",
        HFile.DEFAULT_BLOCKSIZE);
    // Invented config. Add to hbase-*.xml if other than default compression.
    final String defaultCompression = conf.get("hfile.compression",
        Compression.Algorithm.NONE.getName());
    final boolean compactionExclude = conf.getBoolean(
        "hbase.mapreduce.hfileoutputformat.compaction.exclude", false);

    // Create maps from column family to the compression algorithm and bloom type
    final Map<byte[], String> compressionMap = createFamilyCompressionMap(conf);
    final Map<byte[], String> bloomTypeMap = createFamilyBloomMap(conf);

    String dataBlockEncodingStr = conf.get(DATABLOCK_ENCODING_CONF_KEY);
    final HFileDataBlockEncoder encoder;
    if (dataBlockEncodingStr == null) {
      encoder = NoOpDataBlockEncoder.INSTANCE;
    } else {
      try {
        encoder = new HFileDataBlockEncoderImpl(DataBlockEncoding
            .valueOf(dataBlockEncodingStr));
      } catch (IllegalArgumentException ex) {
        throw new RuntimeException(
            "Invalid data block encoding type configured for the param "
                + DATABLOCK_ENCODING_CONF_KEY + " : "
                + dataBlockEncodingStr);
      }
    }

    return new RecordWriter<ImmutableBytesWritable, KeyValue>() {
      // Map of families to writers and how much has been output on the writer.
      // Note: writers are keyed by family only, so this assumes that no two of the
      // target tables share a column family name.
      private final Map<byte[], WriterLength> writers =
          new TreeMap<byte[], WriterLength>(Bytes.BYTES_COMPARATOR);
      private byte[] previousRow = HConstants.EMPTY_BYTE_ARRAY;
      private final byte[] now = Bytes.toBytes(System.currentTimeMillis());
      private boolean rollRequested = false;

      public void write(ImmutableBytesWritable row, KeyValue kv)
          throws IOException {
        // null input == user explicitly wants to flush
        if (row == null && kv == null) {
          rollWriters();
          return;
        }

        /** ADROLL CODE HERE */
        // We need to extract the table and row from the input,
        // since the Hadoop code assumes it's just the row at this point
        String[] tableAndKey = getTableAndRow(row);
        String tableName = tableAndKey[0];
        byte[] rowKey = Bytes.toBytes(tableAndKey[1]);

        long length = kv.getLength();
        byte[] family = kv.getFamily();
        WriterLength wl = this.writers.get(family);

        // If this is a new column family, make sure its output directory exists
        if (wl == null) {
          Path tableOutputDir = new Path(outputdir, tableName);
          fs.mkdirs(new Path(tableOutputDir, Bytes.toString(family)));
        }
        /** END ADROLL CODE */

        // If any of the HFiles for the column families has reached
        // maxsize, we need to roll all the writers
        if (wl != null && wl.written + length >= maxsize) {
          this.rollRequested = true;
        }

        // This can only happen once a row is finished though
        if (rollRequested && Bytes.compareTo(this.previousRow, rowKey) != 0) {
          rollWriters();
        }

        // Create a new HFile writer, if necessary
        if (wl == null || wl.writer == null) {
          /** ADROLL CODE HERE */
          // We need to pass the table in addition to the family to write to the proper path
          wl = getNewWriter(tableName, family, conf);
          /** END ADROLL CODE */
        }

        // We now have the proper HFile writer. Full steam ahead
        kv.updateLatestStamp(this.now);
        wl.writer.append(kv);
        wl.written += length;

        // Copy the row so we know when a row transition happens.
        this.previousRow = rowKey;
      }
      private void rollWriters() throws IOException {
        for (WriterLength wl : this.writers.values()) {
          if (wl.writer != null) {
            LOG.info("Writer=" + wl.writer.getPath() +
                ((wl.written == 0) ? "" : ", wrote=" + wl.written));
            close(wl.writer);
          }
          wl.writer = null;
          wl.written = 0;
        }
        this.rollRequested = false;
      }

      /*
       * Create a new StoreFile.Writer.
       * @param tableName
       * @param family
       * @return A WriterLength, containing a new StoreFile.Writer.
       * @throws IOException
       */
      private WriterLength getNewWriter(String tableName, byte[] family, Configuration conf)
          throws IOException {
        WriterLength wl = new WriterLength();
        /** ADROLL CODE HERE */
        // We need to demux the output per table into separate directories
        Path tableOutputDir = new Path(outputdir, tableName);
        Path familydir = new Path(tableOutputDir, Bytes.toString(family));
        /** END ADROLL CODE */
        String compression = compressionMap.get(family);
        compression = compression == null ? defaultCompression : compression;
        String bloomTypeStr = bloomTypeMap.get(family);
        BloomType bloomType = BloomType.NONE;
        if (bloomTypeStr != null) {
          bloomType = BloomType.valueOf(bloomTypeStr);
        }
        Configuration tempConf = new Configuration(conf);
        tempConf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);
        wl.writer = new StoreFile.WriterBuilder(conf, new CacheConfig(tempConf), fs, blocksize)
            .withOutputDir(familydir)
            .withCompression(AbstractHFileWriter.compressionByName(compression))
            .withBloomType(bloomType)
            .withComparator(KeyValue.COMPARATOR)
            .withDataBlockEncoder(encoder)
            .withChecksumType(Store.getChecksumType(conf))
            .withBytesPerChecksum(Store.getBytesPerChecksum(conf))
            .build();
        this.writers.put(family, wl);
        return wl;
      }

      private void close(final StoreFile.Writer w) throws IOException {
        if (w != null) {
          w.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
              Bytes.toBytes(System.currentTimeMillis()));
          w.appendFileInfo(StoreFile.BULKLOAD_TASK_KEY,
              Bytes.toBytes(context.getTaskAttemptID().toString()));
          w.appendFileInfo(StoreFile.MAJOR_COMPACTION_KEY,
              Bytes.toBytes(true));
          w.appendFileInfo(StoreFile.EXCLUDE_FROM_MINOR_COMPACTION_KEY,
              Bytes.toBytes(compactionExclude));
          w.appendTrackedTimestampsToMetadata();
          w.close();
        }
      }

      public void close(TaskAttemptContext c)
          throws IOException, InterruptedException {
        for (WriterLength wl : this.writers.values()) {
          close(wl.writer);
        }
      }
    };
  }
  /**
   * Write out a SequenceFile that can be read by TotalOrderPartitioner
   * that contains the split points in startKeys.
   *
   * @param conf the job configuration
   * @param partitionsPath output path for SequenceFile
   * @param startKeys the region start keys
   */
  protected static void writePartitions(Configuration conf, Path partitionsPath,
      List<ImmutableBytesWritable> startKeys) throws IOException {
    if (startKeys.isEmpty()) {
      throw new IllegalArgumentException("No regions passed");
    }

    // We're generating a list of split points, and we don't ever
    // have keys < the first region (which has an empty start key)
    // so we need to remove it. Otherwise we would end up with an
    // empty reducer with index 0.
    TreeSet<ImmutableBytesWritable> sorted =
        new TreeSet<ImmutableBytesWritable>(startKeys);

    /** ADROLL CODE HERE */
    // Since we prefix the row with the table,
    // we need to check whether the row portion is empty instead of the whole key.
    ImmutableBytesWritable first = sorted.first();
    String rowKey = getRow(first);
    if (!rowKey.isEmpty()) {
      throw new IllegalArgumentException(
          "First region of table should have empty start key. Instead has: "
              + Bytes.toStringBinary(first.get()));
    }
    /** END ADROLL CODE */
    sorted.remove(first);

    // Write the actual file
    FileSystem fs = partitionsPath.getFileSystem(conf);
    SequenceFile.Writer writer = SequenceFile.createWriter(fs,
        conf, partitionsPath, ImmutableBytesWritable.class, NullWritable.class);

    try {
      for (ImmutableBytesWritable startKey : sorted) {
        writer.append(startKey, NullWritable.get());
      }
    } finally {
      writer.close();
    }
  }
}
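
For context, the sketch below shows one way a job might wire this up, assuming the HBase 0.94-era APIs this class is written against. The driver, mapper, input format ("table,row,value" lines), table names ("table_a", "table_b"), and column family "f" are all hypothetical, not from the gist. The mapper builds KeyValues with the real row key but emits the table-prefixed shuffle key via makeKey(), and configureIncrementalLoad() is called with every destination table. After the job completes, each table's HFiles land under <output>/<table>/<family>, so a bulk load (e.g. LoadIncrementalHFiles) would be run once per table against that table's subdirectory.

package core.hadoop.examples;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import core.hadoop.output.MultiTableHFileOutputFormat;

/** Hypothetical driver showing one way to use MultiTableHFileOutputFormat. */
public class MultiTableBulkLoadExample {

  /** Emits <table>:<row> shuffle keys (via makeKey) with KeyValues that carry the real row. */
  public static class ExampleMapper
      extends Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    @Override
    protected void map(LongWritable offset, Text line, Context context)
        throws IOException, InterruptedException {
      // Hypothetical input format: "table,row,value" per line.
      String[] parts = line.toString().split(",", 3);
      String table = parts[0];
      String row = parts[1];
      String value = parts[2];
      // The KeyValue keeps the un-prefixed row; only the shuffle key carries the table prefix.
      KeyValue kv = new KeyValue(Bytes.toBytes(row), Bytes.toBytes("f"),
          Bytes.toBytes("q"), Bytes.toBytes(value));
      context.write(
          new ImmutableBytesWritable(MultiTableHFileOutputFormat.makeKey(table, row)), kv);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Job job = new Job(conf, "multi-table bulk load");
    job.setJarByClass(MultiTableBulkLoadExample.class);
    job.setMapperClass(ExampleMapper.class);
    // Set the map output classes first; configureIncrementalLoad inspects them
    // to choose KeyValueSortReducer vs. PutSortReducer.
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // One reduce task per region across all tables, partitioned on <table>:<row>.
    HTable tableA = new HTable(conf, "table_a");
    HTable tableB = new HTable(conf, "table_b");
    MultiTableHFileOutputFormat.configureIncrementalLoad(job, tableA, tableB);

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}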