Created
June 27, 2018 23:25
-
-
Save bszeti/552bf7a1f05a75fc2964a4ce7ce889b5 to your computer and use it in GitHub Desktop.
CachePolicy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mycompany.utils; | |
import org.apache.camel.AsyncCallback; | |
import org.apache.camel.Exchange; | |
import org.apache.camel.Expression; | |
import org.apache.camel.Processor; | |
import org.apache.camel.model.ProcessorDefinition; | |
import org.apache.camel.processor.DelayProcessorSupport; | |
import org.apache.camel.processor.DelegateAsyncProcessor; | |
import org.apache.camel.spi.Policy; | |
import org.apache.camel.spi.RouteContext; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import javax.cache.Cache; | |
import java.util.function.BiConsumer; | |
/** | |
* Policy for routes. It caches the "result" and calls the remaining processors in the route only if no cached result is found. | |
* If a cached object is found by the keyExpression the rest of the route is not executed, | |
* but the cached object is added to the Exchange by calling the applyCachedObject function.
* | |
* Fields: | |
* cache: JCache to use | |
* keyExpression: The Expression to generate the key for the cache. E.g simple("${header.username}") | |
* valueExpression: The Expression to generate the value stored in the cache for the key. E.g exchangeProperty("orders") | |
* applyCachedObject: A function that adds the cached object to the Exchange when one is found. In practice it is executed instead of the rest of the route.
*/ | |
public class CachePolicy implements Policy { | |
private static final Logger log = LoggerFactory.getLogger(CachePolicy.class); | |
private Cache cache; | |
private Expression keyExpression; | |
private Expression valueExpression; | |
private BiConsumer<Exchange, Object> applyCachedObject; | |
@Override | |
public void beforeWrap(RouteContext routeContext, ProcessorDefinition<?> definition) { | |
} | |
@Override | |
public Processor wrap(RouteContext routeContext, Processor processor) { | |
return new DelegateAsyncProcessor(processor){ | |
@Override | |
public boolean process(final Exchange exchange, final AsyncCallback callback) { | |
//If no cache enabled, just continue | |
if (cache == null) | |
return super.process(exchange,callback); | |
log.debug("CachePolicy process started - route:{} exchange:{}", routeContext.getRoute().getId(), exchange.getExchangeId()); | |
try { | |
//Get key by the expression | |
Object key = keyExpression.evaluate(exchange, Object.class); | |
//Check if cache contains the key | |
if (key != null) { | |
Object cached = cache.get(key); | |
if (cached != null) { | |
// use the cached object in the Exchange without calling the rest of the route | |
log.debug("Cached object is found, skipping the rest of route:{} key:{}", routeContext.getRoute().getId(), key); | |
applyCachedObject.accept(exchange, cached); | |
callback.done(true); | |
return true; | |
} | |
} | |
//Not found in cache. Call the rest of the route | |
log.debug("No cached object is found, continue route:{} key:{}", routeContext.getRoute().getId(), key); | |
boolean answer = super.process(exchange, new AsyncCallback() { | |
@Override | |
public void done(boolean doneSync) { | |
try { | |
//Try to cache the value only if not failed | |
if (!exchange.isFailed()) { | |
//Save value in cache | |
Object value = valueExpression.evaluate(exchange, Object.class); | |
if (value != null) { | |
log.debug("Saving in cache. route:{} key:{}", routeContext.getRoute().getId(), key); | |
cache.put(key, value); | |
} | |
} | |
} catch (Exception ex){ | |
//Something went wrong with caching the value | |
exchange.setException(ex); | |
log.error("Error while caching value: {}",valueExpression); | |
} finally { | |
callback.done(doneSync); | |
} | |
} | |
}); | |
log.debug("CachePolicy process end - route:{} exchange:{}", routeContext.getRoute().getId(), exchange.getExchangeId()); | |
return answer; | |
} catch (Exception e) { | |
exchange.setException(e); | |
callback.done(true); | |
return true; | |
} | |
} | |
}; | |
} | |
public Cache getCache() { | |
return cache; | |
} | |
public void setCache(Cache cache) { | |
this.cache = cache; | |
} | |
public Expression getKeyExpression() { | |
return keyExpression; | |
} | |
public void setKeyExpression(Expression keyExpression) { | |
this.keyExpression = keyExpression; | |
} | |
public Expression getValueExpression() { | |
return valueExpression; | |
} | |
public void setValueExpression(Expression valueExpression) { | |
this.valueExpression = valueExpression; | |
} | |
public BiConsumer<Exchange, Object> getApplyCachedObject() { | |
return applyCachedObject; | |
} | |
public void setApplyCachedObject(BiConsumer<Exchange, Object> applyCachedObject) { | |
this.applyCachedObject = applyCachedObject; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment