Skip to content

Instantly share code, notes, and snippets.

@alexdorand
Created April 30, 2015 06:19
Show Gist options
  • Save alexdorand/fe54906bc31c290503c6 to your computer and use it in GitHub Desktop.
Save alexdorand/fe54906bc31c290503c6 to your computer and use it in GitHub Desktop.
Here is my test
package com.folion.activity.dao;
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.RawJsonDocument;
import com.couchbase.client.java.error.CASMismatchException;
import com.couchbase.client.java.error.DocumentDoesNotExistException;
import com.couchbase.client.java.util.retry.RetryBuilder;
import com.google.gson.Gson;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func0;
import rx.functions.Func1;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
public class CouchbaseActivityDao {
private static CouchbaseActivityDao instance;
private Gson gson;
private Cluster cluster;
private String bucketName;
private Bucket bucket;
public CouchbaseActivityDao(String[] clusters, String bucketName) {
this.bucketName = bucketName;
cluster = CouchbaseCluster.create(Arrays.asList(clusters));
bucket = cluster.openBucket(bucketName, 1, TimeUnit.DAYS);
gson = new Gson();
}
public static CouchbaseActivityDao getInstance() {
if (instance == null) {
String[] serverURIs = "http://localhost:8091".split(",");
instance = new CouchbaseActivityDao(serverURIs, "geo");
}
return instance;
}
public void test(final Integer var) {
Observable
.just(var)
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer varInt) {
System.out.println("get from the bucket");
return replaceTry(varInt);
}
})
.map(new Func1<Integer, Integer>() {
@Override
public Integer call(Integer callVar) {
System.out.println("found the record and updated it");
return callVar;
}
})
.defaultIfEmpty(createResult(1))
.flatMap(new Func1<Integer, Observable<Integer>>() {
@Override
public Observable<Integer> call(Integer v) {
System.out.println("retry to update");
return replaceTry(v);
}
})
.subscribe();
}
public Observable<Integer> replaceTry(final Integer i) {
return Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
if(i==3) {
subscriber.onNext(i + 2);
}
}
});
}
public Integer createResult(final Integer i) {
System.out.println("create");
return i+2;
}
public void casUpdateJson(final String key, final Action action) {
Observable
.just(key)
.flatMap(new Func1<String, Observable<RawJsonDocument>>() {
@Override
public Observable<RawJsonDocument> call(String id) {
System.out.println("get " + id);
return bucket.async().get(id, RawJsonDocument.class);
}
})
.map(new Func1<RawJsonDocument, RawJsonDocument>() {
@Override
public RawJsonDocument call(RawJsonDocument rawJsonDocument) {
System.out.println("update " + rawJsonDocument.content());
// modify your doc here
return rawJsonDocument;
}
})
.defaultIfEmpty(RawJsonDocument.create(key, gson.toJson(action)))
.flatMap(new Func1<RawJsonDocument, Observable<?>>() {
@Override
public Observable<?> call(final RawJsonDocument rawJsonDocument) {
System.out.println("here");
return bucket.async().replace(rawJsonDocument)
// retry when CAS concurrent modifications quickly
.retryWhen(RetryBuilder.anyOf(CASMismatchException.class)
.max(10).delay(Delay.fixed(10, TimeUnit.MICROSECONDS)).build())
.onErrorResumeNext(new Func1<Throwable, Observable<RawJsonDocument>>() {
@Override
public Observable<RawJsonDocument> call(Throwable throwable) {
System.out.println("insert");
// if doc does not exist on replace, fall back to insert
if (throwable instanceof DocumentDoesNotExistException) {
return bucket.async().insert(rawJsonDocument);
}
// if other error, forward it
return Observable.error(throwable);
}
});
}
})
.subscribe();
}
public void runTest() {
Action action = new Action();
action.setName("action-1");
action.setType("action-type");
CouchbaseActivityDao.getInstance().casUpdateJson("test", action);
}
public class Action {
private String name;
private String type;
public void setName(String name) {
this.name = name;
}
public void setType(String type) {
this.type = type;
}
public String getName() {
return name;
}
public String getType() {
return type;
}
}
public static void main(String arg[]) {
CouchbaseActivityDao.getInstance().test(5);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment