Skip to content

Instantly share code, notes, and snippets.

@bszeti
Created June 27, 2018 23:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bszeti/552bf7a1f05a75fc2964a4ce7ce889b5 to your computer and use it in GitHub Desktop.
Save bszeti/552bf7a1f05a75fc2964a4ce7ce889b5 to your computer and use it in GitHub Desktop.
CachePolicy
package com.mycompany.utils;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.model.ProcessorDefinition;
import org.apache.camel.processor.DelayProcessorSupport;
import org.apache.camel.processor.DelegateAsyncProcessor;
import org.apache.camel.spi.Policy;
import org.apache.camel.spi.RouteContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.cache.Cache;
import java.util.function.BiConsumer;
/**
* Policy for routes. It caches the "result" and calls the remaining processors in the route only if no cached result is found.
* If a cached object is found by the keyExpression the rest of the route is not executed,
* but the cached object if added to the Exchange by calling the applyCachedObject function.
*
* Fields:
* cache: JCache to use
* keyExpression: The Expression to generate the key for the cache. E.g simple("${header.username}")
* valueExpression: The Expression to generate the value stored in the cache for the key. E.g exchangeProperty("orders")
* applyCachedObject: A function how to add the cached object to the Exchange if found. Practically this is executed instead of the route.
*/
public class CachePolicy implements Policy {
private static final Logger log = LoggerFactory.getLogger(CachePolicy.class);
private Cache cache;
private Expression keyExpression;
private Expression valueExpression;
private BiConsumer<Exchange, Object> applyCachedObject;
@Override
public void beforeWrap(RouteContext routeContext, ProcessorDefinition<?> definition) {
}
@Override
public Processor wrap(RouteContext routeContext, Processor processor) {
return new DelegateAsyncProcessor(processor){
@Override
public boolean process(final Exchange exchange, final AsyncCallback callback) {
//If no cache enabled, just continue
if (cache == null)
return super.process(exchange,callback);
log.debug("CachePolicy process started - route:{} exchange:{}", routeContext.getRoute().getId(), exchange.getExchangeId());
try {
//Get key by the expression
Object key = keyExpression.evaluate(exchange, Object.class);
//Check if cache contains the key
if (key != null) {
Object cached = cache.get(key);
if (cached != null) {
// use the cached object in the Exchange without calling the rest of the route
log.debug("Cached object is found, skipping the rest of route:{} key:{}", routeContext.getRoute().getId(), key);
applyCachedObject.accept(exchange, cached);
callback.done(true);
return true;
}
}
//Not found in cache. Call the rest of the route
log.debug("No cached object is found, continue route:{} key:{}", routeContext.getRoute().getId(), key);
boolean answer = super.process(exchange, new AsyncCallback() {
@Override
public void done(boolean doneSync) {
try {
//Try to cache the value only if not failed
if (!exchange.isFailed()) {
//Save value in cache
Object value = valueExpression.evaluate(exchange, Object.class);
if (value != null) {
log.debug("Saving in cache. route:{} key:{}", routeContext.getRoute().getId(), key);
cache.put(key, value);
}
}
} catch (Exception ex){
//Something went wrong with caching the value
exchange.setException(ex);
log.error("Error while caching value: {}",valueExpression);
} finally {
callback.done(doneSync);
}
}
});
log.debug("CachePolicy process end - route:{} exchange:{}", routeContext.getRoute().getId(), exchange.getExchangeId());
return answer;
} catch (Exception e) {
exchange.setException(e);
callback.done(true);
return true;
}
}
};
}
public Cache getCache() {
return cache;
}
public void setCache(Cache cache) {
this.cache = cache;
}
public Expression getKeyExpression() {
return keyExpression;
}
public void setKeyExpression(Expression keyExpression) {
this.keyExpression = keyExpression;
}
public Expression getValueExpression() {
return valueExpression;
}
public void setValueExpression(Expression valueExpression) {
this.valueExpression = valueExpression;
}
public BiConsumer<Exchange, Object> getApplyCachedObject() {
return applyCachedObject;
}
public void setApplyCachedObject(BiConsumer<Exchange, Object> applyCachedObject) {
this.applyCachedObject = applyCachedObject;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment