|
/** |
|
* Copyright 2009 The Apache Software Foundation |
|
* |
|
* Licensed to the Apache Software Foundation (ASF) under one |
|
* or more contributor license agreements. See the NOTICE file |
|
* distributed with this work for additional information |
|
* regarding copyright ownership. The ASF licenses this file |
|
* to you under the Apache License, Version 2.0 (the |
|
* "License"); you may not use this file except in compliance |
|
* with the License. You may obtain a copy of the License at |
|
* |
|
* http://www.apache.org/licenses/LICENSE-2.0 |
|
* |
|
* Unless required by applicable law or agreed to in writing, software |
|
* distributed under the License is distributed on an "AS IS" BASIS, |
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
* See the License for the specific language governing permissions and |
|
* limitations under the License. |
|
*/ |
|
package org.apache.hadoop.hbase.util; |
|
|
|
import java.io.IOException; |
|
import java.util.ArrayList; |
|
import java.util.List; |
|
import java.util.NoSuchElementException; |
|
import java.util.Random; |
|
|
|
import org.apache.commons.logging.Log; |
|
import org.apache.commons.logging.LogFactory; |
|
import org.apache.hadoop.conf.Configuration; |
|
import org.apache.hadoop.fs.FileSystem; |
|
import org.apache.hadoop.fs.Path; |
|
import org.apache.hadoop.hbase.HConstants; |
|
import org.apache.hadoop.hbase.HRegionInfo; |
|
import org.apache.hadoop.hbase.HTableDescriptor; |
|
import org.apache.hadoop.hbase.RemoteExceptionHandler; |
|
import org.apache.hadoop.hbase.TableNotDisabledException; |
|
import org.apache.hadoop.hbase.client.Delete; |
|
import org.apache.hadoop.hbase.client.HBaseAdmin; |
|
import org.apache.hadoop.hbase.client.HConnection; |
|
import org.apache.hadoop.hbase.client.HConnectionManager; |
|
import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable; |
|
import org.apache.hadoop.hbase.client.HTable; |
|
import org.apache.hadoop.hbase.client.Put; |
|
import org.apache.hadoop.hbase.client.Result; |
|
import org.apache.hadoop.hbase.client.ResultScanner; |
|
import org.apache.hadoop.hbase.regionserver.HRegion; |
|
import org.apache.hadoop.hbase.regionserver.MergeTools; |
|
import org.apache.hadoop.hbase.regionserver.wal.HLog; |
|
|
|
/**
 * A non-instantiable utility class whose static methods compact a table by
 * merging adjacent regions.
 */
|
public class HMerge { |
|
// Configuration key controlling how many adjacent regions are considered
// per merge batch (read in OnlineMerger.next()).
// Made final: these are constants and nothing in this file reassigns them.
public static final String REGION_FACTOR_CONF = "hmerge.region.factor";
// Default number of adjacent regions considered per merge batch.
public static final int REGION_FACTOR_DEFAULT = 2;
// TODO: Where is this class used? How does it relate to Merge in same package?
static final Log LOG = LogFactory.getLog(HMerge.class);
// NOTE(review): rand is unused within this file; possibly referenced by
// package-private callers elsewhere — confirm before removing.
static final Random rand = new Random();

/*
 * Not instantiable
 */
private HMerge() {
  super();
}
|
|
|
/**
 * Scans the table and merges two adjacent regions if they are small. This
 * only happens when a lot of rows are deleted.
 *
 * The HBase instance must be online, but the table must be disabled.
 * Delegates to the five-argument overload with master verification enabled.
 *
 * @param conf - configuration object for HBase
 * @param fs - FileSystem where regions reside
 * @param tableName - Table to be compacted
 * @param shouldExecute if false, performs a dry run: candidate regions are
 * opened and scanned but no merge is performed and meta is not updated
 * @throws IOException
 * @throws InterruptedException
 */
public static void merge(Configuration conf, FileSystem fs,
final byte [] tableName, final boolean shouldExecute)
throws IOException, InterruptedException {
merge(conf, fs, tableName, true, shouldExecute);
}
|
|
|
/** |
|
* Scans the table and merges two adjacent regions if they are small. This |
|
* only happens when a lot of rows are deleted. |
|
* |
|
* When merging the META region, the HBase instance must be offline. |
|
* When merging a normal table, the HBase instance must be online, but the |
|
* table must be disabled. |
|
* |
|
* @param conf - configuration object for HBase |
|
* @param fs - FileSystem where regions reside |
|
* @param tableName - Table to be compacted |
|
* @param testMasterRunning True if we are to verify master is down before |
|
* running merge |
|
* @throws IOException |
|
* @throws InterruptedException |
|
*/ |
|
public static void merge(Configuration conf, FileSystem fs, |
|
final byte [] tableName, final boolean testMasterRunning, final boolean shouldExecute) |
|
throws IOException, InterruptedException { |
|
boolean masterIsRunning = false; |
|
if (testMasterRunning) { |
|
masterIsRunning = HConnectionManager |
|
.execute(new HConnectable<Boolean>(conf) { |
|
@Override |
|
public Boolean connect(HConnection connection) throws IOException { |
|
return connection.isMasterRunning(); |
|
} |
|
}); |
|
} |
|
if(!masterIsRunning) { |
|
throw new IllegalStateException( |
|
"HBase instance must be running to merge a normal table"); |
|
} |
|
HBaseAdmin admin = new HBaseAdmin(conf); |
|
try { |
|
if (!admin.isTableDisabled(tableName)) { |
|
throw new TableNotDisabledException(tableName); |
|
} |
|
} finally { |
|
admin.close(); |
|
} |
|
new OnlineMerger(conf, fs, tableName, shouldExecute).process(); |
|
} |
|
|
|
private static abstract class Merger { |
|
protected final Configuration conf; |
|
protected final FileSystem fs; |
|
protected final Path tabledir; |
|
protected final HTableDescriptor htd; |
|
protected final HLog hlog; |
|
protected final long maxFilesize; |
|
protected final boolean execute; |
|
|
|
protected Merger(Configuration conf, FileSystem fs, final byte [] tableName, boolean execute) |
|
throws IOException { |
|
this.conf = conf; |
|
this.fs = fs; |
|
this.maxFilesize = conf.getLong(HConstants.HREGION_MAX_FILESIZE, |
|
HConstants.DEFAULT_MAX_FILE_SIZE); |
|
|
|
this.tabledir = new Path( |
|
fs.makeQualified(new Path(conf.get(HConstants.HBASE_DIR))), |
|
Bytes.toString(tableName) |
|
); |
|
this.htd = FSTableDescriptors.getTableDescriptor(this.fs, this.tabledir); |
|
Path logdir = new Path(tabledir, "merge_" + System.currentTimeMillis() + |
|
HConstants.HREGION_LOGDIR_NAME); |
|
Path oldLogDir = new Path(tabledir, HConstants.HREGION_OLDLOGDIR_NAME); |
|
this.hlog = new HLog(fs, logdir, oldLogDir, conf); |
|
|
|
this.execute = execute; |
|
} |
|
|
|
void process() throws IOException, InterruptedException { |
|
try { |
|
for (HRegion[] regionsToMerge = next(); |
|
regionsToMerge != null; |
|
regionsToMerge = next()) { |
|
if (!merge(regionsToMerge)) { |
|
return; |
|
} |
|
} |
|
} finally { |
|
try { |
|
hlog.closeAndDelete(); |
|
|
|
} catch(IOException e) { |
|
LOG.error(e); |
|
} |
|
} |
|
} |
|
|
|
protected boolean merge(final HRegion[] regions) throws IOException, InterruptedException { |
|
if (regions.length < 2) { |
|
LOG.info("only one region - nothing to merge"); |
|
return false; |
|
} |
|
|
|
if (execute) { |
|
HRegion mergedRegion = MergeTools.mergeContiguous(regions, false); |
|
byte[][] names = new byte[regions.length][]; |
|
for (int i = 0; i < regions.length; i++) { |
|
names[i] = regions[i].getRegionName(); |
|
} |
|
|
|
updateMeta(names, mergedRegion); |
|
} |
|
|
|
for (HRegion region : regions) { |
|
region.close(); |
|
} |
|
|
|
return true; |
|
} |
|
|
|
protected abstract HRegion[] next() throws IOException; |
|
|
|
protected abstract void updateMeta(final byte[][] regionsToDelete, HRegion newRegion) throws IOException; |
|
|
|
} |
|
|
|
/** Instantiated to compact a normal user table */ |
|
private static class OnlineMerger extends Merger { |
|
private final String tableName; |
|
private final byte[] tableNameBytes; |
|
private final HTable metaTable; |
|
private final ResultScanner metaScanner; |
|
private HRegionInfo latestRegion; |
|
|
|
OnlineMerger(Configuration conf, FileSystem fs, |
|
final byte [] tableNameBytes, boolean execute) |
|
throws IOException { |
|
super(conf, fs, tableNameBytes, execute); |
|
this.tableName = Bytes.toString(tableNameBytes); |
|
this.tableNameBytes = tableNameBytes; |
|
this.metaTable = new HTable(conf, HConstants.META_TABLE_NAME); |
|
this.metaScanner = metaTable.getScanner(HConstants.CATALOG_FAMILY, |
|
HConstants.REGIONINFO_QUALIFIER); |
|
this.latestRegion = null; |
|
} |
|
|
|
private HRegionInfo nextRegion() throws IOException { |
|
try { |
|
Result results = getMetaRow(); |
|
if (results == null) { |
|
return null; |
|
} |
|
byte[] regionInfoValue = results.getValue(HConstants.CATALOG_FAMILY, |
|
HConstants.REGIONINFO_QUALIFIER); |
|
if (regionInfoValue == null || regionInfoValue.length == 0) { |
|
throw new NoSuchElementException("meta region entry missing " + |
|
Bytes.toString(HConstants.CATALOG_FAMILY) + ":" + |
|
Bytes.toString(HConstants.REGIONINFO_QUALIFIER)); |
|
} |
|
HRegionInfo region = Writables.getHRegionInfo(regionInfoValue); |
|
if (!Bytes.equals(region.getTableName(), this.tableNameBytes)) { |
|
return null; |
|
} |
|
return region; |
|
} catch (IOException e) { |
|
e = RemoteExceptionHandler.checkIOException(e); |
|
LOG.error("meta scanner error", e); |
|
metaScanner.close(); |
|
throw e; |
|
} |
|
} |
|
|
|
/* |
|
* Check current row has a HRegionInfo. Skip to next row if HRI is empty. |
|
* @return A Map of the row content else null if we are off the end. |
|
* @throws IOException |
|
*/ |
|
private Result getMetaRow() throws IOException { |
|
Result currentRow = metaScanner.next(); |
|
boolean foundResult = false; |
|
while (currentRow != null) { |
|
String row = Bytes.toStringBinary(currentRow.getRow()); |
|
byte[] regionInfoValue = currentRow.getValue(HConstants.CATALOG_FAMILY, |
|
HConstants.REGIONINFO_QUALIFIER); |
|
if (regionInfoValue == null || regionInfoValue.length == 0) { |
|
currentRow = metaScanner.next(); |
|
continue; |
|
} |
|
|
|
if (!row.substring(0, row.indexOf(",")).equals(tableName)) { |
|
currentRow = metaScanner.next(); |
|
continue; |
|
} |
|
LOG.info("Row: <" + row + ">"); |
|
foundResult = true; |
|
break; |
|
} |
|
return foundResult ? currentRow : null; |
|
} |
|
|
|
@Override |
|
protected HRegion[] next() throws IOException { |
|
List<HRegion> regions = new ArrayList<HRegion>(); |
|
if(latestRegion == null) { |
|
latestRegion = nextRegion(); |
|
} |
|
|
|
int sizeToMerge = 0; |
|
int max = conf.getInt(REGION_FACTOR_CONF, REGION_FACTOR_DEFAULT); |
|
|
|
// Limit to a max of @max, limited also to around half of the @maxFileSize |
|
for (int i = 0; i < max; i++) { |
|
if (latestRegion != null) { |
|
HRegion currentRegion = HRegion.newHRegion(tabledir, hlog, fs, conf, latestRegion, this.htd, null); |
|
currentRegion.initialize(); |
|
sizeToMerge += currentRegion.getLargestHStoreSize(); |
|
regions.add(currentRegion); |
|
|
|
if (sizeToMerge >= (maxFilesize / 2)) { |
|
break; |
|
} |
|
} |
|
|
|
// dont increment on last iteration |
|
if (i + 1 < max) { |
|
latestRegion = nextRegion(); |
|
} |
|
} |
|
|
|
return regions.toArray(new HRegion[regions.size()]); |
|
} |
|
|
|
@Override |
|
protected void updateMeta(final byte[][] regionsToDelete, HRegion newRegion) throws IOException { |
|
for (int r = 0; r < regionsToDelete.length; r++) { |
|
if(latestRegion != null && Bytes.equals(regionsToDelete[r], latestRegion.getRegionName())) { |
|
latestRegion = null; |
|
} |
|
Delete delete = new Delete(regionsToDelete[r]); |
|
if (execute) { |
|
metaTable.delete(delete); |
|
} |
|
if(LOG.isDebugEnabled()) { |
|
LOG.debug("updated columns in row: " + Bytes.toStringBinary(regionsToDelete[r])); |
|
} |
|
} |
|
|
|
Put put = new Put(newRegion.getRegionName()); |
|
put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER, |
|
Writables.getBytes(newRegion.getRegionInfo())); |
|
if (execute) { |
|
metaTable.put(put); |
|
} |
|
if(LOG.isDebugEnabled()) { |
|
LOG.debug("updated columns in row: " |
|
+ Bytes.toStringBinary(newRegion.getRegionName())); |
|
} |
|
} |
|
} |
|
} |