Skip to content

Instantly share code, notes, and snippets.

@bbeaudreault
Created June 7, 2016 15:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bbeaudreault/bbec28c502d12148f8657aa3f9fecd4f to your computer and use it in GitHub Desktop.
Save bbeaudreault/bbec28c502d12148f8657aa3f9fecd4f to your computer and use it in GitHub Desktop.
package org.apache.hadoop.hbase.regionserver.metrics;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
import org.apache.hadoop.metrics2.MetricsCollector;
import com.yammer.metrics.stats.EWMA;
public class RpcHandlerUsageSourceImpl implements RpcHandlerUsageSource {
private static Log LOG = LogFactory.getLog(RpcHandlerUsageSourceImpl.class);
private static RpcHandlerUsageSourceImpl instance = null;
public static synchronized RpcHandlerUsageSourceImpl getInstance() {
if (instance == null) {
instance = new RpcHandlerUsageSourceImpl().init();
}
return instance;
}
private final Map<String, Map<String, EWMA>> userRpcUsageByPool = new HashMap<>();
private final Map<String, EWMA> idleRpcTimeByPool = new HashMap<>();
private final RpcHandlerThreadRegistry handlerRegistry = new RpcHandlerThreadRegistry();
private final Timer taskRunner = new Timer("RPC Handler Usage Metrics Timer", true);
private RpcHandlerQueueUsageMetrics queueUsageMetrics = null;
private RpcHandlerUserUsageMetrics userUsageMetrics = null;
private RpcHandlerUsageSourceImpl() {}
@Override
public synchronized void registerRpcHandler(String handlerPool) {
if (handlerPool == null) {
return;
}
if (!userRpcUsageByPool.containsKey(handlerPool)) {
userRpcUsageByPool.put(handlerPool, new ConcurrentHashMap<String, EWMA>());
}
if (!idleRpcTimeByPool.containsKey(handlerPool)) {
idleRpcTimeByPool.put(handlerPool, EWMA.oneMinuteEWMA());
}
handlerRegistry.registerPool(handlerPool);
handlerRegistry.set(handlerPool);
}
@Override
public void countIdlePeriod(long nanosIdle) {
String handlerPool = handlerRegistry.get();
if (handlerPool == null) {
LOG.warn("Unregistered RPC handler " + Thread.currentThread().getName() + " attempted to log an idle period.");
return; // don't accept usage data from unregistered handlers
}
idleRpcTimeByPool.get(handlerPool).update(nanosIdle);
}
@Override
public void countUsagePeriodForUser(final String username, final long nanosInUse) {
String handlerPool = handlerRegistry.get();
if (handlerPool == null) {
LOG.warn("Unregistered RPC handler " + Thread.currentThread().getName() + " attempted to log a usage period.");
return; // don't accept usage data from unregistered handlers
}
Map<String, EWMA> queueAveragesByUser = userRpcUsageByPool.get(handlerPool);
EWMA userMovingAverage = queueAveragesByUser.get(username);
if (userMovingAverage == null) {
userMovingAverage = EWMA.oneMinuteEWMA();
queueAveragesByUser.put(username, userMovingAverage);
}
userMovingAverage.update(nanosInUse);
}
@Override
public void unregisterRpcHandler() {
handlerRegistry.remove();
}
private RpcHandlerUsageSourceImpl init() {
taskRunner.schedule(new EWMATickTask(), TimeUnit.SECONDS.toMillis(5), TimeUnit.SECONDS.toMillis(5));
if (queueUsageMetrics == null ) {
this.queueUsageMetrics = new RpcHandlerQueueUsageMetrics();
}
if (userUsageMetrics == null) {
this.userUsageMetrics = new RpcHandlerUserUsageMetrics();
}
return this;
}
private double getIdlePctOfHandlerPool(String handlerPool) {
EWMA idleTimeAverage = idleRpcTimeByPool.get(handlerPool);
if (idleTimeAverage == null) {
return 100d;
}
return 100 * (idleRpcTimeByPool.get(handlerPool).rate(TimeUnit.NANOSECONDS) / handlerRegistry.getHandlerCountOfPool(handlerPool));
}
private static class RpcHandlerThreadRegistry extends ThreadLocal<String> {
/**
* Map of QueueTypes -> count of RPC handler threads for that QueueType.
* The COMBINED QueueType represents the sum of all RPC handler threads.
*/
private final Map<String, AtomicInteger> handlerPoolCounts = new HashMap<>();
public int getHandlerCountOfPool(String handlerPool) {
return Math.max(handlerPoolCounts.get(handlerPool).get(), 1); // minimum 1 to prevent potential division by 0
}
@Override
public void set(String value) {
if (value == null) {
remove();
} else if (get() == null) { // no-op if this thread has already registered
handlerPoolCounts.get(value).incrementAndGet();
super.set(value);
}
}
@Override
public void remove() {
String current = get();
if (current != null) {
handlerPoolCounts.get(current).decrementAndGet();
super.remove();
}
}
private void registerPool(String handlerPool) {
if (!handlerPoolCounts.containsKey(handlerPool)) {
handlerPoolCounts.put(handlerPool, new AtomicInteger());
}
}
}
private class EWMATickTask extends TimerTask {
@Override
public void run() {
for (Map<String, EWMA> userEntries : userRpcUsageByPool.values()) {
for (EWMA movingAverage : userEntries.values()) {
movingAverage.tick();
}
}
for (EWMA movingAverage : idleRpcTimeByPool.values()) {
movingAverage.tick();
}
}
}
private class RpcHandlerQueueUsageMetrics extends BaseSourceImpl {
private static final String METRIC_NAME = "RpcHandlerPoolUsage";
RpcHandlerQueueUsageMetrics() {
super(METRIC_NAME, DESCRIPTION, METRIC_CONTEXT, "RegionServer,sub="+METRIC_NAME);
}
@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
for (String handlerPool : idleRpcTimeByPool.keySet()) {
setGauge(
handlerPool.toLowerCase().replace('.', '-') + "_handlers_usage_pct",
Math.round(100 - getIdlePctOfHandlerPool(handlerPool)));
}
super.getMetrics(metricsCollector, all);
}
}
private class RpcHandlerUserUsageMetrics extends BaseSourceImpl {
private static final String METRIC_NAME = "RpcHandlerUserBreakdown";
private static final double ZERO_THRESHOLD = 0.00001d; // arbitrary value-too-low-to-care-about threshold for usage/metrics
private final Set<String> currentZeroMetrics = new HashSet<>();
RpcHandlerUserUsageMetrics() {
super(METRIC_NAME, DESCRIPTION, METRIC_CONTEXT, "RegionServer,sub="+METRIC_NAME);
}
/**
* @return true if metrics exists and was set successfully; false if metric was deleted due to consecutive zero values.
*/
public synchronized boolean setSelfCleaningGauge(String gaugeName, double value) {
if (value <= ZERO_THRESHOLD) {
if (currentZeroMetrics.contains(gaugeName)) {
// since this metric was already at zero, delete it instead of reporting another 0
removeMetric(gaugeName);
return false;
} else {
currentZeroMetrics.add(gaugeName);
}
} else {
currentZeroMetrics.remove(gaugeName);
}
setGauge(gaugeName, Math.round(value));
return true;
}
@Override
public void getMetrics(MetricsCollector metricsCollector, boolean all) {
for (Map.Entry<String, Map<String, EWMA>> rpcPoolEntry : userRpcUsageByPool.entrySet()) {
String handlerPool = rpcPoolEntry.getKey();
// snapshot { user : usage rate } mappings
Map<String, Double> userRpcUsageRates = new HashMap<>(rpcPoolEntry.getValue().size());
for (Map.Entry<String, EWMA> userAndUsage : rpcPoolEntry.getValue().entrySet()) {
userRpcUsageRates.put(userAndUsage.getKey(), userAndUsage.getValue().rate(TimeUnit.NANOSECONDS));
}
double totalUsage = 0d;
for (Double usageRate : userRpcUsageRates.values()) {
totalUsage += usageRate;
}
for (Map.Entry<String, Double> userAndUsageRate : userRpcUsageRates.entrySet()) {
String username = userAndUsageRate.getKey();
double usagePct = totalUsage > ZERO_THRESHOLD ? 100 * (userAndUsageRate.getValue() / totalUsage) : 0;
boolean metricExists = setSelfCleaningGauge(
username + "_usage_pct_of_" + handlerPool.toLowerCase().replace('.', '-') + "_handlers",
usagePct);
if (!metricExists) {
// clean up users with consecutive usage rates of zero
rpcPoolEntry.getValue().remove(username);
}
}
}
super.getMetrics(metricsCollector, all);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment