Created
July 9, 2018 10:42
-
-
Save dvas0004/8ec959e9724cc59deaa6c9feeaa9eb25 to your computer and use it in GitHub Desktop.
Spring Reactive SSE Service Example (https://blog.davidvassallo.me/3170)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package io.cybersift.PcapParserWeb.services | |
import org.springframework.stereotype.Service | |
import reactor.core.publisher.Flux | |
import redis.clients.jedis.JedisPubSub | |
import reactor.core.publisher.FluxSink | |
import redis.clients.jedis.JedisPoolConfig | |
import redis.clients.jedis.JedisPool | |
internal class MyListener(fluxSink: FluxSink<String>) : JedisPubSub() { | |
val fluxSink = fluxSink | |
override fun onMessage(channel: String?, message: String?) { | |
this.fluxSink.next(message!!) | |
} | |
override fun onSubscribe(channel: String?, subscribedChannels: Int) {} | |
override fun onUnsubscribe(channel: String?, subscribedChannels: Int) {} | |
override fun onPSubscribe(pattern: String?, subscribedChannels: Int) {} | |
override fun onPUnsubscribe(pattern: String?, subscribedChannels: Int) {} | |
override fun onPMessage(pattern: String?, channel: String?, message: String?) {} | |
} | |
@Service | |
class ServerSideEvents { | |
companion object { | |
val pool = JedisPool(JedisPoolConfig(), "localhost") | |
val jedis = pool.resource | |
val flux : Flux<String> = Flux.create { fluxSink -> | |
println("New flux...") | |
val l = MyListener(fluxSink) | |
jedis.subscribe(l, "foo") | |
println("After subscribe...") | |
while (true) { | |
if (fluxSink.isCancelled){ | |
jedis.close() | |
println("Closing Jedis Client...") | |
break | |
} | |
} | |
} | |
val cloneFlux = flux.share() | |
} | |
fun getSSE(): Flux<String> { | |
return cloneFlux | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment