Skip to content

Instantly share code, notes, and snippets.

@cab
Last active October 10, 2019 12:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cab/3b628f54b4a017d9460b71961ba4bd9a to your computer and use it in GitHub Desktop.
Save cab/3b628f54b4a017d9460b71961ba4bd9a to your computer and use it in GitHub Desktop.
class WebsocketSubject[I: Encoder, O: Decoder](url: String, os: OverflowStrategy.Synchronous[O])(
implicit scheduler: Scheduler
) extends Subject[I, O] {
self =>
import NettyFutureConverters._
private val socket = {
val httpClient = org.asynchttpclient.Dsl.asyncHttpClient()
Task
.deferFuture(
httpClient
.prepareGet(url)
.execute(new ws.WebSocketUpgradeHandler.Builder().build())
.toCompletableFuture()
.toScala
)
.memoize
}
private val connection = Observable
.fromTask(socket)
.switchMap(
socket =>
Observable.create(os) { downstream =>
socket.addWebSocketListener(new ws.WebSocketListener {
override def onOpen(socket: ws.WebSocket) =
println("connected")
override def onClose(socket: ws.WebSocket, code: Int, reason: String) =
println("closed")
override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int) = {
println(payload)
val decoded = decode[O](payload)
println(decoded)
}
override def onError(err: Throwable) =
println(err)
})
Cancelable(() => {
socket.sendCloseFrame().get() // TODO(cab) this blocks!
()
})
}
)
def unsafeSubscribeFn(subscriber: Subscriber[O]): monix.execution.Cancelable =
connection.unsafeSubscribeFn(new Subscriber[O] {
val scheduler = subscriber.scheduler
def onNext(elem: O): Future[Ack] = subscriber.onNext(elem)
def onError(ex: Throwable): Unit = {
scheduler.reportFailure(ex)
onComplete()
}
def onComplete(): Unit =
{
val _ = self
.delayExecution(3.seconds)
.unsafeSubscribeFn(subscriber)
}
})
def onComplete(): Unit = ???
def onError(ex: Throwable): Unit = ???
def onNext(elem: I): Future[Ack] =
socket
.flatMap(s => Task.deferFuture { s.sendTextFrame(implicitly[Encoder[I]].apply(elem).toString()).toScala })
.map(_ => Ack.Continue)
.runToFuture
def size: Int = ???
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment