Skip to content

Instantly share code, notes, and snippets.

@nguyenvanthan
Last active March 17, 2016 14:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nguyenvanthan/a8ad69963f798502b790 to your computer and use it in GitHub Desktop.
Save nguyenvanthan/a8ad69963f798502b790 to your computer and use it in GitHub Desktop.
// create the connection
// Environment settings: N1QL querying is switched off, failed operations are
// not retried (fail-fast), and both connect timeouts are set to 30 seconds.
DefaultCouchbaseEnvironment environment = DefaultCouchbaseEnvironment.builder()
    // timeouts, both expressed in milliseconds
    .connectTimeout(TimeUnit.SECONDS.toMillis(30))
    .socketConnectTimeout(Math.toIntExact(TimeUnit.SECONDS.toMillis(30)))
    // reconnect delay: linear, in seconds, with an upper bound of 12 hours
    .reconnectDelay(Delay.linear(TimeUnit.SECONDS, TimeUnit.HOURS.toSeconds(12), 10, 10))
    .retryStrategy(FailFastRetryStrategy.INSTANCE)
    .queryEnabled(false)
    .build();
// in the 'main' method
// Reads the document stored under `id` for the EUROPE datacenter, decodes its
// binary content as a BSON document and renders it as JSON. When nothing
// passes the filter, the literal "no profile found" is used instead.
String profile = readValue(DATACENTER.EUROPE.name(), id)
    // keep only non-null documents whose content is a raw byte array
    .filter(document -> document != null && document.content() instanceof byte[])
    .map(document -> {
        byte[] data = (byte[]) document.content();
        // BsonBinaryReader is Closeable: release it as soon as decoding is done,
        // including when decode() throws on malformed input.
        try (BsonBinaryReader reader = new BsonBinaryReader(ByteBuffer.wrap(data))) {
            org.bson.Document decodedValue =
                new DocumentCodec().decode(reader, DecoderContext.builder().build());
            return decodedValue.toJson();
        }
    })
    // wait synchronously for the single result (or the default)
    .toBlocking()
    .singleOrDefault("no profile found");
// read the profile
/**
 * Reads the document stored under {@code id} from the bucket of the given
 * datacenter, falling back to a replica read when the primary read times out
 * or reports a temporary failure.
 *
 * @param datacenter name of the datacenter, passed to {@code getBucket}
 * @param id         key of the document to read
 * @return an observable of the document; any error other than a timeout or a
 *         temporary failure is propagated unchanged
 */
private Observable<LegacyDocument> readValue(String datacenter, String id) {
    final Bucket myBucket = getBucket(datacenter);
    return myBucket
        .async()
        .get(id, LegacyDocument.class)
        .timeout(readTimeoutInMs, TimeUnit.MILLISECONDS)
        .onErrorResumeNext(throwable -> {
            // The async API and the timeout() operator signal these exceptions
            // directly, so the throwable itself must be inspected. The cause is
            // still checked to keep handling errors that arrive wrapped.
            Throwable cause = throwable.getCause();
            boolean replicaWorthTrying =
                throwable instanceof TimeoutException
                    || throwable instanceof TemporaryFailureException
                    || cause instanceof TimeoutException
                    || cause instanceof TemporaryFailureException;
            if (replicaWorthTrying) {
                // Fall back to a replica of the same bucket that served the
                // primary read, so the requested datacenter is honoured.
                return myBucket
                    .async()
                    .getFromReplica(id, ReplicaMode.FIRST, LegacyDocument.class)
                    .timeout(readTimeoutInMs, TimeUnit.MILLISECONDS);
            }
            return Observable.error(throwable);
        });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment