Skip to content

Instantly share code, notes, and snippets.

@dvas0004
Created July 9, 2018 10:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dvas0004/8ec959e9724cc59deaa6c9feeaa9eb25 to your computer and use it in GitHub Desktop.
Save dvas0004/8ec959e9724cc59deaa6c9feeaa9eb25 to your computer and use it in GitHub Desktop.
Spring Reactive SSE Service Example (https://blog.davidvassallo.me/3170)
package io.cybersift.PcapParserWeb.services
import org.springframework.stereotype.Service
import reactor.core.publisher.Flux
import redis.clients.jedis.JedisPubSub
import reactor.core.publisher.FluxSink
import redis.clients.jedis.JedisPoolConfig
import redis.clients.jedis.JedisPool
internal class MyListener(fluxSink: FluxSink<String>) : JedisPubSub() {
val fluxSink = fluxSink
override fun onMessage(channel: String?, message: String?) {
this.fluxSink.next(message!!)
}
override fun onSubscribe(channel: String?, subscribedChannels: Int) {}
override fun onUnsubscribe(channel: String?, subscribedChannels: Int) {}
override fun onPSubscribe(pattern: String?, subscribedChannels: Int) {}
override fun onPUnsubscribe(pattern: String?, subscribedChannels: Int) {}
override fun onPMessage(pattern: String?, channel: String?, message: String?) {}
}
@Service
class ServerSideEvents {
companion object {
val pool = JedisPool(JedisPoolConfig(), "localhost")
val jedis = pool.resource
val flux : Flux<String> = Flux.create { fluxSink ->
println("New flux...")
val l = MyListener(fluxSink)
jedis.subscribe(l, "foo")
println("After subscribe...")
while (true) {
if (fluxSink.isCancelled){
jedis.close()
println("Closing Jedis Client...")
break
}
}
}
val cloneFlux = flux.share()
}
fun getSSE(): Flux<String> {
return cloneFlux
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment