Skip to content

Instantly share code, notes, and snippets.

@davidhoyt
Created August 29, 2015 00:17
Show Gist options
  • Save davidhoyt/deccccf37ac293130622 to your computer and use it in GitHub Desktop.
Save davidhoyt/deccccf37ac293130622 to your computer and use it in GitHub Desktop.
Slick 3 DatabasePublisher to Rx Observable
import rx.lang.scala.{Subject, Observable}
import slick.backend.DatabasePublisher
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
trait StreamExtensions {
implicit def extendDatabasePublisher[T](publisher: DatabasePublisher[T]): StreamExtensions.DatabasePublisherExtensions[T] =
StreamExtensions.DatabasePublisherExtensions[T](publisher)
}
object StreamExtensions {
implicit class DatabasePublisherExtensions[T](val publisher: DatabasePublisher[T]) extends AnyVal {
def toObservable(implicit ec: ExecutionContext): Observable[T] = {
val subject = Subject[T]()
val future = publisher foreach subject.onNext
future onComplete {
case Success(_) => subject.onCompleted()
case Failure(error) => subject.onError(error)
}
subject
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment