// Exposing some of Redis's streaming operations via Observable.
package com.parthpatil
import com.lambdaworks.redis.output.{KeyStreamingChannel, KeyValueStreamingChannel}
import com.lambdaworks.redis.pubsub.{RedisPubSubConnection, RedisPubSubAdapter, RedisPubSubListener}
import com.lambdaworks.redis.{MapScanCursor, RedisFuture, RedisClient}
import rx.lang.scala.Observable
import scala.concurrent.{Promise, Future, Await, ExecutionContext}
import scala.concurrent.duration._
//import scala.concurrent.ExecutionContext.Implicits.global
import com.google.common.util.concurrent._
import java.util.concurrent.{ExecutorService, Executors}
import scala.util.{Failure, Success}
/**
 * Small demo application that exposes a few Redis operations as RxScala
 * `Observable`s: pub/sub messages, one HSCAN call and one SCAN call.
 *
 * The object body connects to a Redis server at 127.0.0.1 and runs
 * `testScanKeys()`; the other demos are left commented out at the bottom.
 *
 * NOTE(review): the fixed thread pool and the Redis client are never shut
 * down here, so the JVM presumably stays alive after the demo finishes —
 * confirm whether that is intended.
 */
object RxRedis extends App {
  // The conversion below is an intentional implicit view; enable the feature explicitly.
  import scala.language.implicitConversions

  /**
   * Adapts a Guava `ListenableFuture` to a Scala `Future`.
   *
   * @param gFuture  the Guava future to adapt
   * @param executor executor on which the Guava callback is run
   * @tparam T       result type of the future
   * @return a Scala future completed with the same value or failure as `gFuture`
   */
  implicit def guavaFutureToScalaFuture[T](gFuture: ListenableFuture[T])
                                          (implicit executor: ListeningExecutorService): Future[T] = {
    val p = Promise[T]()
    Futures.addCallback[T](gFuture, new FutureCallback[T] {
      override def onSuccess(s: T): Unit = p.success(s)
      override def onFailure(e: Throwable): Unit = p.failure(e)
    }, executor)
    p.future
  }

  // One pool backs both the Scala ExecutionContext and the Guava listening executor.
  val executorService: ExecutorService = Executors.newFixedThreadPool(4)
  implicit val executionContext: scala.concurrent.ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(executorService)
  implicit val executor: ListeningExecutorService =
    MoreExecutors.listeningDecorator(executorService)

  val client = new RedisClient("127.0.0.1")
  val asyncConnection = client.connectAsync()

  /** Reads key "k1" asynchronously and prints either its value or the failure. */
  def testSimpleGetSet(): Unit = {
    asyncConnection.get("k1").onComplete {
      case Success(r) => println(s"k1 -> $r")
      case Failure(e) => println(e)
    }
  }

  /**
   * Publishes the current time to channel "chan1" once per second and prints
   * every (channel, message) pair received through an Observable that wraps a
   * dedicated pub/sub connection.
   */
  def testPubSub(): Unit = {
    val testChannel = "chan1"

    // Publisher side: report the receiver count, and report failures instead of dropping them.
    Observable.interval(1.second).subscribe { _ =>
      asyncConnection.publish(testChannel, System.currentTimeMillis.toString).onComplete {
        case Success(r) => println(s"Num clients received = $r")
        case Failure(e) => println(e)
      }
    }

    // Subscriber side: each Observable subscriber gets its own pub/sub connection.
    val obs = Observable[(String, String)] { subscriber =>
      val connection: RedisPubSubConnection[String, String] = client.connectPubSub()
      // Release the connection when the downstream unsubscribes.
      subscriber.add(rx.lang.scala.Subscription { connection.close() })
      connection.addListener(new RedisPubSubAdapter[String, String]() {
        override def message(chan: String, msg: String): Unit = {
          if (!subscriber.isUnsubscribed)
            subscriber.onNext((chan, msg))
        }
      })
      connection.subscribe(testChannel)
    }
    obs.subscribe { x => println(x) }
  }

  /**
   * Streams the field/value pairs delivered by one HSCAN call on "hash1"
   * through an Observable and prints them. The Observable completes when the
   * command's future succeeds and errors when it fails.
   *
   * NOTE(review): only a single HSCAN call is issued; for a large hash this
   * presumably covers just the first cursor page — confirm whether full
   * iteration is wanted.
   */
  def testHscan(): Unit = {
    val hashKey = "hash1"
    val entries = Observable[(String, String)] { subscriber =>
      val channel: KeyValueStreamingChannel[String, String] =
        new KeyValueStreamingChannel[String, String]() {
          override def onKeyValue(key: String, value: String): Unit = {
            if (!subscriber.isUnsubscribed)
              subscriber.onNext((key, value))
          }
        }
      // Propagate the command outcome so subscribers see termination.
      asyncConnection.hscan(channel, hashKey).onComplete {
        case Success(_) => subscriber.onCompleted()
        case Failure(e) => subscriber.onError(e)
      }
    }
    entries.subscribe(
      (kv: (String, String)) => println(s"key => ${kv._1}, value = ${kv._2}"),
      (e: Throwable) => println(e)
    )
  }

  /**
   * Streams the keys delivered by one SCAN call through an Observable and
   * prints them. The Observable completes when the command's future succeeds
   * and errors when it fails.
   *
   * NOTE(review): only a single SCAN call is issued; for a large keyspace this
   * presumably covers just the first cursor page — confirm whether full
   * iteration is wanted.
   */
  def testScanKeys(): Unit = {
    val keys = Observable[String] { subscriber =>
      val channel: KeyStreamingChannel[String] = new KeyStreamingChannel[String]() {
        override def onKey(key: String): Unit = {
          if (!subscriber.isUnsubscribed)
            subscriber.onNext(key)
        }
      }
      // Propagate the command outcome so subscribers see termination.
      asyncConnection.scan(channel).onComplete {
        case Success(_) => subscriber.onCompleted()
        case Failure(e) => subscriber.onError(e)
      }
    }
    keys.subscribe(
      (key: String) => println(s"key => $key"),
      (e: Throwable) => println(e)
    )
  }

  //testPubSub()
  //testHscan()
  testScanKeys()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment