Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jkuipers/2d3b405acb1c7595e0d54166ba356994 to your computer and use it in GitHub Desktop.
Save jkuipers/2d3b405acb1c7595e0d54166ba356994 to your computer and use it in GitHub Desktop.
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariable;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
import com.netflix.hystrix.strategy.properties.HystrixProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Ensures Hystrix commands executed on a worker thread have access to a copy of the
 * {@link MDC} from the thread scheduling the command.
 * <p>
 * On construction this strategy remembers the currently registered
 * {@link HystrixConcurrencyStrategy} as its delegate, resets {@link HystrixPlugins} and
 * re-registers itself together with the previously registered command execution hook,
 * event notifier, metrics publisher and properties strategy.
 * <p>
 * Code is based on {@link org.springframework.cloud.sleuth.instrument.hystrix.SleuthHystrixConcurrencyStrategy}.
 */
public class MdcPropagatingHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    /**
     * Strategy that was registered before this one. Stays {@code null} when it could not be
     * looked up in the constructor; every method then falls back to the superclass behavior.
     */
    private HystrixConcurrencyStrategy delegate;

    /**
     * Registers this strategy with {@link HystrixPlugins}, keeping the other plugins that were
     * registered before. Does nothing beyond remembering the delegate when an
     * {@code MdcPropagatingHystrixConcurrencyStrategy} is already registered. Failures are
     * logged and not rethrown.
     */
    public MdcPropagatingHystrixConcurrencyStrategy() {
        try {
            HystrixPlugins plugins = HystrixPlugins.getInstance();
            // Must be assigned before this instance is registered below.
            this.delegate = plugins.getConcurrencyStrategy();
            if (this.delegate instanceof MdcPropagatingHystrixConcurrencyStrategy) {
                // Already installed: avoid registering a second MDC-propagating layer.
                return;
            }
            // Capture the other plugins first, because reset() drops all registrations.
            HystrixCommandExecutionHook commandExecutionHook = plugins.getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = plugins.getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = plugins.getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = plugins.getPropertiesStrategy();
            HystrixPlugins.reset();
            plugins.registerConcurrencyStrategy(this);
            plugins.registerCommandExecutionHook(commandExecutionHook);
            plugins.registerEventNotifier(eventNotifier);
            plugins.registerMetricsPublisher(metricsPublisher);
            plugins.registerPropertiesStrategy(propertiesStrategy);
        } catch (Exception e) {
            logger.error("Failed to register MdcPropagatingHystrixConcurrencyStrategy", e);
        }
    }

    /**
     * Wraps the given callable so that the MDC of the calling thread is available while it runs.
     * The delegate strategy's own wrapping (if any) is preserved inside the MDC wrapper.
     *
     * @param callable the callable Hystrix is about to schedule
     * @return {@code callable} itself if it already propagates the MDC, otherwise an
     *         {@link MdcPropagatingCallable} around the delegate-wrapped callable
     */
    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        if (callable instanceof MdcPropagatingCallable) {
            return callable;
        }
        Callable<T> wrappedCallable = this.delegate != null ? this.delegate.wrapCallable(callable) : callable;
        if (wrappedCallable instanceof MdcPropagatingCallable) {
            return wrappedCallable;
        }
        // Wrap the delegate's result, not the raw callable, so the delegate's decoration is kept.
        return new MdcPropagatingCallable<>(wrappedCallable);
    }

    /** Delegates to the previously registered strategy, or to the superclass if there is none. */
    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
                                            HystrixProperty<Integer> corePoolSize,
                                            HystrixProperty<Integer> maximumPoolSize,
                                            HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
                                            BlockingQueue<Runnable> workQueue) {
        if (this.delegate == null) {
            return super.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }
        return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    /** Delegates to the previously registered strategy, or to the superclass if there is none. */
    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
                                            HystrixThreadPoolProperties threadPoolProperties) {
        if (this.delegate == null) {
            return super.getThreadPool(threadPoolKey, threadPoolProperties);
        }
        return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties);
    }

    /** Delegates to the previously registered strategy, or to the superclass if there is none. */
    @Override
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        if (this.delegate == null) {
            return super.getBlockingQueue(maxQueueSize);
        }
        return this.delegate.getBlockingQueue(maxQueueSize);
    }

    /** Delegates to the previously registered strategy, or to the superclass if there is none. */
    @Override
    public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) {
        if (this.delegate == null) {
            return super.getRequestVariable(rv);
        }
        return this.delegate.getRequestVariable(rv);
    }

    /**
     * Callable that captures the MDC of the thread constructing it and installs that context
     * around the invocation of the wrapped callable.
     *
     * @param <T> result type of the wrapped callable
     */
    static class MdcPropagatingCallable<T> implements Callable<T> {

        /** MDC snapshot taken at construction time; may be {@code null} when no context was set. */
        private final Map<String, String> mdcContext;
        private final Callable<T> delegate;

        public MdcPropagatingCallable(Callable<T> delegate) {
            this.delegate = delegate;
            this.mdcContext = MDC.getCopyOfContextMap();
        }

        /**
         * Runs the wrapped callable with the captured MDC installed, then puts back whatever
         * MDC the executing thread had before.
         */
        @Override
        public T call() throws Exception {
            // Remember the executing thread's own context so it is not lost if this callable
            // ends up running on a thread that already carries an MDC.
            Map<String, String> previousContext = MDC.getCopyOfContextMap();
            install(mdcContext);
            try {
                return delegate.call();
            } finally {
                install(previousContext);
            }
        }

        /** Replaces the current thread's MDC with the given context; {@code null} means empty. */
        private static void install(Map<String, String> context) {
            if (context != null) {
                MDC.setContextMap(context);
            } else {
                MDC.clear();
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment