Skip to content

Instantly share code, notes, and snippets.

@alexdorand
Created April 30, 2015 04:51
Show Gist options
  • Save alexdorand/809eee9dce807932a46f to your computer and use it in GitHub Desktop.
Save alexdorand/809eee9dce807932a46f to your computer and use it in GitHub Desktop.
Here is the class. You need to create a bucket called "geo" for this to work
package com.folion.activity.dao;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.RawJsonDocument;
import com.couchbase.client.java.error.CASMismatchException;
import com.couchbase.client.java.error.DocumentDoesNotExistException;
import com.couchbase.client.java.util.retry.RetryBuilder;
import com.folion.couchbase.service.BucketNotFoundException;
import com.google.gson.Gson;
import rx.Observable;
import rx.functions.Func1;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class CouchbaseActivityDao {
private static CouchbaseActivityDao instance;
private Gson gson;
private Cluster cluster;
private String bucketName;
private Bucket bucket;
public CouchbaseActivityDao(String[] clusters, String bucketName) {
this.bucketName = bucketName;
cluster = CouchbaseCluster.create(Arrays.asList(clusters));
bucket = cluster.openBucket(bucketName, 1, TimeUnit.DAYS);
gson = new Gson();
}
public static CouchbaseActivityDao getInstance() {
if (instance == null) {
String[] serverURIs = "http://localhost:8091".split(",");
instance = new CouchbaseActivityDao(serverURIs, "geo");
}
return instance;
}
public void casUpdateJson(final String key, final Action action) {
if (bucket == null) {
throw new BucketNotFoundException("Bucket " + bucketName + " is not registered");
}
Observable
.just(key)
.flatMap(new Func1<String, Observable<RawJsonDocument>>() {
@Override
public Observable<RawJsonDocument> call(String id) {
System.out.println("get " + id);
return bucket.async().get(id, RawJsonDocument.class);
}
})
.map(new Func1<RawJsonDocument, RawJsonDocument>() {
@Override
public RawJsonDocument call(RawJsonDocument rawJsonDocument) {
System.out.println("update " + rawJsonDocument.content());
// modify your doc here
return rawJsonDocument;
}
})
.defaultIfEmpty(RawJsonDocument.create(key, gson.toJson(action)))
.flatMap(new Func1<RawJsonDocument, Observable<?>>() {
@Override
public Observable<?> call(final RawJsonDocument rawJsonDocument) {
System.out.println("here");
return bucket.async().replace(rawJsonDocument)
// retry when CAS concurrent modifications quickly
.retryWhen(RetryBuilder.anyOf(CASMismatchException.class)
.max(10).delay(Delay.fixed(10, TimeUnit.MICROSECONDS)).build())
.onErrorResumeNext(new Func1<Throwable, Observable<RawJsonDocument>>() {
@Override
public Observable<RawJsonDocument> call(Throwable throwable) {
System.out.println("insert");
// if doc does not exist on replace, fall back to insert
if (throwable instanceof DocumentDoesNotExistException) {
return bucket.async().insert(rawJsonDocument);
}
// if other error, forward it
return Observable.error(throwable);
}
});
}
})
.subscribe();
}
public void runTest() {
Action action = new Action();
action.setName("action-1");
action.setType("action-type");
CouchbaseActivityDao.getInstance().casUpdateJson("test", action);
}
public class Action {
private String name;
private String type;
public void setName(String name) {
this.name = name;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public String getType() {
return type;
}
}
public static void main(String arg[]) {
CouchbaseActivityDao.getInstance().runTest();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment