Created
December 7, 2018 22:24
-
-
Save jkuipers/2d3b405acb1c7595e0d54166ba356994 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.netflix.hystrix.HystrixThreadPoolKey; | |
import com.netflix.hystrix.HystrixThreadPoolProperties; | |
import com.netflix.hystrix.strategy.HystrixPlugins; | |
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy; | |
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariable; | |
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle; | |
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier; | |
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook; | |
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher; | |
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy; | |
import com.netflix.hystrix.strategy.properties.HystrixProperty; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.slf4j.MDC; | |
import java.util.Map; | |
import java.util.concurrent.BlockingQueue; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ThreadPoolExecutor; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* Ensures Hystrix commands executed on a worker thread have access to a copy of the | |
* {@link MDC} from the thread scheduling the command. | |
* <p /> | |
* Code is based on {@link org.springframework.cloud.sleuth.instrument.hystrix.SleuthHystrixConcurrencyStrategy}. | |
*/ | |
public class MdcPropagatingHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy { | |
private Logger logger = LoggerFactory.getLogger(getClass()); | |
private HystrixConcurrencyStrategy delegate; | |
public MdcPropagatingHystrixConcurrencyStrategy() { | |
try { | |
HystrixPlugins plugins = HystrixPlugins.getInstance(); | |
this.delegate = plugins.getConcurrencyStrategy(); | |
if (this.delegate instanceof MdcPropagatingHystrixConcurrencyStrategy) { | |
return; | |
} | |
HystrixCommandExecutionHook commandExecutionHook = plugins.getCommandExecutionHook(); | |
HystrixEventNotifier eventNotifier = plugins.getEventNotifier(); | |
HystrixMetricsPublisher metricsPublisher = plugins.getMetricsPublisher(); | |
HystrixPropertiesStrategy propertiesStrategy = plugins.getPropertiesStrategy(); | |
HystrixPlugins.reset(); | |
plugins.registerConcurrencyStrategy(this); | |
plugins.registerCommandExecutionHook(commandExecutionHook); | |
plugins.registerEventNotifier(eventNotifier); | |
plugins.registerMetricsPublisher(metricsPublisher); | |
plugins.registerPropertiesStrategy(propertiesStrategy); | |
} catch (Exception e) { | |
logger.error("Failed to register MdcPropagatingHystrixConcurrencyStrategy", e); | |
} | |
} | |
@Override | |
public <T> Callable<T> wrapCallable(Callable<T> callable) { | |
if (callable instanceof MdcPropagatingCallable) { | |
return callable; | |
} | |
Callable<T> wrappedCallable = this.delegate != null ? this.delegate.wrapCallable(callable) : callable; | |
if (wrappedCallable instanceof MdcPropagatingCallable) { | |
return wrappedCallable; | |
} | |
return new MdcPropagatingCallable(callable); | |
} | |
@Override | |
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, | |
HystrixProperty<Integer> corePoolSize, | |
HystrixProperty<Integer> maximumPoolSize, | |
HystrixProperty<Integer> keepAliveTime, TimeUnit unit, | |
BlockingQueue<Runnable> workQueue) { | |
return this.delegate.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); | |
} | |
@Override | |
public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, | |
HystrixThreadPoolProperties threadPoolProperties) { | |
return this.delegate.getThreadPool(threadPoolKey, threadPoolProperties); | |
} | |
@Override | |
public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) { | |
return this.delegate.getBlockingQueue(maxQueueSize); | |
} | |
@Override | |
public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) { | |
return this.delegate.getRequestVariable(rv); | |
} | |
static class MdcPropagatingCallable implements Callable { | |
private final Map<String, String> mdcContext; | |
private final Callable delegate; | |
public MdcPropagatingCallable(Callable delegate) { | |
this.delegate = delegate; | |
this.mdcContext = MDC.getCopyOfContextMap(); | |
} | |
@Override | |
public Object call() throws Exception { | |
if (mdcContext != null) { | |
MDC.setContextMap(mdcContext); | |
} | |
try { | |
return delegate.call(); | |
} finally { | |
// no need to restore original context: it should be empty for a worker thread | |
MDC.clear(); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment