Skip to content

Instantly share code, notes, and snippets.

@xmlking
Last active August 29, 2015 13:56
Show Gist options
  • Save xmlking/9199117 to your computer and use it in GitHub Desktop.
Save xmlking/9199117 to your computer and use it in GitHub Desktop.
Resilience Framework
import grails.transaction.Transactional
/**
 * Demo implementation of {@link Greet} whose methods fail at random.
 *
 * Each call draws a fresh random integer from {0, 1}; drawing 1 raises the
 * declared checked exception, otherwise a fixed greeting is returned.
 */
@Transactional(readOnly = true)
class BarService implements Greet {

    static scope = "prototype"

    /**
     * @return the fixed greeting text
     * @throws IOException when the random draw equals 1
     */
    @Override
    String greet() throws IOException {
        boolean shouldFail = new Random().nextInt(2) == 1
        if (shouldFail) {
            throw new IOException("dummy exception from BarService in...greet")
        }
        "Hello from Bar in greet"
    }

    /**
     * @return the fixed salute text
     * @throws FileNotFoundException when the random draw equals 1
     */
    @Override
    String salute() throws FileNotFoundException {
        boolean shouldFail = new Random().nextInt(2) == 1
        if (shouldFail) {
            throw new FileNotFoundException("dummy exception from BarService in...salute")
        }
        "Hello from Bar in salute"
    }
}
/**
 * Demo service carrying a class-level {@code @Governor} annotation
 * (type RATE, limit 2, period 45 seconds).
 */
@Governor(type = Governor.GovernorType.RATE, limit = 2, period = 45L, unit = TimeUnit.SECONDS)
@Transactional(readOnly = true)
class CarService {

    /**
     * Echoes the argument back behind a fixed prefix.
     *
     * @param arg1 value to embed in the reply
     * @return the reply text
     */
    String serviceMethod(String arg1) {
        String reply = "from CarService ${arg1}"
        reply
    }
}
/**
 * Demo controller with one action per resilience feature exposed by the
 * injected services (async execution, timeouts, governors, retry, fallback,
 * circuit breaker). Each action takes the request parameter 'a' and renders
 * either the service result or a short error text.
 */
class DemoController {

    def demoService
    def fooService
    def jarService
    def tarService
    def carService

    /**
     * Collects the names of all actions of this controller except "index".
     *
     * @return model map with a single entry, {@code actions}, holding a set of action names
     */
    def index() {
        def controllerClass = grailsApplication.getArtefactInfo(ControllerArtefactHandler.TYPE).getGrailsClassByLogicalPropertyName(controllerName)
        def actions = new HashSet<String>()
        controllerClass.getURIs().each { uri ->
            def actionName = controllerClass.getMethodActionName(uri)
            if (actionName != "index") actions.add(actionName)
        }
        [actions: actions]
    }

    /** Message handler mapped to "/hello"; the reply is sent to "/topic/log". */
    @MessageMapping("/hello")
    @SendTo("/topic/log")
    protected String hello() {
        return "hello from controller!"
    }

    /**
     * Fetches one quote per stock symbol in parallel tasks and renders a
     * symbol-to-price map. Waits at most 2 seconds for all tasks.
     */
    def testAsyncWithPromise(@RequestParameter('a') String arg1) {
        def symbols = ['AAPL', 'GOOG', 'IBM', 'MSFT']
        def promiseList = new PromiseList()
        symbols.each { stock ->
            promiseList << task {
                def url = new URL("http://download.finance.yahoo.com/d/quotes.csv?s=${stock}&f=nsl1op&e=.csv")
                //log.debug("I will be running in a thread pool")
                // the last CSV column is taken as the price
                url.text.split(',')[-1] as Double
            }
        }
        def prices = promiseList.get(2, TimeUnit.SECONDS)
        // pair each symbol with its price and fold the pairs into a map
        render ([symbols, prices].transpose().inject([:]) { a, b -> a[b[0]] = b[1]; a })
    }

    def testAsyncFlowWithPromise(@RequestParameter('a') String arg1) {
        render demoService.asyncFlowWithPromise(arg1)
    }

    def testAsyncFlowWithPromisesUsingControlledThreadPools(@RequestParameter('a') String arg1) {
        render demoService.asyncFlowWithPromisesUsingControlledThreadPools(arg1)
    }

    /**
     * Starts the asynchronous service call and waits up to 7 seconds for it.
     *
     * Behaviour of {@code Future.get(...)}:
     * if the task was cancelled, get(...) throws java.util.concurrent.CancellationException;
     * if the task failed during processing by the service bean, get(...) throws
     * java.util.concurrent.ExecutionException, whose cause is available through
     * ExecutionException.getCause(); if the timeout value is exceeded,
     * java.util.concurrent.TimeoutException is thrown.
     */
    def testAsyncWithSumoThreadPool(@RequestParameter('a') String arg1) {
        Future<String> futureResult = demoService.asyncWithSumoThreadPool(arg1) // client makes this an async call
        String result
        try {
            result = futureResult.get(7, TimeUnit.SECONDS)
        } catch (TimeoutException te) {
            // handle the timeout
            log.debug "Timeout....Trying to cancel the worker thread..."
            log.debug "Cancellation was successful? : ${futureResult.cancel(true)}"
        } catch (ExecutionException ee) {
            // task completed with error; rethrow the real failure.
            // Fall back to the wrapper itself because the cause may be null.
            log.debug "ExecutionException...."
            throw (ee.cause ?: ee)
        } catch (InterruptedException e) {
            // handle the interrupts caused by caller of this method, while waiting
            log.debug "InterruptedException...."
        } catch (CancellationException ce) {
            // task has been cancelled by others, handle appropriately
            log.debug "CancellationException...."
        }
        renderFutureOutcome(futureResult, result)
    }

    /**
     * Starts the retrying asynchronous service call and blocks until it finishes.
     * May throw TaskRejectedException if more than 3 concurrent requests hit.
     */
    def testRetryOnFailureWithTwoThreadPool(@RequestParameter('a') String arg1) {
        Future<String> futureResult
        String result
        try {
            futureResult = demoService.retryOnFailureWithTwoThreadPool(arg1)
            result = futureResult.get() //futureResult.get(6, TimeUnit.SECONDS)
        } catch (TimeoutException te) {
            // handle the timeout
            log.debug "Timeout...Trying to cancel the worker thread..."
            log.debug "Cancellation was successful? : ${futureResult?.cancel(true)}"
        } catch (ExecutionException ee) {
            // task completed with error; rethrow the real failure (cause may be null)
            log.debug "ExecutionException....", ee
            throw (ee.cause ?: ee)
        } catch (TaskRejectedException tre) {
            // task rejected due to overload, handle appropriately
            // This should not happen as service will retry when it receive this exception.
            log.debug "TaskRejectedException...."
            throw tre
        } catch (InterruptedException | CancellationException e) {
            // handle the interrupts the cancellations caused by caller of this method, while waiting.
            // futureResult is still null when the service call itself failed, hence the safe navigation.
            log.debug "Interrupted or Cancelled ....${e.message}\n\t\tTrying to cancel the worker thread..."
            log.debug "\t\tCancellation was successful? : ${futureResult?.cancel(true)}"
        }
        renderFutureOutcome(futureResult, result)
    }

    def testTimeout(@RequestParameter('a') String arg1) {
        try {
            render demoService.timeout(arg1)
        } catch (InterruptedException ie) {
            // handle the interrupts
            log.debug "InterruptedException...."
            render "request timeout...."
        }
    }

    def testTimeout2(@RequestParameter('a') String arg1) {
        try {
            render demoService.timeout2(arg1)
        } catch (TimeoutException te) {
            // handle the timeout
            log.debug "TimeoutException...."
            render "request timeout...."
        }
    }

    def testGovernorWithConcurrency(@RequestParameter('a') String arg1) {
        try {
            render demoService.throttleWithConcurrency(arg1)
        } catch (ConcurrencyLimitExceededException e) {
            log.warn("ConcurrencyLimitExceededException : ${e.message}")
            render "Please try again later :${e.message} "
        }
    }

    def testGovernorWithConcurrencyAndBlocking(@RequestParameter('a') String arg1) {
        render demoService.throttleWithConcurrencyAndBlocking(arg1)
    }

    def testGovernorWithRateLimit(@RequestParameter('a') String arg1) {
        try {
            render demoService.throttleWithRateLimit(arg1)
        } catch (RateLimitExceededException e) {
            log.warn("RateLimitExceededException : ${e.message}")
            render "Please try again later :${e.message} "
        }
    }

    def testGovernorWithRateLimitAndBlocking(@RequestParameter('a') String arg1) {
        render demoService.throttleWithRateLimitAndBlocking(arg1)
    }

    def testGovernorWithRateLimitOnClass(@RequestParameter('a') String arg1) {
        try {
            render carService.serviceMethod(arg1)
        } catch (RateLimitExceededException e) {
            log.warn("RateLimitExceededException : ${e.message}")
            render "Please try again later :${e.message} "
        }
    }

    def testRetryOnFailure(@RequestParameter('a') String arg1) {
        try {
            render demoService.retryOnFailure(arg1)
        } catch (RateLimitExceededException rlee) {
            render "You are trying too hard. I can't Fallback any more.<br/>Error : ${rlee}"
        } catch (Throwable tro) {
            render "You are trying too hard. I can't Retry any more.<br/>Error : ${tro}"//${tro.undeclaredThrowable}"
        }
    }

    def testFallback(@RequestParameter('a') String arg1) {
        try {
            render fooService.greet() + fooService.salute()
        } catch (Throwable tro) {
            render "You are trying too hard. I can't Fallback any more.<br/>Error : ${tro}"
        }
    }

    def testCircuitBreaker(@RequestParameter('a') String arg) {
        try {
            render "Ok ...${demoService.circuitBreaker(arg)}"
        } catch (OpenCircuitException oce) {
            render "Error : Circuit Open. Please wait for 80 seconds"
        } catch (Throwable tro) {
            // Only UndeclaredThrowableException has an 'undeclaredThrowable' property;
            // reading it on any other Throwable would itself fail inside this handler.
            def failure = (tro instanceof java.lang.reflect.UndeclaredThrowableException) ? tro.undeclaredThrowable : tro
            render "Error : ${failure}"
        }
    }

    def testCircuitBreakerWithFallback(@RequestParameter('a') String arg1) {
        try {
            render demoService.greet()
        } catch (Throwable tro) {
            render "Error : ${tro}"
        }
    }

    def handleTaskRejectedException(TaskRejectedException e) {
        render 'Got TaskRejectedException...'
    }

    def handleTimeoutException(TimeoutException e) {
        render 'Got TimeoutException...'
    }

    /**
     * Renders the final state of an asynchronous call: cancelled, completed
     * (with its result) or still running. A null future means the work was
     * never handed out and is reported as cancelled.
     *
     * @param future the future returned by the service, may be null
     * @param result the value obtained from the future, null when none was obtained
     */
    private void renderFutureOutcome(Future<String> future, String result) {
        log.debug "returning the result...."
        if (future == null || future.isCancelled()) {
            log.debug "Work cancelled"
            render "Work cancelled"
        } else if (future.isDone()) {
            log.debug "work completed..."
            render result
        } else {
            render "Work still going on....."
        }
    }
}
/**
 * Demo service whose methods each carry one of the resilience annotations
 * (@Async, @Retry, @Timeout, @Timeout2, @Governor, @CircuitBreaker, @Fallback).
 * The method bodies only sleep, fail at random or call public echo services,
 * so that the effect of the annotation can be observed from the controller.
 */
@Transactional(readOnly = true)
class DemoService {

    def grailsApplication
    //def group1 = new DefaultPGroup(new ResizeablePool(true))
    //def group2 = new DefaultPGroup(2)
    def SumoThreadPoolGroup, TwoThreadPoolGroup

    /** Wraps the executors of the 'SumoThreadPool' and 'TwoThreadPool' beans into parallel groups. */
    @PostConstruct
    def init() {
        SumoThreadPoolGroup = new DefaultPGroup(new DefaultPool(grailsApplication.mainContext.SumoThreadPool.threadPoolExecutor))
        TwoThreadPoolGroup = new DefaultPGroup(new DefaultPool(grailsApplication.mainContext.TwoThreadPool.threadPoolExecutor))
    }

    /**
     * Runs five HTTP lookups as parallel tasks and combines their responses.
     * Waits at most 3 seconds for the combined result.
     *
     * @param arg1 text sent to the echo, md5 and validate services
     * @return string form of the collected responses
     */
    String asyncFlowWithPromise(String arg1) {
        List results = new ArrayList()
        // arg1 comes from a request parameter: encode it before embedding it in a URL.
        String queryValue = URLEncoder.encode(String.valueOf(arg1), 'UTF-8')
        // URLEncoder targets form data and writes spaces as '+'; a path segment needs %20.
        String pathValue = queryValue.replace('+', '%20')
        //http://www.jsontest.com/
        Promise p1 = task { "http://echo.jsontest.com/YouSaid/${pathValue}".toURL().text }
        Promise p2 = task { "http://md5.jsontest.com/?text=${queryValue}".toURL().text }
        Promise p3 = task { "http://ip.jsontest.com/".toURL().text }
        Promise p4 = task { "http://time.jsontest.com/".toURL().text }
        Promise p5 = task { "http://validate.jsontest.com/?json=${queryValue}".toURL().text }
        onComplete([p3, p4]) { List p3p4 ->
            // the value of this closure (the results list) is what the next step receives
            results << p3p4
        }.then { p3p4 ->
            log.debug "intermediate results: ${p3p4}"
            p3p4 << waitAll(p1, p2, p5)
        }.get(3, TimeUnit.SECONDS)
    }

    /**
     * Runs two HTTP lookups, one on each of the two parallel groups, and waits for both.
     *
     * @param arg1 text sent to the echo and md5 services
     * @return string form of both responses
     */
    String asyncFlowWithPromisesUsingControlledThreadPools(String arg1) {
        // same encoding rules as in asyncFlowWithPromise
        String queryValue = URLEncoder.encode(String.valueOf(arg1), 'UTF-8')
        String pathValue = queryValue.replace('+', '%20')
        Promise p1 = new GparsPromise(SumoThreadPoolGroup.task {
            log.debug('this log should be in sumo pool')
            "http://echo.jsontest.com/YouSaid/${pathValue}".toURL().text
        })
        Promise p2 = new GparsPromise(TwoThreadPoolGroup.task {
            log.debug('this log should be in two pool')
            "http://md5.jsontest.com/?text=${queryValue}".toURL().text
        })
        waitAll(p1, p2)
    }

    /**
     * Simulates a long running batch job by sleeping one second per iteration,
     * stopping early when the current thread is interrupted.
     *
     * @param arg1 value echoed in the reply
     * @param count number of one-second iterations to perform
     * @return an already completed result describing how many iterations ran
     */
    private Future<String> await(String arg1, int count) {
        log.debug "beginning await work for ${count} seconds..."
        int loop = 0
        try {
            //do batch jobs incrementally
            while (!Thread.currentThread().isInterrupted() && loop < count) { //TODO: checking isInterrupted is really useful?
                //do a sub-task of a long running batch job here
                TimeUnit.SECONDS.sleep(1) // fake working for one second
                loop++
                log.debug "loop count ... ${loop}"
            }
        } catch (InterruptedException ie) {
            log.debug "got Interrupted!... loop count ... ${loop}, Cleaning the resource..."
            //do cleanup job... close connections etc...
            Thread.currentThread().interrupt() // restore the interrupt flag instead of swallowing it
        }
        log.debug "loop count Out ... ${loop}"
        return new AsyncResult<String>("REPLY: ${arg1} looped: ${loop} times")
    }

    @Async('SumoThreadPool')
    Future<String> asyncWithSumoThreadPool(String arg1) {
        return await(arg1, 6)
    }

    @Retry(attempts = 2, delay = 8L, unit = TimeUnit.SECONDS, exceptions = [TaskRejectedException.class])
    // TwoThreadPool Executor uses default rejection-policy i.e., AbortPolicy
    // Throws TaskRejectedException if more than 3 concurrent requests hit.
    @Async("TwoThreadPool")
    Future<String> retryOnFailureWithTwoThreadPool(String arg1) throws TaskRejectedException {
        return await(arg1, 8)
    }

    @Timeout(value = 3L, unit = TimeUnit.SECONDS)
    String timeout(String arg1) throws InterruptedException {
        def time = new Random().nextInt(3) + 2 // time will be either 2 , 3 or 4
        log.debug "waiting in testTimeout1 ${time}..."
        TimeUnit.SECONDS.sleep(time)
        return arg1
    }

    @Timeout2(value = 3010L, unit = TimeUnit.MILLISECONDS)
    String timeout2(String arg1) throws TimeoutException {
        def time = new Random().nextInt(3) + 2 // time will be either 2 , 3 or 4
        log.debug "waiting in testTimeout2 ${time}..."
        TimeUnit.SECONDS.sleep(time)
        return arg1
    }

    //implicit type = ThrottleType.CONCURRENCY
    @Governor(limit = 1)
    String throttleWithConcurrency(String arg1) {
        TimeUnit.SECONDS.sleep(5)
        return arg1
    }

    @Governor(limit = 1, blocking = true)
    String throttleWithConcurrencyAndBlocking(String arg1) {
        TimeUnit.SECONDS.sleep(5)
        return arg1
    }

    @Governor(type = GovernorType.RATE, limit = 2, period = 45L, unit = TimeUnit.SECONDS)
    String throttleWithRateLimit(String arg1) {
        TimeUnit.SECONDS.sleep(5)
        return arg1
    }

    @Governor(type = GovernorType.RATE, limit = 3, period = 60L, blocking = true, unit = TimeUnit.SECONDS)
    String throttleWithRateLimitAndBlocking(String arg1) {
        TimeUnit.SECONDS.sleep(5)
        return arg1
    }

    // Apply @Retry only to Idempotent Operations
    @Retry(attempts = 2, delay = 8L, unit = TimeUnit.SECONDS, exceptions = [IOException.class, RateLimitExceededException.class])
    String retryOnFailure(String arg1) {
        // fails when the draw from {0..4} equals 4
        if (new Random().nextInt(5) == 4) throw new IOException()
        // need to call this way to go through CGLIB Proxy.
        return grailsApplication.mainContext.demoService.throttleWithRateLimit(arg1)
    }

    @CircuitBreaker(failureThreshold = 2, failureThresholdTimeFrameMs = 60000L, retryAfterMs = 80000L)
    String circuitBreaker(String arg1) {
        TimeUnit.SECONDS.sleep(2)
        // fails when the draw from {0..2} equals 2
        if (new Random().nextInt(3) == 2) throw new FileNotFoundException("fake FileNotFoundException")
        return "Hello ${arg1} in testCircuitBreaker"
    }

    /**
     * Always fails with FileNotFoundException after a two second pause, so
     * every call exercises the circuit breaker and fallback annotations.
     */
    @Fallback(value = ['barService', 'jarService'], exceptions = [OpenCircuitException.class, FileNotFoundException.class])
    @CircuitBreaker(failureThreshold = 2, failureThresholdTimeFrameMs = 60000L, retryAfterMs = 80000L, failureIndications = [FileNotFoundException.class])
    String greet() {
        TimeUnit.SECONDS.sleep(2)
        throw new FileNotFoundException()
    }
}
/**
 * Demo implementation of {@link Greet} that fails occasionally and names
 * 'barService' and 'jarService' in its class-level {@code @Fallback}.
 *
 * Each call draws a fresh random integer from {0..4}; drawing 4 raises the
 * declared checked exception, otherwise a fixed greeting is returned.
 */
@Transactional(readOnly = true)
@Fallback(['barService','jarService'])
class FooService implements Greet {

    static scope = "prototype"

    /**
     * @return the fixed greeting text
     * @throws IOException when the random draw equals 4
     */
    @Override
    String greet() throws IOException {
        boolean shouldFail = new Random().nextInt(5) == 4
        if (shouldFail) {
            throw new IOException("dummy exception from FooService in...greet")
        }
        "Hello from Foo in greet "
    }

    /**
     * @return the fixed salute text
     * @throws FileNotFoundException when the random draw equals 4
     */
    @Override
    String salute() throws FileNotFoundException {
        boolean shouldFail = new Random().nextInt(5) == 4
        if (shouldFail) {
            throw new FileNotFoundException("dummy exception from FooService in...salute")
        }
        "Hello from Foo in salute "
    }
}
/**
 * Contract shared by the demo greeting services.
 */
public interface Greet {

    /**
     * Produces a greeting.
     *
     * @return the greeting text
     * @throws IOException if the greeting cannot be produced
     */
    String greet() throws IOException;

    /**
     * Produces a salute.
     *
     * @return the salute text
     * @throws FileNotFoundException if the salute cannot be produced
     */
    String salute() throws FileNotFoundException;
}
import grails.transaction.Transactional
/**
 * Demo implementation of {@link Greet} that fails at random with an
 * unchecked exception rather than the checked ones the interface declares.
 *
 * Each call draws a fresh random integer from {0, 1}; drawing 1 raises a
 * NullPointerException, otherwise a fixed greeting is returned.
 */
@Transactional(readOnly = true)
class JarService implements Greet {

    static scope = "prototype"

    /**
     * @return the fixed greeting text
     * @throws NullPointerException when the random draw equals 1
     */
    @Override
    String greet() throws IOException {
        boolean shouldFail = new Random().nextInt(2) == 1
        if (shouldFail) {
            throw new NullPointerException("dummy exception from JarService in...greet")
        }
        "Hello from Jar in greet "
    }

    /**
     * @return the fixed salute text
     * @throws NullPointerException when the random draw equals 1
     */
    @Override
    String salute() throws FileNotFoundException {
        boolean shouldFail = new Random().nextInt(2) == 1
        if (shouldFail) {
            throw new NullPointerException("dummy exception from JarService in...salute")
        }
        "Hello from Jar in salute "
    }
}
// Place your Spring DSL code here
// Declares the three task executors referenced by name elsewhere in the demo
// ('SumoThreadPool' and 'TwoThreadPool' are looked up in DemoService) and
// registers TarService as a bean.
beans = {
//importBeans 'file:grails-app/conf/spring/resiliency.groovy'
// bind the 'task' prefix to the Spring task namespace used by the executor definitions below
xmlns task:"http://www.springframework.org/schema/task"
// Executor Pools
// pool-size is given as a 'min-max' range; these two pools set the CALLER_RUNS rejection policy
task.executor(id:'defaultExecutor', 'pool-size':'5-10', 'queue-capacity':'25', 'rejection-policy':'CALLER_RUNS')
task.executor(id:'SumoThreadPool', 'pool-size':'5-10', 'queue-capacity':'25', 'rejection-policy':'CALLER_RUNS')
// deliberately tiny pool (at most 2 threads, 1 queued task) with no rejection-policy set
// NOTE(review): presumably overflow is rejected via the default abort policy — confirm against Spring docs
task.executor(id:'TwoThreadPool', 'pool-size':'1-2', 'queue-capacity':'1')
tarService(com.crossbusiness.resiliency.demo.TarService)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment