Skip to content

Instantly share code, notes, and snippets.

@mp911de
Last active August 29, 2015 14:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mp911de/8664c7c7cae3cb4c0092 to your computer and use it in GitHub Desktop.
Save mp911de/8664c7c7cae3cb4c0092 to your computer and use it in GitHub Desktop.
Using RxJava with lettuce (the timeout and the event executor should be adapted to suit your needs). Thanks to @vleushin
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import com.google.common.collect.Lists;
import rx.Observable;
import rx.Subscriber;
import com.lambdaworks.redis.RedisCommandTimeoutException;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.RedisFuture;
/**
* Subscribes on multiple futures.
* Success flow: Subscriber will be notified upon sucess if all futures come back sucessfully before the timeout is exceeded.
* Error flow: Subscriber will be notified for every error or if the timeout is exceeded.
*/
class RedisMultiFutureSubscription<T> implements Observable.OnSubscribe<List<T>> {
private final Executor onFutureCompletedExecutor;
private final Supplier<? extends Collection<RedisFuture<T>>> supplier;
public RedisMultiFutureSubscription(Executor onFutureCompletedExecutor,
Supplier<? extends Collection<RedisFuture<T>>> supplier) {
this.onFutureCompletedExecutor = onFutureCompletedExecutor;
this.supplier = supplier;
}
public void call(Subscriber<? super List<T>> subscriber) {
Collection<RedisFuture<T>> futures = supplier.get();
AtomicInteger counter = new AtomicInteger(futures.size());
AtomicBoolean hadErrors = new AtomicBoolean(false);
Object[] responses = new Object[futures.size()];
int index = 0;
for (RedisFuture<T> future : futures) {
final int position = index++;
future.addListener(() -> {
if (future.getError() != null) {
hadErrors.set(true);
}
if (subscriber.isUnsubscribed()) {
return;
}
if (future.getError() != null) {
subscriber.onError(new RedisException(future.getError()));
} else {
try {
responses[position] = future.get();
} catch (ExecutionException e) {
hadErrors.set(true);
subscriber.onError(e.getCause());
} catch (Exception e) {
hadErrors.set(true);
subscriber.onError(e);
}
if (counter.decrementAndGet() == 0 && !hadErrors.get()) {
subscriber.onNext((List<T>) Lists.newArrayList(responses));
subscriber.onCompleted();
}
}
}, onFutureCompletedExecutor);
}
if (!await(futures, 5, TimeUnit.SECONDS)) {
hadErrors.set(true);
subscriber.onError(new RedisCommandTimeoutException());
}
}
private boolean await(Collection<RedisFuture<T>> futures, int timeout, TimeUnit timeunit) {
long nanos = timeunit.toNanos(timeout);
long time = System.nanoTime();
for (RedisFuture<T> future : futures) {
if (nanos < 0) {
return false;
}
if (!future.await(nanos, TimeUnit.NANOSECONDS)) {
return false;
}
long now = System.nanoTime();
nanos -= now - time;
time = now;
}
return true;
}
}
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import com.lambdaworks.redis.RedisFuture;
import rx.Observable;
/**
 * Factory methods that expose lettuce {@link RedisFuture}s as RxJava {@link Observable}s.
 * <p>
 * All completion callbacks run on one shared single-threaded executor. Its thread is a daemon thread so that it does
 * not keep the JVM alive after the application has finished.
 *
 * @author <a href="mailto:mpaluch@paluch.biz">Mark Paluch</a>
 * @soundtrack Ian Lurgee Presents - AutoSuggestion (May 2015)
 */
public class RedisObservables {

    // The executor is never shut down, so its worker must be a daemon thread; a non-daemon worker would
    // prevent the JVM from exiting once the first callback has been dispatched.
    private static final Executor executor = Executors.newSingleThreadExecutor(runnable -> {
        Thread thread = new Thread(runnable, "redis-observables-callback");
        thread.setDaemon(true);
        return thread;
    });

    /**
     * Subscribe to one future.
     *
     * @param supplier provides the future; invoked once per subscription
     * @param <T> result type
     * @return an observable emitting the future's result and then completing, or signalling an error
     */
    public static <T> Observable<T> single(Supplier<RedisFuture<? extends T>> supplier) {
        return Observable.create(new RedisSingleFutureSubscription<>(executor, supplier));
    }

    /**
     * Subscribe to a collection of futures.
     *
     * @param supplier provides the futures; invoked once per subscription. Any {@link Collection} subtype is
     *        accepted.
     * @param <T> result type of each future
     * @return an observable emitting one list with all results and then completing, or signalling an error
     */
    public static <T> Observable<List<T>> multiple(Supplier<? extends Collection<RedisFuture<T>>> supplier) {
        return Observable.create(new RedisMultiFutureSubscription<>(executor, supplier));
    }
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import rx.Observable;
import rx.Subscriber;
import com.lambdaworks.redis.RedisCommandTimeoutException;
import com.lambdaworks.redis.RedisException;
import com.lambdaworks.redis.RedisFuture;
/**
* Subscribes on one particular future.
* Success flow: Subscriber will be notified upon sucess if the future comes back before the timeout is exceeded.
* Error flow: Subscriber will be notified on error or if the timeout exceeded.
*/
class RedisSingleFutureSubscription<T> implements Observable.OnSubscribe<T> {
private final Executor onFutureCompletedExecutor;
private final Supplier<RedisFuture<? extends T>> supplier;
public RedisSingleFutureSubscription(Executor onFutureCompletedExecutor, Supplier<RedisFuture<? extends T>> supplier) {
this.onFutureCompletedExecutor = onFutureCompletedExecutor;
this.supplier = supplier;
}
public void call(Subscriber<? super T> subscriber) {
RedisFuture<? extends T> future = supplier.get();
future.addListener(() -> {
if (subscriber.isUnsubscribed()) {
return;
}
if (future.getError() != null) {
subscriber.onError(new RedisException(future.getError()));
} else {
try {
subscriber.onNext(future.get());
subscriber.onCompleted();
} catch (ExecutionException e) {
subscriber.onError(e.getCause());
} catch (Exception e) {
subscriber.onError(e);
}
}
}, onFutureCompletedExecutor);
if (!future.await(5, TimeUnit.SECONDS)) {
subscriber.onError(new RedisCommandTimeoutException());
}
}
}
import static java.util.stream.Collectors.toList;
import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import rx.Observable;
import com.lambdaworks.redis.RedisAsyncConnection;
import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisException;
/**
 * Integration tests for {@link RedisObservables}. They talk to a Redis server through a client created for host
 * {@code 127.0.0.1}.
 *
 * @author <a href="mailto:mpaluch@paluch.biz">Mark Paluch</a>
 * @soundtrack Work Out! 048 - with guests Cosmic Gate, hosts Store N Forward
 */
public class SubscriptionTest {

    /** Client shared by all tests of this class. */
    private static RedisClient client;

    @Rule
    public ExpectedException expectedException = ExpectedException.none();

    /** Connection opened before and closed after each test. */
    private RedisAsyncConnection<String, String> connection;

    @BeforeClass
    public static void setupClient() {
        client = new RedisClient("127.0.0.1");
    }

    @AfterClass
    public static void shutdownClient() {
        client.shutdown(0, 0, TimeUnit.MILLISECONDS);
    }

    /** Opens a fresh connection and removes the keys used by the tests. */
    @Before
    public void before() throws Exception {
        connection = client.connectAsync();
        connection.del("key", "key1", "key2").get();
    }

    /** Closes the per-test connection. */
    @After
    public void after() throws Exception {
        connection.close();
        connection = null;
    }

    /** A single SET is reported as "OK". */
    @Test
    public void singleFuture() throws Exception {
        Observable<String> setResult = RedisObservables.single(() -> connection.set("key", "value"));

        assertEquals("OK", setResult.toBlocking().first());
    }

    /** HGET against a key holding a plain string surfaces as a {@link RedisException}. */
    @Test
    public void singleFutureWithError() throws Exception {
        Observable<String> setResult = RedisObservables.single(() -> connection.set("key", "value"));
        assertEquals("OK", setResult.toBlocking().first());

        expectedException.expect(RedisException.class);
        expectedException.expectMessage("WRONGTYPE Operation against a key holding the wrong kind of value");

        Observable<String> failing = RedisObservables.single(() -> connection.hget("key", "dummy"));
        failing.toBlocking().first();
    }

    /** Several SETs followed by several GETs come back as ordered lists. */
    @Test
    public void multiFuture() throws Exception {
        List<String> keys = Arrays.asList("key1", "key2");

        Observable<List<String>> writes = RedisObservables.multiple(() -> keys.stream()
                .map(key -> connection.set(key, "value-" + key)).collect(toList()));
        List<String> writeResults = writes.toBlocking().first();
        assertEquals(Arrays.asList("OK", "OK"), writeResults);

        Observable<List<String>> reads = RedisObservables
                .multiple(() -> keys.stream().map(key -> connection.get(key)).collect(toList()));
        List<String> readResults = reads.toBlocking().first();
        assertEquals(Arrays.asList("value-key1", "value-key2"), readResults);
    }

    /** One failing HGET among several futures surfaces as a {@link RedisException}. */
    @Test
    public void multiFutureWithError() throws Exception {
        connection.set("key1", "value").get();

        expectedException.expect(RedisException.class);
        expectedException.expectMessage("WRONGTYPE Operation against a key holding the wrong kind of value");

        List<String> keys = Arrays.asList("key1", "key2");
        Observable<List<String>> failing = RedisObservables.multiple(() -> keys.stream()
                .map(key -> connection.hget(key, "field")).collect(toList()));
        failing.toBlocking().first();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment