Last active
August 29, 2015 14:21
-
-
Save mp911de/8664c7c7cae3cb4c0092 to your computer and use it in GitHub Desktop.
Using RxJava with lettuce (timeout and event executor should be adopted to suit your needs). Thx to @vleushin
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Arrays; | |
import java.util.Collection; | |
import java.util.List; | |
import java.util.Vector; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.Executor; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.function.Supplier; | |
import com.google.common.collect.Lists; | |
import rx.Observable; | |
import rx.Subscriber; | |
import com.lambdaworks.redis.RedisCommandTimeoutException; | |
import com.lambdaworks.redis.RedisException; | |
import com.lambdaworks.redis.RedisFuture; | |
/** | |
* Subscribes on multiple futures. | |
* Success flow: Subscriber will be notified upon sucess if all futures come back sucessfully before the timeout is exceeded. | |
* Error flow: Subscriber will be notified for every error or if the timeout is exceeded. | |
*/ | |
class RedisMultiFutureSubscription<T> implements Observable.OnSubscribe<List<T>> { | |
private final Executor onFutureCompletedExecutor; | |
private final Supplier<? extends Collection<RedisFuture<T>>> supplier; | |
public RedisMultiFutureSubscription(Executor onFutureCompletedExecutor, | |
Supplier<? extends Collection<RedisFuture<T>>> supplier) { | |
this.onFutureCompletedExecutor = onFutureCompletedExecutor; | |
this.supplier = supplier; | |
} | |
public void call(Subscriber<? super List<T>> subscriber) { | |
Collection<RedisFuture<T>> futures = supplier.get(); | |
AtomicInteger counter = new AtomicInteger(futures.size()); | |
AtomicBoolean hadErrors = new AtomicBoolean(false); | |
Object[] responses = new Object[futures.size()]; | |
int index = 0; | |
for (RedisFuture<T> future : futures) { | |
final int position = index++; | |
future.addListener(() -> { | |
if (future.getError() != null) { | |
hadErrors.set(true); | |
} | |
if (subscriber.isUnsubscribed()) { | |
return; | |
} | |
if (future.getError() != null) { | |
subscriber.onError(new RedisException(future.getError())); | |
} else { | |
try { | |
responses[position] = future.get(); | |
} catch (ExecutionException e) { | |
hadErrors.set(true); | |
subscriber.onError(e.getCause()); | |
} catch (Exception e) { | |
hadErrors.set(true); | |
subscriber.onError(e); | |
} | |
if (counter.decrementAndGet() == 0 && !hadErrors.get()) { | |
subscriber.onNext((List<T>) Lists.newArrayList(responses)); | |
subscriber.onCompleted(); | |
} | |
} | |
}, onFutureCompletedExecutor); | |
} | |
if (!await(futures, 5, TimeUnit.SECONDS)) { | |
hadErrors.set(true); | |
subscriber.onError(new RedisCommandTimeoutException()); | |
} | |
} | |
private boolean await(Collection<RedisFuture<T>> futures, int timeout, TimeUnit timeunit) { | |
long nanos = timeunit.toNanos(timeout); | |
long time = System.nanoTime(); | |
for (RedisFuture<T> future : futures) { | |
if (nanos < 0) { | |
return false; | |
} | |
if (!future.await(nanos, TimeUnit.NANOSECONDS)) { | |
return false; | |
} | |
long now = System.nanoTime(); | |
nanos -= now - time; | |
time = now; | |
} | |
return true; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Collection; | |
import java.util.List; | |
import java.util.concurrent.Executor; | |
import java.util.concurrent.Executors; | |
import java.util.function.Supplier; | |
import com.lambdaworks.redis.RedisFuture; | |
import rx.Observable; | |
/** | |
* @author <a href="mailto:mpaluch@paluch.biz">Mark Paluch</a> | |
* @soundtrack Ian Lurgee Presents - AutoSuggestion (May 2015) | |
*/ | |
public class RedisObservables { | |
private static final Executor executor = Executors.newSingleThreadExecutor(); | |
/** | |
* Subscribe to one future. | |
*/ | |
public static <T> Observable<T> single(Supplier<RedisFuture<? extends T>> supplier) { | |
return Observable.create(new RedisSingleFutureSubscription<>(executor, supplier)); | |
} | |
/** | |
* Subscribe to a collection of futures. | |
*/ | |
public static <T> Observable<List<T>> multiple(Supplier<Collection<RedisFuture<T>>> supplier) { | |
return Observable.create(new RedisMultiFutureSubscription<>(executor, supplier)); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.Executor; | |
import java.util.concurrent.TimeUnit; | |
import java.util.function.Supplier; | |
import rx.Observable; | |
import rx.Subscriber; | |
import com.lambdaworks.redis.RedisCommandTimeoutException; | |
import com.lambdaworks.redis.RedisException; | |
import com.lambdaworks.redis.RedisFuture; | |
/** | |
* Subscribes on one particular future. | |
* Success flow: Subscriber will be notified upon sucess if the future comes back before the timeout is exceeded. | |
* Error flow: Subscriber will be notified on error or if the timeout exceeded. | |
*/ | |
class RedisSingleFutureSubscription<T> implements Observable.OnSubscribe<T> { | |
private final Executor onFutureCompletedExecutor; | |
private final Supplier<RedisFuture<? extends T>> supplier; | |
public RedisSingleFutureSubscription(Executor onFutureCompletedExecutor, Supplier<RedisFuture<? extends T>> supplier) { | |
this.onFutureCompletedExecutor = onFutureCompletedExecutor; | |
this.supplier = supplier; | |
} | |
public void call(Subscriber<? super T> subscriber) { | |
RedisFuture<? extends T> future = supplier.get(); | |
future.addListener(() -> { | |
if (subscriber.isUnsubscribed()) { | |
return; | |
} | |
if (future.getError() != null) { | |
subscriber.onError(new RedisException(future.getError())); | |
} else { | |
try { | |
subscriber.onNext(future.get()); | |
subscriber.onCompleted(); | |
} catch (ExecutionException e) { | |
subscriber.onError(e.getCause()); | |
} catch (Exception e) { | |
subscriber.onError(e); | |
} | |
} | |
}, onFutureCompletedExecutor); | |
if (!future.await(5, TimeUnit.SECONDS)) { | |
subscriber.onError(new RedisCommandTimeoutException()); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import static java.util.stream.Collectors.toList; | |
import static org.junit.Assert.assertEquals; | |
import java.util.Arrays; | |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
import org.junit.After; | |
import org.junit.AfterClass; | |
import org.junit.Before; | |
import org.junit.BeforeClass; | |
import org.junit.Rule; | |
import org.junit.Test; | |
import org.junit.rules.ExpectedException; | |
import rx.Observable; | |
import com.lambdaworks.redis.RedisAsyncConnection; | |
import com.lambdaworks.redis.RedisClient; | |
import com.lambdaworks.redis.RedisException; | |
/** | |
* @author <a href="mailto:mpaluch@paluch.biz">Mark Paluch</a> | |
* @soundtrack Work Out! 048 - with guests Cosmic Gate, hosts Store N Forward | |
*/ | |
public class SubscriptionTest { | |
@Rule | |
public ExpectedException expectedException = ExpectedException.none(); | |
private static RedisClient client; | |
private RedisAsyncConnection<String, String> connection; | |
@BeforeClass | |
public static void setupClient() { | |
client = new RedisClient("127.0.0.1"); | |
} | |
@AfterClass | |
public static void shutdownClient() { | |
client.shutdown(0, 0, TimeUnit.MILLISECONDS); | |
} | |
@Before | |
public void before() throws Exception { | |
connection = client.connectAsync(); | |
connection.del("key", "key1", "key2").get(); | |
} | |
@After | |
public void after() throws Exception { | |
connection.close(); | |
connection = null; | |
} | |
@Test | |
public void singleFuture() throws Exception { | |
Observable<String> single = RedisObservables.single(() -> connection.set("key", "value")); | |
String result = single.toBlocking().first(); | |
assertEquals("OK", result); | |
} | |
@Test | |
public void singleFutureWithError() throws Exception { | |
Observable<String> single = RedisObservables.single(() -> connection.set("key", "value")); | |
String result = single.toBlocking().first(); | |
assertEquals("OK", result); | |
expectedException.expect(RedisException.class); | |
expectedException.expectMessage("WRONGTYPE Operation against a key holding the wrong kind of value"); | |
Observable<String> error = RedisObservables.single(() -> connection.hget("key", "dummy")); | |
error.toBlocking().first(); | |
} | |
@Test | |
public void multiFuture() throws Exception { | |
List<String> keys = Arrays.asList("key1", "key2"); | |
Observable<List<String>> multiple = RedisObservables.multiple(() -> keys.stream() | |
.map(key -> connection.set(key, "value-" + key)).collect(toList())); | |
List<String> result = multiple.toBlocking().first(); | |
assertEquals(Arrays.asList("OK", "OK"), result); | |
multiple = RedisObservables.multiple(() -> keys.stream().map(key -> connection.get(key)).collect(toList())); | |
result = multiple.toBlocking().first(); | |
assertEquals(Arrays.asList("value-key1", "value-key2"), result); | |
} | |
@Test | |
public void multiFutureWithError() throws Exception { | |
connection.set("key1", "value").get(); | |
expectedException.expect(RedisException.class); | |
expectedException.expectMessage("WRONGTYPE Operation against a key holding the wrong kind of value"); | |
List<String> keys = Arrays.asList("key1", "key2"); | |
Observable<List<String>> multiple = RedisObservables.multiple(() -> keys.stream() | |
.map(key -> connection.hget(key, "field")).collect(toList())); | |
multiple.toBlocking().first(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment