Skip to content

Instantly share code, notes, and snippets.

@pintowar
Created January 14, 2015 00:56
Show Gist options
  • Save pintowar/7c4dedd22ceb64f45096 to your computer and use it in GitHub Desktop.
Save pintowar/7c4dedd22ceb64f45096 to your computer and use it in GitHub Desktop.
Creating a SSE streaming with Apache Camel and Ratpack, using Rx Groovy as a glue
@Grab('com.netflix.rxjava:rxjava-groovy:0.20.7')
@Grab('io.reactivex:rxjava-reactive-streams:0.3.0')
@Grab('org.apache.camel:camel-rx:2.14.1')
@Grab('io.ratpack:ratpack-groovy:0.9.11')
@Grab('org.slf4j:slf4j-simple:1.6.6')
import org.apache.camel.impl.*
import org.apache.camel.rx.*
import static rx.RxReactiveStreams.toPublisher
import static ratpack.groovy.Groovy.ratpack
import static ratpack.sse.ServerSentEvents.serverSentEvents;
def camelContext = new DefaultCamelContext()
def rx = new ReactiveCamel(camelContext);
def observable = rx.toObservable("timer:foo?period=2s").map{ Math.random() > 0.5 ? "alpha" : "omega" }
def toStream(obs, context, messageFilter) {
toPublisher(obs.takeWhile{ context.directChannelAccess.channel.isOpen() }
.filter{ it == messageFilter })
}
camelContext.start()
addShutdownHook{ camelContext.stop() }
/**
* After run the example, try the following pages:
* http://localhost:5050/random/alpha
* http://localhost:5050/random/omega
*/
ratpack {
handlers {
get('random/:type') {
def stream = toStream(observable, context, pathTokens.get('type'))
def events = serverSentEvents(stream){ e ->
println e.item
e.event("random").data(e.item)
}
onClose { println "Connection End" }
render(events)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment