@bbeaudreault
Last active December 28, 2015 21:49
Modifications to the 0.94.x HMerge tool to allow running against an online cluster with the target table disabled. Important: see the README below.

This is provided AS IS. The code was copied from the 0.94.x codebase and modified to work on an online cluster (target table must be disabled). Testing in a development environment is highly recommended.

Here is our workflow for using this (a rough client-API sketch of the preparation steps follows the list):

  1. Disable table
  2. Snapshot table
  3. Create clone from snapshot
  4. Major compact clone (to resolve links)
  5. Wait for compactions to finish
  6. Disable clone
  7. Run merge tool against clone
  8. Re-enable the clone and verify the data.
  9. If everything checks out, switch your application code to the clone and drop the old table.
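
The preparation steps (1 through 6) can be scripted with the standard 0.94.x client API. The sketch below is a rough outline only: the class name and the table/snapshot names are placeholders, snapshot support assumes 0.94.6 or later, and waiting for compactions (step 5) still has to be checked by hand, e.g. via the region server UI or compaction queue metrics.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    public class PrepareClone {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
          admin.disableTable("my_table");                           // 1. disable table
          admin.snapshot("my_table_snap", "my_table");              // 2. snapshot table
          admin.cloneSnapshot("my_table_snap", "my_table_clone");   // 3. create clone from snapshot
          admin.majorCompact("my_table_clone");                     // 4. major compact clone (asynchronous)
          // 5. wait for compactions to finish before moving on
          admin.disableTable("my_table_clone");                     // 6. disable clone
        } finally {
          admin.close();
        }
      }
    }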
/**
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MergeTools;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
/**
* A non-instantiable class that has a static method capable of compacting
* a table by merging adjacent regions.
*/
public class HMerge {
// Maximum number of adjacent regions to merge in a single pass (hmerge.region.factor).
public static final String REGION_FACTOR_CONF = "hmerge.region.factor";
public static final int REGION_FACTOR_DEFAULT = 2;
// TODO: Where is this class used? How does it relate to Merge in same package?
static final Log LOG = LogFactory.getLog(HMerge.class);
static final Random rand = new Random();
/*
* Not instantiable
*/
private HMerge() {
super();
}
/**
* Scans the table and merges adjacent regions if they are small. This
* only happens when a lot of rows are deleted.
*
* In this modified version the HBase instance must be online, but the
* table to be merged must be disabled.
*
* @param conf - configuration object for HBase
* @param fs - FileSystem where regions reside
* @param tableName - Table to be compacted
* @param shouldExecute - if false, perform a dry run: no regions are merged
* and no .META. rows are changed
* @throws IOException
* @throws InterruptedException
*/
public static void merge(Configuration conf, FileSystem fs,
final byte [] tableName, final boolean shouldExecute)
throws IOException, InterruptedException {
merge(conf, fs, tableName, true, shouldExecute);
}
/**
* Scans the table and merges adjacent regions if they are small. This
* only happens when a lot of rows are deleted.
*
* In this modified version the HBase instance must be online, but the
* table to be merged must be disabled.
*
* @param conf - configuration object for HBase
* @param fs - FileSystem where regions reside
* @param tableName - Table to be compacted
* @param testMasterRunning - True if we should verify that the master is
* running before attempting the merge
* @param shouldExecute - if false, perform a dry run: no regions are merged
* and no .META. rows are changed
* @throws IOException
* @throws InterruptedException
*/
public static void merge(Configuration conf, FileSystem fs,
final byte [] tableName, final boolean testMasterRunning, final boolean shouldExecute)
throws IOException, InterruptedException {
boolean masterIsRunning = false;
if (testMasterRunning) {
masterIsRunning = HConnectionManager
.execute(new HConnectable<Boolean>(conf) {
@Override
public Boolean connect(HConnection connection) throws IOException {
return connection.isMasterRunning();
}
});
}
if(!masterIsRunning) {
throw new IllegalStateException(
"HBase instance must be running to merge a normal table");
}
HBaseAdmin admin = new HBaseAdmin(conf);
try {
if (!admin.isTableDisabled(tableName)) {
throw new TableNotDisabledException(tableName);
}
} finally {
admin.close();
}
new OnlineMerger(conf, fs, tableName, shouldExecute).process();
}
private static abstract class Merger {
protected final Configuration conf;
protected final FileSystem fs;
protected final Path tabledir;
protected final HTableDescriptor htd;
protected final HLog hlog;
protected final long maxFilesize;
protected final boolean execute;
protected Merger(Configuration conf, FileSystem fs, final byte [] tableName, boolean execute)
throws IOException {
this.conf = conf;
this.fs = fs;
this.maxFilesize = conf.getLong(HConstants.HREGION_MAX_FILESIZE,
HConstants.DEFAULT_MAX_FILE_SIZE);
this.tabledir = new Path(
fs.makeQualified(new Path(conf.get(HConstants.HBASE_DIR))),
Bytes.toString(tableName)
);
this.htd = FSTableDescriptors.getTableDescriptor(this.fs, this.tabledir);
Path logdir = new Path(tabledir, "merge_" + System.currentTimeMillis() +
HConstants.HREGION_LOGDIR_NAME);
Path oldLogDir = new Path(tabledir, HConstants.HREGION_OLDLOGDIR_NAME);
this.hlog = new HLog(fs, logdir, oldLogDir, conf);
this.execute = execute;
}
void process() throws IOException, InterruptedException {
try {
for (HRegion[] regionsToMerge = next();
regionsToMerge != null;
regionsToMerge = next()) {
if (!merge(regionsToMerge)) {
return;
}
}
} finally {
try {
hlog.closeAndDelete();
} catch(IOException e) {
LOG.error(e);
}
}
}
protected boolean merge(final HRegion[] regions) throws IOException, InterruptedException {
if (regions.length < 2) {
LOG.info("fewer than two regions - nothing to merge");
return false;
}
}
if (execute) {
HRegion mergedRegion = MergeTools.mergeContiguous(regions, false);
byte[][] names = new byte[regions.length][];
for (int i = 0; i < regions.length; i++) {
names[i] = regions[i].getRegionName();
}
updateMeta(names, mergedRegion);
}
for (HRegion region : regions) {
region.close();
}
return true;
}
protected abstract HRegion[] next() throws IOException;
protected abstract void updateMeta(final byte[][] regionsToDelete, HRegion newRegion) throws IOException;
}
/** Instantiated to compact a normal user table */
private static class OnlineMerger extends Merger {
private final String tableName;
private final byte[] tableNameBytes;
private final HTable metaTable;
private final ResultScanner metaScanner;
private HRegionInfo latestRegion;
OnlineMerger(Configuration conf, FileSystem fs,
final byte [] tableNameBytes, boolean execute)
throws IOException {
super(conf, fs, tableNameBytes, execute);
this.tableName = Bytes.toString(tableNameBytes);
this.tableNameBytes = tableNameBytes;
this.metaTable = new HTable(conf, HConstants.META_TABLE_NAME);
this.metaScanner = metaTable.getScanner(HConstants.CATALOG_FAMILY,
HConstants.REGIONINFO_QUALIFIER);
this.latestRegion = null;
}
private HRegionInfo nextRegion() throws IOException {
try {
Result results = getMetaRow();
if (results == null) {
return null;
}
byte[] regionInfoValue = results.getValue(HConstants.CATALOG_FAMILY,
HConstants.REGIONINFO_QUALIFIER);
if (regionInfoValue == null || regionInfoValue.length == 0) {
throw new NoSuchElementException("meta region entry missing " +
Bytes.toString(HConstants.CATALOG_FAMILY) + ":" +
Bytes.toString(HConstants.REGIONINFO_QUALIFIER));
}
HRegionInfo region = Writables.getHRegionInfo(regionInfoValue);
if (!Bytes.equals(region.getTableName(), this.tableNameBytes)) {
return null;
}
return region;
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.error("meta scanner error", e);
metaScanner.close();
throw e;
}
}
/*
* Check that the current row has an HRegionInfo. Skip to the next row if the HRI is empty.
* @return the row's Result, or null if we are off the end of the scanner.
* @throws IOException
*/
private Result getMetaRow() throws IOException {
Result currentRow = metaScanner.next();
boolean foundResult = false;
while (currentRow != null) {
String row = Bytes.toStringBinary(currentRow.getRow());
byte[] regionInfoValue = currentRow.getValue(HConstants.CATALOG_FAMILY,
HConstants.REGIONINFO_QUALIFIER);
if (regionInfoValue == null || regionInfoValue.length == 0) {
currentRow = metaScanner.next();
continue;
}
if (!row.substring(0, row.indexOf(",")).equals(tableName)) {
currentRow = metaScanner.next();
continue;
}
LOG.info("Row: <" + row + ">");
foundResult = true;
break;
}
return foundResult ? currentRow : null;
}
@Override
protected HRegion[] next() throws IOException {
List<HRegion> regions = new ArrayList<HRegion>();
if(latestRegion == null) {
latestRegion = nextRegion();
}
long sizeToMerge = 0;
int max = conf.getInt(REGION_FACTOR_CONF, REGION_FACTOR_DEFAULT);
// Merge at most 'max' regions per pass, stopping early once the combined
// size reaches roughly half of maxFilesize
for (int i = 0; i < max; i++) {
if (latestRegion != null) {
HRegion currentRegion = HRegion.newHRegion(tabledir, hlog, fs, conf, latestRegion, this.htd, null);
currentRegion.initialize();
sizeToMerge += currentRegion.getLargestHStoreSize();
regions.add(currentRegion);
if (sizeToMerge >= (maxFilesize / 2)) {
break;
}
}
// don't advance to the next region on the last iteration
if (i + 1 < max) {
latestRegion = nextRegion();
}
}
return regions.toArray(new HRegion[regions.size()]);
}
@Override
protected void updateMeta(final byte[][] regionsToDelete, HRegion newRegion) throws IOException {
for (int r = 0; r < regionsToDelete.length; r++) {
if(latestRegion != null && Bytes.equals(regionsToDelete[r], latestRegion.getRegionName())) {
latestRegion = null;
}
Delete delete = new Delete(regionsToDelete[r]);
if (execute) {
metaTable.delete(delete);
}
if(LOG.isDebugEnabled()) {
LOG.debug("deleted region row: " + Bytes.toStringBinary(regionsToDelete[r]));
}
}
Put put = new Put(newRegion.getRegionName());
put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
Writables.getBytes(newRegion.getRegionInfo()));
if (execute) {
metaTable.put(put);
}
if(LOG.isDebugEnabled()) {
LOG.debug("updated columns in row: "
+ Bytes.toStringBinary(newRegion.getRegionName()));
}
}
}
}
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
/**
* Functions pulled from org.apache.hadoop.hbase.regionserver.HRegion
*/
public class MergeTools {
public static final Log LOG = LogFactory.getLog(MergeTools.class);
/**
* Merge an array of HRegions from the same table. The regions must be
* contiguous (adjacent and non-overlapping).
*
* @param regions the regions to merge; they are sorted by start key here
* @param compact true to force a major compaction of each region before merging
* @return new merged HRegion
* @throws IOException
* @throws InterruptedException
*/
public static HRegion mergeContiguous(final HRegion[] regions, boolean compact)
throws IOException, InterruptedException {
// Ensure they are of all the same table
String tableName = null;
for (HRegion region : regions) {
if (tableName == null) {
tableName = region.getRegionInfo().getTableNameAsString();
} else if (!tableName.equals(region.getRegionInfo().getTableNameAsString())) {
throw new IllegalArgumentException("Regions do not belong to the same table");
}
}
// Sort them so we can find the right start and end keys
Arrays.sort(regions, new Comparator<HRegion>() {
@Override
public int compare(HRegion left, HRegion right) {
if (left.getStartKey() == null) {
if (right.getStartKey() == null) {
throw new IllegalArgumentException("Cannot merge two regions with null start key");
} else {
return -1;
}
} else if (right.getStartKey() == null) {
return 1;
} else {
return left.comparator.compareRows(left.getStartKey(), 0, left.getStartKey().length, right.getStartKey(), 0, right.getStartKey().length);
}
}
});
// Ensure contiguous. That is, the end key of one region equals the start of the next
for (int i = 0; i < regions.length - 1; i++) {
if (Bytes.compareTo(regions[i].getEndKey(), regions[i+1].getStartKey()) != 0) {
throw new IllegalArgumentException("Cannot merge non-contiguous regions");
}
}
final FileSystem fs = regions[0].getFilesystem();
// Make sure each region's cache is empty
for (HRegion region : regions) {
if (region.flushcache() && !compact) {
LOG.debug("Flush resulted in new storefile. Must compact.");
compact = true;
}
if (!compact && tooManyStoreFiles(region)) {
LOG.debug("Too many storefiles in a region. Must compact.");
compact = true;
}
}
// Compact each region so we only have one store file per family
if (compact) {
ExecutorService executorService = Executors.newFixedThreadPool(regions.length);
for (final HRegion region : regions) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
region.compactStores(true);
if (LOG.isDebugEnabled()) {
LOG.debug("Files for region: " + region);
listPaths(fs, region.getRegionDir());
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
});
}
executorService.shutdown();
executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
}
Configuration conf = regions[0].getConf();
HTableDescriptor tabledesc = regions[0].getTableDesc();
HLog log = regions[0].getLog();
Path tableDir = regions[0].getTableDir();
// Since we have ensured the regions are sorted and contiguous, we can just take the start of first and end of the last
final byte[] startKey = regions[0].getStartKey();
final byte[] endKey = regions[regions.length - 1].getEndKey();
HRegionInfo newRegionInfo = new HRegionInfo(tabledesc.getName(), startKey, endKey);
LOG.info("Creating new region " + newRegionInfo.toString());
String encodedName = newRegionInfo.getEncodedName();
Path newRegionDir = HRegion.getRegionDir(regions[0].getTableDir(), encodedName);
if(fs.exists(newRegionDir)) {
throw new IOException("Cannot merge; target file collision at " +
newRegionDir);
}
fs.mkdirs(newRegionDir);
LOG.info("starting merge of " + regions.length +
" regions into new region " + newRegionInfo.toString() +
" with start key <" + Bytes.toStringBinary(startKey) + "> and end key <" +
Bytes.toStringBinary(endKey) + ">");
// Move HStoreFiles under new region directory
Map<byte [], List<StoreFile>> byFamily = new TreeMap<byte [], List<StoreFile>>(Bytes.BYTES_COMPARATOR);
for (HRegion region : regions) {
byFamily = filesByFamily(byFamily, region.close());
}
for (Map.Entry<byte [], List<StoreFile>> es : byFamily.entrySet()) {
byte [] colFamily = es.getKey();
HRegion.makeColumnFamilyDirs(fs, tableDir, newRegionInfo, colFamily);
// Because we compacted the source regions we should have at most one
// HStoreFile per family per source region, and there will be no reference store files
List<StoreFile> srcFiles = es.getValue();
if (srcFiles.size() == 2) {
long seqA = srcFiles.get(0).getMaxSequenceId();
long seqB = srcFiles.get(1).getMaxSequenceId();
if (seqA == seqB) {
// Can't have the same sequence id, since on open of a store this is what
// distinguishes the files (see how the store's map of files is keyed by
// sequence id).
throw new IOException("Files have same sequenceid: " + seqA);
}
}
for (StoreFile hsf: srcFiles) {
StoreFile.rename(fs, hsf.getPath(), StoreFile.getUniqueFile(fs, Store.getStoreHomedir(tableDir, newRegionInfo.getEncodedName(), colFamily)));
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Files for new region");
listPaths(fs, newRegionDir);
}
HRegion dstRegion = HRegion.newHRegion(tableDir, log, fs, conf, newRegionInfo, regions[0].getTableDesc(), null);
int readCount = 0;
int writeCount = 0;
for (HRegion region : regions) {
readCount += region.readRequestsCount.get();
writeCount += region.writeRequestsCount.get();
}
dstRegion.readRequestsCount.set(readCount);
dstRegion.writeRequestsCount.set(writeCount);
dstRegion.initialize();
//dstRegion.compactStores();
if (LOG.isDebugEnabled()) {
LOG.debug("Files for new region");
listPaths(fs, dstRegion.getRegionDir());
}
for (HRegion region : regions) {
HFileArchiver.archiveRegion(fs, FSUtils.getRootDir(region.getConf()), region.getTableDir(), region.getRegionDir());
}
LOG.info("merge completed. New region is " + dstRegion);
return dstRegion;
}
private static boolean tooManyStoreFiles(HRegion region) {
for (Entry<byte[], Store> store : region.getStores().entrySet()) {
if (store.getValue().getNumberOfStoreFiles() > 1) {
return true;
}
}
return false;
}
@SuppressWarnings("deprecation")
private static void listPaths(FileSystem fs, Path dir) throws IOException {
if (LOG.isDebugEnabled()) {
FileStatus[] stats = FSUtils.listStatus(fs, dir, null);
if (stats == null || stats.length == 0) {
return;
}
for (int i = 0; i < stats.length; i++) {
String path = stats[i].getPath().toString();
if (stats[i].isDir()) {
LOG.debug("d " + path);
listPaths(fs, stats[i].getPath());
} else {
LOG.debug("f " + path + " size=" + stats[i].getLen());
}
}
}
}
/*
* Fills a map with a list of store files keyed by column family.
* @param byFamily Map to fill.
* @param storeFiles Store files to process.
* @return Returns <code>byFamily</code>
*/
private static Map<byte [], List<StoreFile>> filesByFamily(
Map<byte [], List<StoreFile>> byFamily, List<StoreFile> storeFiles) {
for (StoreFile src: storeFiles) {
byte [] family = src.getFamily();
List<StoreFile> v = byFamily.get(family);
if (v == null) {
v = new ArrayList<StoreFile>();
byFamily.put(family, v);
}
v.add(src);
}
return byFamily;
}
}
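
For step 7 of the workflow, the merge itself can be driven by a small wrapper around HMerge.merge. Below is a minimal sketch, assuming the two classes above are compiled onto a classpath that also contains hbase-site.xml, and that it is run from a machine with direct filesystem access to the HBase root directory; the clone table name is a placeholder.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.hbase.util.HMerge;

    public class RunHMerge {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        FileSystem fs = FileSystem.get(conf);
        byte[] clone = Bytes.toBytes("my_table_clone");  // the disabled clone from the workflow above
        // Dry run: walks .META. and opens the candidate regions, but merges nothing
        // and writes nothing back to .META.
        HMerge.merge(conf, fs, clone, false);
        // Real run, once the dry run looks sane (the clone must still be disabled).
        HMerge.merge(conf, fs, clone, true);
      }
    }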