Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
#! /usr/bin/env groovy
import ratpack.exec.Blocking
import ratpack.exec.ExecInterceptor
import ratpack.exec.Execution
import ratpack.func.Block
import ratpack.groovy.test.embed.GroovyEmbeddedApp
import ratpack.handling.Context
import ratpack.http.client.HttpClient
import ratpack.rx.RxRatpack
import ratpack.service.Service
import ratpack.service.StartEvent
import ratpack.test.embed.EmbeddedApp
import rx.Observable
import rx.plugins.RxJavaPlugins
import static ratpack.groovy.Groovy.ratpack
@GrabResolver(name = 'jcenter', root = 'http://jcenter.bintray.com/')
@GrabExclude('org.codehaus.groovy:groovy-all')
@Grab('io.ratpack:ratpack-groovy:1.3.3')
@Grab('io.ratpack:ratpack-rx:1.3.3')
@Grab('io.ratpack:ratpack-groovy-test:1.3.3')
@Grab('org.slf4j:slf4j-simple:1.7.12')
// stub application that sleeps for the number of seconds in the path you give it
// ex: http://localhost:<port>/2 sleeps for 2 seconds before returning the value "2"
EmbeddedApp stubApp = GroovyEmbeddedApp.of {
handlers {
get(":sleepFor") {
Integer sleepFor = context.pathTokens['sleepFor'].toInteger() ?: 1
Blocking.exec { ->
println "Stub Sleep App GET Request, sleep for: $sleepFor seconds"
sleep(sleepFor * 1000)
context.render sleepFor.toString()
}
}
}
}
public class ExecutionTimer {
private ExecutionTimer() {}
final UUID id = UUID.randomUUID()
final Long executionStart = System.currentTimeMillis()
Long segmentStart
ExecutionTimer startSegment() {
segmentStart = System.currentTimeMillis()
this
}
Long getExecutionTime() { System.currentTimeMillis() - executionStart }
Long getSegmentTime() { System.currentTimeMillis() - segmentStart }
@Override
String toString() {
"$id - segment time: ${segmentTime} execution time: ${executionTime}ms"
}
public static ExecutionTimer startExecutionSegment(Execution execution) {
ExecutionTimer timer = execution.maybeGet(ExecutionTimer).orElse(null)
if (!timer) {
timer = new ExecutionTimer()
execution.add(ExecutionTimer, timer)
}
timer.startSegment()
}
}
public class LoggingExecInterceptor implements ExecInterceptor {
@Override
void intercept(Execution execution, ExecInterceptor.ExecType execType, Block executionSegment) throws Exception {
ExecutionTimer timer = ExecutionTimer.startExecutionSegment(execution)
try {
executionSegment.execute()
} finally {
println "${Thread.currentThread().name} - $timer - ${execType}"
}
}
}
ratpack {
serverConfig {
development false
}
bindings {
bindInstance(new LoggingExecInterceptor())
bindInstance Service, new Service() {
@Override
void onStart(StartEvent event) throws Exception {
if (!RxJavaPlugins.getInstance().getObservableExecutionHook()) RxRatpack.initialize()
}
}
}
handlers {
all { Context context ->
HttpClient httpClient = context.get(HttpClient)
final String executionId = context.get(ExecutionTimer).id.toString()
// create a List of URIs to the stub app above that will ask it to sleep
// for N seconds before returning the number of seconds it was asked to sleep
final List<URI> REQUEST_SLEEP_URIS = [3, 2, 1].collect {
URI.create("http://${stubApp.address.host}:${stubApp.address.port}/${it}")
}
println "${Thread.currentThread().name} - $executionId - A. Original compute thread"
// Iterate over all uris, make async http request for each and collect the results to render out
Observable.from(REQUEST_SLEEP_URIS) // stream of URIs
.flatMap { uri ->
println "${Thread.currentThread().name} - $executionId - B. GET: $uri"
RxRatpack.observe(httpClient.get(uri)) // async http request to "sleep" service
}
.map { it.body.text } // get the body text for each http result
.toList() // collect into a single list and then subscribe to it
.subscribe({ List<String> responses ->
println "${Thread.currentThread().name} - $executionId - C. Subscribe final result"
context.render responses.join(", ")
})
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment