Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
Spring Reactive SSE Service Example (
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import redis.clients.jedis.JedisPubSub
import reactor.core.publisher.FluxSink
import redis.clients.jedis.JedisPoolConfig
import redis.clients.jedis.JedisPool
internal class MyListener(fluxSink: FluxSink<String>) : JedisPubSub() {
val fluxSink = fluxSink
override fun onMessage(channel: String?, message: String?) {!!)
override fun onSubscribe(channel: String?, subscribedChannels: Int) {}
override fun onUnsubscribe(channel: String?, subscribedChannels: Int) {}
override fun onPSubscribe(pattern: String?, subscribedChannels: Int) {}
override fun onPUnsubscribe(pattern: String?, subscribedChannels: Int) {}
override fun onPMessage(pattern: String?, channel: String?, message: String?) {}
class ServerSideEvents {
companion object {
val pool = JedisPool(JedisPoolConfig(), "localhost")
val jedis = pool.resource
val flux : Flux<String> = Flux.create { fluxSink ->
println("New flux...")
val l = MyListener(fluxSink)
jedis.subscribe(l, "foo")
println("After subscribe...")
while (true) {
if (fluxSink.isCancelled){
println("Closing Jedis Client...")
val cloneFlux = flux.share()
fun getSSE(): Flux<String> {
return cloneFlux
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment