Created
May 25, 2015 00:51
-
-
Save parth-patil/8d105d82b745a6f69cb1 to your computer and use it in GitHub Desktop.
Exposing some of Redis's streaming operations via Observable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.parthpatil

import com.lambdaworks.redis.output.{KeyStreamingChannel, KeyValueStreamingChannel}
import com.lambdaworks.redis.pubsub.{RedisPubSubConnection, RedisPubSubAdapter, RedisPubSubListener}
import com.lambdaworks.redis.{MapScanCursor, RedisFuture, RedisClient}
import com.lambdaworks.redis.ScanCursor
import rx.lang.scala.Observable
import rx.lang.scala.{Subscriber, Subscription}
import scala.concurrent.{Promise, Future, Await, ExecutionContext}
import scala.concurrent.ExecutionContextExecutorService
import scala.concurrent.duration._
//import scala.concurrent.ExecutionContext.Implicits.global
import com.google.common.util.concurrent._
import java.util.concurrent.{ExecutorService, Executors}
import scala.util.{Failure, Success}
/**
 * Demo application that exposes a few Redis streaming operations (pub/sub, SCAN and HSCAN)
 * as RxScala `Observable`s on top of the lettuce asynchronous client.
 */
object RxRedis extends App {

  /**
   * Adapts a Guava `ListenableFuture` to a Scala `Future`.
   *
   * @param gFuture  the Guava future to adapt
   * @param executor executor on which the Guava completion callback is run
   * @tparam T       type of the future's result
   * @return a Scala future that succeeds or fails with the same outcome as `gFuture`
   */
  implicit def guavaFutureToScalaFuture[T](gFuture: ListenableFuture[T])
                                          (implicit executor: ListeningExecutorService): Future[T] = {
    val p = Promise[T]()
    Futures.addCallback[T](gFuture, new FutureCallback[T] {
      override def onSuccess(s: T): Unit = { p.success(s) }
      override def onFailure(e: Throwable): Unit = { p.failure(e) }
    }, executor)
    p.future
  }

  val executorService: ExecutorService = Executors.newFixedThreadPool(4)

  // Implicit values carry explicit types so that implicit resolution does not depend on inference.
  implicit val executionContext: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(executorService)
  implicit val executor: ListeningExecutorService =
    MoreExecutors.listeningDecorator(executorService)

  val client = new RedisClient("127.0.0.1")
  val asyncConnection = client.connectAsync()

  /**
   * Drives a cursor-based scan to completion on behalf of `subscriber`.
   *
   * `fetchPage` is called with `None` for the first page and afterwards with the cursor returned
   * by the previous page. When a returned cursor reports `isFinished`, the subscriber is
   * completed; when a page fails, the subscriber receives the error. Nothing further is
   * requested or signalled once the subscriber has unsubscribed.
   *
   * @param subscriber downstream subscriber to complete or fail
   * @param fetchPage  issues the scan command for the given cursor and returns the next cursor
   * @tparam T         element type of the subscriber
   */
  private def streamAllPages[T](subscriber: Subscriber[T])
                               (fetchPage: Option[ScanCursor] => Future[ScanCursor]): Unit = {
    // Asynchronous loop: each step only registers a callback, so the stack does not grow.
    def loop(cursor: Option[ScanCursor]): Unit =
      fetchPage(cursor).onComplete { result =>
        if (!subscriber.isUnsubscribed) result match {
          case Success(next) if next.isFinished => subscriber.onCompleted()
          case Success(next)                    => loop(Some(next))
          case Failure(e)                       => subscriber.onError(e)
        }
      }

    loop(None)
  }

  /** Reads key `k1` and prints either its value or the failure. */
  def testSimpleGetSet(): Unit = {
    asyncConnection.get("k1").onComplete {
      case Success(r) => println(s"k1 -> $r")
      case Failure(e) => println(e)
    }
  }

  /**
   * Publishes the current time to channel `chan1` once per second and prints every
   * (channel, message) pair received through a pub/sub subscription on that channel.
   */
  def testPubSub(): Unit = {
    val testChannel = "chan1"

    Observable.interval(1.second) subscribe { _ =>
      asyncConnection.publish(testChannel, System.currentTimeMillis.toString) onComplete {
        case Success(r) => println(s"Num clients received = $r")
        // Report failed publishes instead of silently dropping them.
        case Failure(e) => println(s"Publish to $testChannel failed: $e")
      }
    }

    val obs = Observable[(String, String)] { subscriber =>
      val connection: RedisPubSubConnection[String, String] = client.connectPubSub()
      connection.addListener(new RedisPubSubAdapter[String, String]() {
        override def message(chan: String, msg: String): Unit = {
          if (!subscriber.isUnsubscribed)
            subscriber.onNext((chan, msg))
        }
      })
      // Each subscriber gets its own pub/sub connection; close it when the subscriber leaves.
      subscriber.add(Subscription {
        connection.close()
      })
      connection.subscribe(testChannel)
    }

    obs subscribe { x => println(x) }
  }

  /** Streams every field/value pair of hash `hash1` via HSCAN and prints each pair. */
  def testHscan(): Unit = {
    val hashKey = "hash1"

    val entries = Observable[(String, String)] { subscriber =>
      val channel = new KeyValueStreamingChannel[String, String]() {
        override def onKeyValue(key: String, value: String): Unit = {
          if (!subscriber.isUnsubscribed)
            subscriber.onNext((key, value))
        }
      }
      // One HSCAN call returns a single page; keep following the cursor until it is finished.
      streamAllPages(subscriber) {
        case Some(cursor) => guavaFutureToScalaFuture(asyncConnection.hscan(channel, hashKey, cursor))
        case None         => guavaFutureToScalaFuture(asyncConnection.hscan(channel, hashKey))
      }
    }

    entries.subscribe(
      kv => println(s"key => ${kv._1}, value = ${kv._2}"),
      e => println(s"HSCAN of $hashKey failed: $e")
    )
  }

  /** Streams every key of the current database via SCAN and prints each key. */
  def testScanKeys(): Unit = {
    val keys = Observable[String] { subscriber =>
      val channel = new KeyStreamingChannel[String]() {
        override def onKey(key: String): Unit = {
          if (!subscriber.isUnsubscribed)
            subscriber.onNext(key)
        }
      }
      // One SCAN call returns a single page; keep following the cursor until it is finished.
      streamAllPages(subscriber) {
        case Some(cursor) => guavaFutureToScalaFuture(asyncConnection.scan(channel, cursor))
        case None         => guavaFutureToScalaFuture(asyncConnection.scan(channel))
      }
    }

    keys.subscribe(
      key => println(s"key => $key"),
      e => println(s"SCAN failed: $e")
    )
  }

  //testPubSub()
  //testHscan()
  testScanKeys()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment