Exposing some of Redis's streaming operations via Observable
package com.parthpatil | |
import com.lambdaworks.redis.output.{KeyStreamingChannel, KeyValueStreamingChannel} | |
import com.lambdaworks.redis.pubsub.{RedisPubSubConnection, RedisPubSubAdapter, RedisPubSubListener} | |
import com.lambdaworks.redis.{MapScanCursor, RedisFuture, RedisClient} | |
import rx.lang.scala.Observable | |
import scala.concurrent.{Promise, Future, Await, ExecutionContext} | |
import scala.concurrent.duration._ | |
//import scala.concurrent.ExecutionContext.Implicits.global | |
import com.google.common.util.concurrent._ | |
import java.util.concurrent.{ExecutorService, Executors} | |
import scala.util.{Failure, Success} | |
object RxRedis extends App { | |
implicit def guavaFutureToScalaFuture[T](gFuture: ListenableFuture[T]) | |
(implicit executor: ListeningExecutorService): Future[T] = { | |
val p = Promise[T]() | |
Futures.addCallback[T](gFuture, new FutureCallback[T] { | |
def onSuccess(s: T) { p.success(s) } | |
def onFailure(e: Throwable) { p.failure(e) } | |
}, executor) | |
p.future | |
} | |
val executorService: ExecutorService = Executors.newFixedThreadPool(4) | |
implicit val executionContext = ExecutionContext.fromExecutorService(executorService) | |
implicit val executor = MoreExecutors.listeningDecorator(executorService) | |
val client = new RedisClient("127.0.0.1") | |
val asyncConnection = client.connectAsync() | |
def testSimpleGetSet(): Unit = { | |
asyncConnection.get("k1").onComplete { | |
case Success(r) => println(s"k1 -> $r") | |
case Failure(e) => println(e) | |
} | |
} | |
def testPubSub(): Unit = { | |
val testChannel = "chan1" | |
Observable.interval(1 second) subscribe { x => | |
asyncConnection.publish(testChannel, System.currentTimeMillis.toString) onSuccess { case r => | |
println(s"Num clients received = $r") | |
} | |
} | |
val obs = Observable[(String, String)] { subscriber => | |
val connection: RedisPubSubConnection[String, String] = client.connectPubSub() | |
connection.addListener(new RedisPubSubAdapter[String, String]() { | |
override def message(chan: String, msg: String): Unit = { | |
if (!subscriber.isUnsubscribed) | |
subscriber.onNext((chan, msg)) | |
} | |
}) | |
connection.subscribe(testChannel) | |
} | |
obs subscribe { x => println(x) } | |
} | |
def testHscan(): Unit = { | |
val hashKey = "hash1" | |
Observable[(String, String)] { subscriber => | |
asyncConnection.hscan(new KeyValueStreamingChannel[String, String]() { | |
override def onKeyValue(key: String, value: String) { | |
if (!subscriber.isUnsubscribed) | |
subscriber.onNext((key, value)) | |
} | |
}, hashKey) | |
} subscribe { kv => | |
println(s"key => ${kv._1}, value = ${kv._2}") | |
} | |
} | |
def testScanKeys(): Unit = { | |
Observable[String] { subscriber => | |
asyncConnection.scan(new KeyStreamingChannel[String]() { | |
override def onKey(key: String) { | |
if (!subscriber.isUnsubscribed) | |
subscriber.onNext(key) | |
} | |
}) | |
} subscribe { key => println(s"key => $key") } | |
} | |
//testPubSub() | |
//testHscan() | |
testScanKeys() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment