Skip to content

Instantly share code, notes, and snippets.

@axiak
Created April 8, 2014 21:17
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save axiak/10193981 to your computer and use it in GitHub Desktop.
Save axiak/10193981 to your computer and use it in GitHub Desktop.
/**
 * Job that hands every region hosted on the requested server(s) to the
 * {@link SlowCompactionManager} for compaction.
 *
 * <p>The servers are taken from the {@code SERVER_NAME} command-line option, which is either a
 * single hostname or a comma-separated list of hostnames. Hostnames are compared against
 * {@code ServerName.getHostname()}.
 */
public class MajorCompactServersJob extends AbstractJob {
  public static final String SHORT_OPT = "majorCompactServers";
  public static final String LONG_OPT = "majorCompactServers";
  public static final String DESCRIPTION = "Major compact all regions on a server or list of servers";

  private final HBaseAdminWrapper wrapper;
  private final SlowCompactionManager compactor;
  private final RegionLoadEstimation regionLoadEstimation;
  private final HdfsLocalityInfo hdfsLocalityInfo;

  /** Hostname, or comma-separated hostnames, of the servers whose regions are compacted. */
  @Inject @ForArg(HBaseTaskOption.SERVER_NAME)
  private CommandLineValue<String> serverName;

  @Inject
  public MajorCompactServersJob(final HBaseAdminWrapper wrapper,
                                final SlowCompactionManager compactor,
                                final RegionLoadEstimation regionLoadEstimation,
                                final HdfsLocalityInfo hdfsLocalityInfo) {
    this.wrapper = wrapper;
    this.compactor = compactor;
    this.regionLoadEstimation = regionLoadEstimation;
    this.hdfsLocalityInfo = hdfsLocalityInfo;
  }

  /**
   * Runs the job, rethrowing any failure through {@code Throwables.propagate} so that callers
   * do not have to deal with checked exceptions.
   */
  @Override
  public void run() {
    try {
      runJob();
    } catch (Exception e) {
      throw Throwables.propagate(e);
    }
  }

  /**
   * Collects the regions of the requested servers, annotates them with load and locality
   * information, and passes them to the compaction manager.
   */
  private void runJob() throws Exception {
    final Set<String> servers = Sets.newHashSet();
    // split() also copes with a value that contains no comma, so no special case is needed.
    // Entries are trimmed so that "host1, host2" matches both hosts; blank entries are dropped.
    for (final String rawServer : serverName.get().split(",")) {
      final String server = rawServer.trim();
      if (!server.isEmpty()) {
        servers.add(server);
      }
    }

    final Multimap<ServerName, HRegionInfo> regions = wrapper.getRegionInfosByServer(true);
    final Multimap<ServerName, HRegionInfo> regionsOnRequestedServers =
        Multimaps.filterKeys(regions, new Predicate<ServerName>() {
          @Override
          public boolean apply(ServerName input) {
            return servers.contains(input.getHostname());
          }
        });
    final Multimap<ServerName, RegionStats> regionList =
        RegionStats.regionInfoToStats(regionsOnRequestedServers);

    // Both annotate calls mutate regionList in place before it is handed to the compactor.
    regionLoadEstimation.annotateRegionsWithLoadInPlace(regionList);
    hdfsLocalityInfo.annotateRegionsWithLocalityInPlace(regionList);
    compactor.compactAllRegions(regionList.values());
  }
}
/**
 * Throttled compaction scheduler.
 *
 * <p>Keeps one queue of regions and one daemon worker thread per region server. Each worker
 * takes regions from its queue one at a time, waits until the server's compaction queue (as
 * reported over JMX) has room, and then asks HBase to compact the region.
 *
 * <p>{@code awaitingCompactions} counts regions that have been submitted and have neither been
 * compacted nor abandoned yet; {@link #awaitCompactions()} polls that counter.
 */
@Singleton
public class SlowCompactionManager {
  private static final Log LOG = LogFactory.getLog(SlowCompactionManager.class);

  /** Pause, in seconds, a worker inserts before each compaction request. */
  @Inject @ForArg(HBaseTaskOption.COMPACTION_QUIET_TIME)
  private CommandLineValue<Integer> compactionQuietTime;

  /** A worker waits while the server's compaction queue size is at or above this value. */
  @Inject @ForArg(HBaseTaskOption.SIMULTANEOUS_COMPACTIONS)
  private CommandLineValue<Integer> simultaneousCompactions;

  /** Optional upper bound, in milliseconds, on the time spent in {@link #compactAllRegions}. */
  @Inject @ForArg(HBaseTaskOption.MAX_TIME_MILLIS)
  private CommandLineValue<Long> maxCompactionTimeMillis;

  /** "MAJOR" (case-insensitive) requests major compactions; anything else a regular compaction. */
  @Inject @ForArg(HBaseTaskOption.COMPACTION_TYPE)
  private CommandLineValue<String> compactionTypeOption;

  /** Regions whose locality factor exceeds this threshold are not compacted. */
  @Inject @ForArg(HBaseTaskOption.COMPACT_THRESHOLD)
  private CommandLineValue<Double> compactThreshold;

  private final HBaseAdminWrapper hBaseAdminWrapper;
  private final RegionServerJMXInfo regionServerJMXInfo;

  /** Per-server work queues; an entry exists exactly for servers that have a worker thread. */
  private final Map<ServerName, BlockingDeque<HRegionInfo>> serversToRegions;

  /** Reaches zero once {@link #startCompactionQueue()} has completed. */
  private final CountDownLatch started = new CountDownLatch(1);

  /** Cached region-to-server mapping and the time it was last refreshed. */
  private volatile long regionServerUpdatedAt = 0L;
  private final AtomicReference<Map<HRegionInfo, ServerName>> regionServerInfo = Atomics.newReference();

  private final AtomicInteger awaitingCompactions = new AtomicInteger(0);
  private final AtomicInteger completedCompactions = new AtomicInteger(0);
  private List<Thread> compactionThreads;

  @Inject
  public SlowCompactionManager(final HBaseAdminWrapper hBaseAdminWrapper,
                               final RegionServerJMXInfo regionServerJMXInfo) {
    this.hBaseAdminWrapper = hBaseAdminWrapper;
    this.regionServerJMXInfo = regionServerJMXInfo;
    this.serversToRegions = Maps.newConcurrentMap();
  }

  /**
   * Starts one worker thread for every server in the current cluster status. Calling this more
   * than once is harmless: later calls return immediately.
   *
   * @throws Exception if the cluster status cannot be obtained
   */
  public synchronized void startCompactionQueue() throws Exception {
    if (started.getCount() < 1) {
      return;
    }
    this.compactionThreads = Lists.newArrayList();
    final Collection<ServerName> servers = hBaseAdminWrapper.get().getClusterStatus().getServers();
    for (final ServerName server : servers) {
      addServer(server);
    }
    started.countDown();
  }

  /** Creates the queue and daemon worker thread for {@code serverName} unless they already exist. */
  private synchronized void addServer(final ServerName serverName) {
    if (serversToRegions.containsKey(serverName)) {
      return;
    }
    final BlockingDeque<HRegionInfo> queue = Queues.<HRegionInfo>newLinkedBlockingDeque();
    serversToRegions.put(serverName, queue);
    final Thread compactionThread = new Thread(new SingleServerCompaction(
        compactionQuietTime.get(), simultaneousCompactions.get(), serverName, queue));
    compactionThread.setName(String.format("compaction-thread-%s", serverName.getHostname()));
    compactionThread.setDaemon(true);
    compactionThread.start();
    compactionThreads.add(compactionThread);
  }

  /** Returns the region-to-server mapping, refreshing the cached copy when it is over a minute old. */
  private synchronized Map<HRegionInfo, ServerName> getRegionLocations() throws IOException {
    final boolean stale =
        System.currentTimeMillis() - regionServerUpdatedAt > TimeUnit.MINUTES.toMillis(1);
    if (regionServerInfo.get() == null || stale) {
      final Map<HRegionInfo, ServerName> newInfo = hBaseAdminWrapper.getAllRegionInfos(true);
      regionServerInfo.set(newInfo);
      regionServerUpdatedAt = System.currentTimeMillis();
    }
    return regionServerInfo.get();
  }

  /**
   * Looks up the server currently hosting the region and queues the region for compaction there
   * unless it is already local enough.
   *
   * @param stats statistics of the region to consider
   * @throws IOException if the region locations cannot be fetched
   */
  public void compactRegionIfNecessary(final RegionStats stats) throws IOException {
    // NOTE(review): the lookup yields null for a region missing from the location map; whether
    // the downstream calls tolerate a null server is not visible here - confirm.
    compactRegionOnServerIfNecessary(stats, getRegionLocations().get(stats.getRegionInfo()));
  }

  /**
   * Unconditionally queues {@code regionToCompact} on the worker of {@code serverName}, creating
   * that worker first if the server was not known yet.
   *
   * @throws IllegalStateException if {@link #startCompactionQueue()} has not completed yet
   */
  public void compactRegionOnServer(final HRegionInfo regionToCompact, final ServerName serverName) throws IOException {
    Preconditions.checkState(started.getCount() < 1, "Need to call startCompactionQueue() first.");
    addServer(serverName);
    // Count the region before it becomes visible to the worker; otherwise the worker could
    // finish it and decrement the counter before this increment happens.
    awaitingCompactions.incrementAndGet();
    serversToRegions.get(serverName).addLast(regionToCompact);
  }

  /**
   * Queues the region on {@code serverName} unless its locality factor on that server is above
   * the configured threshold, in which case a message is printed and nothing is queued.
   */
  public void compactRegionOnServerIfNecessary(final RegionStats stats, final ServerName serverName) throws IOException {
    if (stats.localityFactor(serverName) > compactThreshold.get()) {
      System.out.println(String.format(" Not compacting %s on %s because it's local.", stats.getRegionInfo(), serverName));
    } else {
      compactRegionOnServer(stats.getRegionInfo(), serverName);
    }
  }

  /**
   * Starts the workers if necessary, submits the given regions in order of decreasing transfer
   * cost, and blocks until the work is done or the optional maximum time has elapsed.
   *
   * @param regionsToCompact regions to consider; each is queued on its own server unless it is
   *     judged local enough
   * @throws Exception if starting the queue or submitting a region fails, or the calling thread
   *     is interrupted during the initial pause
   */
  public void compactAllRegions(final Iterable<RegionStats> regionsToCompact) throws Exception {
    final Stopwatch stopwatch = new Stopwatch().start();
    startCompactionQueue();

    final List<RegionStats> regions = Lists.newArrayList(regionsToCompact);
    // Arguments are swapped on purpose: highest transfer cost first.
    Collections.sort(regions, new Comparator<RegionStats>() {
      @Override
      public int compare(final RegionStats o1, final RegionStats o2) {
        return Doubles.compare(o2.transferCost(o2.getServerName()), o1.transferCost(o1.getServerName()));
      }
    });

    // The printed figure counts every region considered, including those skipped as local.
    int considered = 0;
    for (final RegionStats region : regions) {
      compactRegionOnServerIfNecessary(region, region.getServerName());
      ++considered;
    }
    System.out.println(String.format("Added %s regions to queue for compaction.", considered));

    Thread.sleep(5000);
    if (maxCompactionTimeMillis.isPresent()) {
      // Only the time not already spent queueing is left for waiting.
      awaitCompactions(maxCompactionTimeMillis.get() - stopwatch.stop().elapsedMillis(), TimeUnit.MILLISECONDS);
    } else {
      awaitCompactions();
    }
  }

  /** Blocks until no submitted region is pending any more. */
  public void awaitCompactions() {
    awaitCompactions(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
  }

  /**
   * Polls every five seconds, printing progress, until no submitted region is pending or the
   * timeout has elapsed.
   *
   * @param timeout maximum time to wait
   * @param timeUnit unit of {@code timeout}
   * @throws RuntimeException wrapping the {@link InterruptedException} if the calling thread is
   *     interrupted while waiting; the thread's interrupt flag is set again before throwing
   */
  public void awaitCompactions(long timeout, TimeUnit timeUnit) {
    final long totalNanos = timeUnit.toNanos(timeout);
    final long start = System.nanoTime();
    while (awaitingCompactions.get() > 0) {
      System.out.println(String.format(" Compacted %s regions. %s regions left in queue.", completedCompactions.get(), awaitingCompactions.get()));
      if (System.nanoTime() - start > totalNanos) {
        System.out.println(String.format(" Stopping job. We compacted %s regions", completedCompactions.get()));
        return;
      }
      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        // Keep the interruption visible to callers further up the stack.
        Thread.currentThread().interrupt();
        throw Throwables.propagate(e);
      }
    }
  }

  /**
   * Worker loop for a single region server: takes regions from the server's queue and requests
   * their compaction, retrying a failing region a limited number of times.
   */
  private class SingleServerCompaction implements Runnable {
    private static final int MAX_SINGLE_REGION_FAILURES = 5;

    private final int secondsBetweenCompactions;
    private final int maxCompactionQueuePerServer;
    private final ServerName server;
    private final BlockingDeque<HRegionInfo> regions;
    /** Number of failed attempts recorded per region. */
    private final Multiset<HRegionInfo> regionFailures = HashMultiset.create();

    private SingleServerCompaction(final int secondsBetweenCompactions,
                                   final int maxCompactionQueuePerServer,
                                   final ServerName server,
                                   final BlockingDeque<HRegionInfo> regions) {
      this.secondsBetweenCompactions = secondsBetweenCompactions;
      this.maxCompactionQueuePerServer = maxCompactionQueuePerServer;
      this.server = server;
      this.regions = regions;
    }

    /**
     * Processes the queue until the thread is interrupted. The pending counter is decremented
     * only when a region leaves the queue for good (compacted, given up on, or abandoned because
     * of an interrupt), never when it is merely re-queued for a retry.
     */
    @Override
    public void run() {
      while (true) {
        final HRegionInfo region;
        try {
          region = regions.takeFirst();
        } catch (InterruptedException e) {
          // Nothing was dequeued, so the pending counter must stay untouched.
          Thread.currentThread().interrupt();
          LOG.warn("Compaction worker interrupted while waiting for work; stopping.", e);
          return;
        }

        try {
          compactNextRegion(region);
          completedCompactions.incrementAndGet();
        } catch (InterruptedException e) {
          // An interrupt is a request to stop, not a compaction failure: do not retry.
          awaitingCompactions.decrementAndGet();
          Thread.currentThread().interrupt();
          LOG.warn("Compaction worker interrupted while compacting; stopping.", e);
          return;
        } catch (Throwable t) {
          if (regionFailures.count(region) < MAX_SINGLE_REGION_FAILURES) {
            regionFailures.add(region);
            LOG.warn(t, t);
            regions.addLast(region);
            // The region is back in the queue and therefore still pending.
            continue;
          }
          LOG.error("Skipping region due to too many failures.", t);
        }
        awaitingCompactions.decrementAndGet();
      }
    }

    /**
     * Waits for room in the server's compaction queue (before and after the configured quiet
     * time) and then requests a major or regular compaction of {@code region}.
     */
    private void compactNextRegion(final HRegionInfo region) throws Throwable {
      waitUntilCompactionRoom();
      if (secondsBetweenCompactions > 0) {
        Thread.sleep(TimeUnit.SECONDS.toMillis(secondsBetweenCompactions));
      }
      // Check again: the queue may have filled up during the quiet time.
      waitUntilCompactionRoom();
      System.out.println(String.format("-- (%s) COMPACTING %s on %s", compactionTypeOption.get(), region.getRegionNameAsString(), server.getHostname()));
      // equalsIgnoreCase avoids the default-locale dependence of toUpperCase().
      final boolean major = "MAJOR".equalsIgnoreCase(compactionTypeOption.get());
      if (major) {
        hBaseAdminWrapper.get().majorCompact(region.getRegionName());
      } else {
        hBaseAdminWrapper.get().compact(region.getRegionName());
      }
      Thread.sleep(200);
    }

    /**
     * Sleeps five seconds, then keeps sleeping in five-second steps while the server's
     * compaction queue size is at or above the allowed maximum.
     */
    private void waitUntilCompactionRoom() throws Exception {
      do {
        Thread.sleep(5000);
      } while (regionServerJMXInfo.getCompactionQueueSize(server.getHostname()) >= maxCompactionQueuePerServer);
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment