Last active
August 29, 2015 13:56
-
-
Save xmlking/9199117 to your computer and use it in GitHub Desktop.
Resilience Framework
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import grails.transaction.Transactional | |
@Transactional(readOnly = true) | |
class BarService implements Greet{ | |
static scope = "prototype" | |
@Override | |
public String greet() throws IOException{ | |
if (new Random().nextInt(2) == 1) throw new IOException("dummy exception from BarService in...greet"); | |
return "Hello from Bar in greet"; | |
} | |
@Override | |
public String salute() throws FileNotFoundException{ | |
if (new Random().nextInt(2) == 1) throw new FileNotFoundException("dummy exception from BarService in...salute"); | |
return "Hello from Bar in salute"; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Governor(type = Governor.GovernorType.RATE, limit = 2, period = 45L, unit = TimeUnit.SECONDS) | |
@Transactional(readOnly = true) | |
class CarService { | |
public String serviceMethod(String arg1) { | |
return "from CarService ${arg1}" | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class DemoController { | |
def demoService | |
def fooService | |
def jarService | |
def tarService | |
def carService | |
def index() { | |
def actions = new HashSet<String>() | |
def actionName | |
def controllerClass = grailsApplication.getArtefactInfo(ControllerArtefactHandler.TYPE).getGrailsClassByLogicalPropertyName(controllerName) | |
controllerClass.getURIs().each { uri -> | |
actionName = controllerClass.getMethodActionName(uri) | |
if (actionName != "index") actions.add(actionName) | |
} | |
[actions:actions] | |
} | |
@MessageMapping("/hello") | |
@SendTo("/topic/log") | |
protected String hello() { | |
return "hello from controller!" | |
} | |
def testAsyncWithPromise(@RequestParameter('a') String arg1) { | |
def symbols = ['AAPL', 'GOOG', 'IBM', 'MSFT'] | |
def promiseList = new PromiseList() | |
symbols.each { stock -> | |
promiseList << task { | |
def url = new URL("http://download.finance.yahoo.com/d/quotes.csv?s=${stock}&f=nsl1op&e=.csv") | |
//log.debug("I will be running in a thread pool") | |
Double price = url.text.split(',')[-1] as Double | |
} | |
} | |
def prices = promiseList.get(2,TimeUnit.SECONDS) | |
render ([symbols, prices].transpose().inject([:]) { a, b -> a[b[0]] = b[1]; a }) | |
} | |
def testAsyncFlowWithPromise(@RequestParameter('a') String arg1) { | |
render demoService.asyncFlowWithPromise(arg1) | |
} | |
def testAsyncFlowWithPromisesUsingControlledThreadPools(@RequestParameter('a') String arg1) { | |
render demoService.asyncFlowWithPromisesUsingControlledThreadPools(arg1) | |
} | |
/** Calls to get(...) result in a java.util.concurrent.CancellationException being thrown. | |
* If the invocation resulted in an exception during processing by the service bean, | |
* calls to get(...) result in a java.util.concurrent.ExecutionException being thrown. | |
* The cause of the ExecutionException may be retrieved by calling the ExecutionException.getCause method. | |
* If the timeout value is exceeded, a java.util.concurrent.TimeoutException is thrown. | |
**/ | |
def testAsyncWithSumoThreadPool(@RequestParameter('a') String arg1) { | |
Future<String> futureResult = demoService.asyncWithSumoThreadPool(arg1)// client makes this an async call | |
String result | |
try { | |
result = futureResult.get(7, TimeUnit.SECONDS) | |
}catch (TimeoutException te) { | |
// handle the timeout | |
log.debug "Timeout....Trying to cancel the worker thread..." | |
log.debug "Cancellation was successful? : ${futureResult.cancel(true)}" | |
} catch (ExecutionException ee) { | |
// task completed with error, handle appropriately | |
log.debug "ExecutionException...." | |
throw ee.getCause() | |
} catch (InterruptedException e) { | |
// handle the interrupts caused by caller of this method, while waiting | |
log.debug "InterruptedException...." | |
}catch (CancellationException ce) { | |
// task has been cancelled by others, handle appropriately | |
log.debug "CancellationException...." | |
} finally { | |
// futureResult.cancel(true); // may or may not desire this | |
} | |
log.debug "returning the result...." | |
if(futureResult.isCancelled()){ | |
log.debug "Work cancelled" | |
render "Work cancelled" | |
} | |
else if(futureResult.isDone()){ | |
log.debug "work completed..." | |
render result | |
} else { | |
render "Work still going on....." | |
} | |
} | |
// May throw TaskRejectedException if more then 3 concurrent request hit. | |
def testRetryOnFailureWithTwoThreadPool(@RequestParameter('a') String arg1) { | |
Future<String> futureResult | |
String result | |
try { | |
futureResult = demoService.retryOnFailureWithTwoThreadPool(arg1) | |
result = futureResult.get() //futureResult.get(6, TimeUnit.SECONDS) | |
}catch (TimeoutException te) { | |
// handle the timeout | |
log.debug "Timeout...Trying to cancel the worker thread..." | |
log.debug "Cancellation was successful? : ${futureResult.cancel(true)}" | |
} catch (ExecutionException ee) { | |
// task completed with error, handle appropriately | |
log.debug "ExecutionException....",ee | |
throw ee.getCause() | |
} catch (TaskRejectedException tre) { | |
// task rejected due to overload, handle appropriately | |
// This should not happen as service will retry when it receive this exception. | |
log.debug "TaskRejectedException...." | |
throw tre | |
} catch(InterruptedException | CancellationException e ) { | |
// handle the interrupts the cancellations caused by caller of this method, while waiting | |
log.debug "Interrupted or Cancelled ....${e.message}\n\t\tTrying to cancel the worker thread..." | |
log.debug "\t\tCancellation was successful? : ${futureResult.cancel(true)}" | |
} finally { | |
// futureResult.cancel(true); // may or may not desire this | |
} | |
log.debug "returning the result...." | |
if(futureResult.isCancelled()){ | |
log.debug "Work cancelled" | |
render "Work cancelled" | |
} | |
else if(futureResult.isDone()){ | |
log.debug "work completed..." | |
render result | |
} else { | |
render "Work still going on....." | |
} | |
} | |
def testTimeout(@RequestParameter('a') String arg1) { | |
try { | |
render demoService.timeout(arg1) | |
}catch (InterruptedException ie) { | |
// handle the interrupts | |
log.debug "InterruptedException...." | |
render "request timeout...." | |
} | |
} | |
def testTimeout2(@RequestParameter('a') String arg1) { | |
try { | |
render demoService.timeout2(arg1) | |
}catch (TimeoutException te) { | |
// handle the timeout | |
log.debug "TimeoutException...." | |
render "request timeout...." | |
} | |
} | |
def testGovernorWithConcurrency(@RequestParameter('a') String arg1) { | |
try{ | |
render demoService.throttleWithConcurrency(arg1) | |
} catch(ConcurrencyLimitExceededException e) { | |
log.warn("ConcurrencyLimitExceededException : ${e.message}") | |
render "Please try again later :${e.message} " | |
} | |
} | |
def testGovernorWithConcurrencyAndBlocking(@RequestParameter('a') String arg1) { | |
render demoService.throttleWithConcurrencyAndBlocking(arg1) | |
} | |
def testGovernorWithRateLimit(@RequestParameter('a') String arg1) { | |
try{ | |
render demoService.throttleWithRateLimit(arg1) | |
}catch(RateLimitExceededException e) { | |
log.warn("RateLimitExceededException : ${e.message}") | |
render "Please try again later :${e.message} " | |
} | |
} | |
def testGovernorWithRateLimitAndBlocking(@RequestParameter('a') String arg1) { | |
render demoService.throttleWithRateLimitAndBlocking(arg1) | |
} | |
def testGovernorWithRateLimitOnClass(@RequestParameter('a') String arg1) { | |
try{ | |
render carService.serviceMethod(arg1) | |
}catch(RateLimitExceededException e) { | |
log.warn("RateLimitExceededException : ${e.message}") | |
render "Please try again later :${e.message} " | |
} | |
} | |
def testRetryOnFailure(@RequestParameter('a') String arg1) { | |
try{ | |
render demoService.retryOnFailure(arg1) | |
}catch(RateLimitExceededException rlee) { | |
render "You are trying too hard. I can't Fallback any more.<br/>Error : ${rlee}" | |
}catch(Throwable tro) { | |
render "You are trying too hard. I can't Retry any more.<br/>Error : ${tro}"//${tro.undeclaredThrowable}" | |
} | |
} | |
def testFallback(@RequestParameter('a') String arg1) { | |
try { | |
render fooService.greet() + fooService.salute() | |
} catch(Throwable tro) { | |
render "You are trying too hard. I can't Fallback any more.<br/>Error : ${tro}" | |
} | |
} | |
def testCircuitBreaker(@RequestParameter('a') String arg) { | |
try { | |
render "Ok ...${demoService.circuitBreaker(arg)}" | |
}catch(OpenCircuitException oce) { | |
render "Error : Circuit Open. Please wait for 80 seconds" | |
} catch(Throwable tro) { | |
render "Error : ${tro.undeclaredThrowable}" | |
//render "Error : ${tro}" | |
} | |
} | |
def testCircuitBreakerWithFallback(@RequestParameter('a') String arg1) { | |
try { | |
render demoService.greet() | |
}catch(Throwable tro) { | |
render "Error : ${tro}" | |
} | |
} | |
def handleTaskRejectedException(TaskRejectedException e) { | |
render 'Got TaskRejectedException...' | |
} | |
def handleTimeoutException(TimeoutException e) { | |
render 'Got TimeoutException...' | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Transactional(readOnly = true) | |
class DemoService { | |
def grailsApplication | |
//def group1 = new DefaultPGroup(new ResizeablePool(true)) | |
//def group2 = new DefaultPGroup(2) | |
def SumoThreadPoolGroup, TwoThreadPoolGroup | |
@PostConstruct | |
def init() { | |
SumoThreadPoolGroup = new DefaultPGroup(new DefaultPool(grailsApplication.mainContext.SumoThreadPool.threadPoolExecutor)) | |
TwoThreadPoolGroup = new DefaultPGroup(new DefaultPool(grailsApplication.mainContext.TwoThreadPool.threadPoolExecutor)) | |
} | |
String asyncFlowWithPromise(String arg1){ | |
List results = new ArrayList(); | |
//http://www.jsontest.com/ | |
Promise p1 = task { "http://echo.jsontest.com/YouSaid/${arg1}".toURL().text } | |
Promise p2 = task { "http://md5.jsontest.com/?text=${arg1}".toURL().text } | |
Promise p3 = task { "http://ip.jsontest.com/".toURL().text } | |
Promise p4 = task { "http://time.jsontest.com/".toURL().text } | |
Promise p5 = task { "http://validate.jsontest.com/?json=${arg1}".toURL().text } | |
onComplete([p3,p4]) { List p3p4 -> | |
results << p3p4 | |
}.then { p3p4 -> | |
println p3p4 | |
p3p4 << waitAll(p1, p2, p5) | |
}.get(3,TimeUnit.SECONDS) | |
} | |
String asyncFlowWithPromisesUsingControlledThreadPools(String arg1) { | |
Promise p1 = new GparsPromise(SumoThreadPoolGroup.task { | |
log.debug('this log should be in sumo pool') | |
"http://echo.jsontest.com/YouSaid/${arg1}".toURL().text | |
}) | |
Promise p2 = new GparsPromise(TwoThreadPoolGroup.task { | |
log.debug('this log should be in two pool') | |
"http://md5.jsontest.com/?text=${arg1}".toURL().text | |
}) | |
waitAll(p1, p2) | |
} | |
private Future<String> await(String arg1, int count) { | |
log.debug "beginning await work for ${count} seconds..." | |
int loop = 0 | |
try{ | |
//do batch jobs incrementally | |
while(!Thread.currentThread().isInterrupted() & loop < count){ //TODO: checking isInterrupted is really useful? | |
//do a sub-task of a long running batch job here | |
TimeUnit.SECONDS.sleep(1) // fake working for one second | |
loop++ | |
log.debug "loop count ... ${loop}" | |
} | |
}catch(InterruptedException ie){ | |
log.debug "got Interrupted!... loop count ... ${loop}, Cleaning the resource..." | |
//do cleanup job... close connections etc... | |
Thread.currentThread().interrupt() // TADA: this will keep not swallowing interrupt flag | |
} | |
log.debug "loop count Out ... ${loop}" | |
return new AsyncResult<String>("REPLY: ${arg1} looped: ${loop} times") | |
} | |
@Async('SumoThreadPool') | |
Future<String> asyncWithSumoThreadPool(String arg1) { | |
return await(arg1, 6); | |
} | |
@Retry(attempts = 2, delay =8L, unit = TimeUnit.SECONDS, exceptions = [TaskRejectedException.class]) | |
// TwoThreadPool Executor uses default rejection-policy i.e., AbortPolicy | |
// Throws TaskRejectedException if more then 3 concurrent request hit. | |
@Async("TwoThreadPool") | |
Future<String> retryOnFailureWithTwoThreadPool(String arg1) throws TaskRejectedException{ | |
return await(arg1 , 8); | |
} | |
@Timeout(value = 3l, unit = TimeUnit.SECONDS) | |
String timeout(String arg1) throws InterruptedException{ | |
def time = new Random().nextInt(3)+2 // time will be either 2 , 3 or 4 | |
log.debug "waiting in testTimeout1 ${time}..." | |
TimeUnit.SECONDS.sleep(time) | |
return arg1 | |
} | |
@Timeout2(value = 3010l, unit = TimeUnit.MILLISECONDS) | |
String timeout2(String arg1) throws TimeoutException{ | |
def time = new Random().nextInt(3)+2 // time will be either 2 , 3 or 4 | |
log.debug "waiting in testTimeout2 ${time}..." | |
TimeUnit.SECONDS.sleep(time) | |
return arg1 | |
} | |
//implicit type = ThrottleType.CONCURRENCY | |
@Governor(limit = 1) | |
String throttleWithConcurrency(String arg1) { | |
TimeUnit.SECONDS.sleep(5) | |
return arg1 | |
} | |
@Governor(limit = 1, blocking=true) | |
String throttleWithConcurrencyAndBlocking(String arg1) { | |
TimeUnit.SECONDS.sleep(5) | |
return arg1 | |
} | |
@Governor(type = GovernorType.RATE, limit = 2, period = 45L, unit = TimeUnit.SECONDS) | |
String throttleWithRateLimit(String arg1) { | |
TimeUnit.SECONDS.sleep(5) | |
return arg1 | |
} | |
@Governor(type = GovernorType.RATE, limit = 3, period = 60L, blocking=true, unit = TimeUnit.SECONDS) | |
String throttleWithRateLimitAndBlocking(String arg1) { | |
TimeUnit.SECONDS.sleep(5) | |
return arg1 | |
} | |
// Apply @Retry only to Idempotent Operations | |
@Retry(attempts = 2, delay =8L, unit = TimeUnit.SECONDS, exceptions = [IOException.class,RateLimitExceededException.class]) | |
String retryOnFailure(String arg1) { | |
if (new Random().nextInt(5) == 4) throw new IOException() | |
// need to call this way to go through CGLIB Proxy. | |
return grailsApplication.mainContext.demoService.throttleWithRateLimit(arg1) | |
} | |
@CircuitBreaker(failureThreshold=2,failureThresholdTimeFrameMs=60000l,retryAfterMs=80000l) | |
String circuitBreaker(String arg1) { | |
TimeUnit.SECONDS.sleep(2) | |
if (new Random().nextInt(3) == 2) throw new FileNotFoundException("fake FileNotFoundException") | |
return "Hello ${arg1} in testCircuitBreaker" | |
} | |
@Fallback(value = ['barService','jarService'], exceptions = [OpenCircuitException.class,FileNotFoundException.class]) | |
@CircuitBreaker(failureThreshold=2,failureThresholdTimeFrameMs=60000l,retryAfterMs=80000l,failureIndications=[FileNotFoundException.class]) | |
String greet() { | |
TimeUnit.SECONDS.sleep(2) | |
throw new FileNotFoundException() | |
return "greet: Hello! in DemoService.greet()" | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Transactional(readOnly = true) | |
@Fallback(['barService','jarService']) | |
class FooService implements Greet{ | |
static scope = "prototype" | |
@Override | |
public String greet() throws IOException{ | |
if (new Random().nextInt(5) == 4) throw new IOException("dummy exception from FooService in...greet"); | |
return "Hello from Foo in greet "; | |
} | |
@Override | |
public String salute() throws FileNotFoundException{ | |
if (new Random().nextInt(5) == 4) throw new FileNotFoundException("dummy exception from FooService in...salute"); | |
return "Hello from Foo in salute "; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public interface Greet { | |
public String greet() throws IOException; | |
public String salute() throws FileNotFoundException; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import grails.transaction.Transactional | |
@Transactional(readOnly = true) | |
class JarService implements Greet{ | |
static scope = "prototype" | |
@Override | |
public String greet() throws IOException{ | |
if (new Random().nextInt(2) == 1) throw new NullPointerException("dummy exception from JarService in...greet"); | |
return "Hello from Jar in greet "; | |
} | |
@Override | |
public String salute() throws FileNotFoundException{ | |
if (new Random().nextInt(2) == 1) throw new NullPointerException("dummy exception from JarService in...salute"); | |
return "Hello from Jar in salute "; | |
} | |
} | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Place your Spring DSL code here | |
beans = { | |
//importBeans 'file:grails-app/conf/spring/resiliency.groovy' | |
xmlns task:"http://www.springframework.org/schema/task" | |
// Executor Pools | |
task.executor(id:'defaultExecutor', 'pool-size':'5-10', 'queue-capacity':'25', 'rejection-policy':'CALLER_RUNS') | |
task.executor(id:'SumoThreadPool', 'pool-size':'5-10', 'queue-capacity':'25', 'rejection-policy':'CALLER_RUNS') | |
task.executor(id:'TwoThreadPool', 'pool-size':'1-2', 'queue-capacity':'1') | |
tarService(com.crossbusiness.resiliency.demo.TarService) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment