Skip to content

Instantly share code, notes, and snippets.

@daschl
Created May 11, 2015 09:28
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daschl/4ce7a375a3aefd170b50 to your computer and use it in GitHub Desktop.
Save daschl/4ce7a375a3aefd170b50 to your computer and use it in GitHub Desktop.
Insert, getAndLock, Replace Example
import com.couchbase.client.core.time.Delay;
import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.error.DocumentAlreadyExistsException;
import rx.Observable;
import rx.functions.Func1;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static com.couchbase.client.java.util.retry.RetryBuilder.anyOf;
/**
 * Example that runs an insert -> getAndLock -> modify -> replace cycle for a large batch of
 * documents through the asynchronous bucket API and reports how many documents made it through
 * the complete cycle.
 */
public class UpdateExample {

    /** Number of sample documents created and pushed through the cycle. */
    private static final int BATCH_SIZE = 100000;

    /** Lock time handed to {@code getAndLock} (presumably seconds - confirm against the SDK docs). */
    private static final int LOCK_TIME = 10;

    /**
     * Connects with the SDK defaults (no hosts or bucket name are given), flushes the bucket,
     * runs the batch and prints the number of completed documents together with the elapsed time.
     * The cluster connection is released when the run ends, whether it succeeded or failed.
     *
     * @param args ignored.
     */
    public static void main(String... args) {
        // Initialize the Connection
        Cluster cluster = CouchbaseCluster.create();
        try {
            Bucket bucket = cluster.openBucket();
            AsyncBucket asyncBucket = bucket.async();

            // Start from an empty bucket so the inserts below do not collide with old data.
            bucket.bucketManager().flush();

            // Create some sample data
            List<JsonDocument> demoDocuments = createData(BATCH_SIZE);

            // Perform the Insert/Update Cycles in a large batch
            long startTime = System.nanoTime();
            long succeeded = Observable
                .from(demoDocuments)
                .flatMap(doc ->
                    // defer() so that every (re)subscription issues a fresh operation.
                    Observable.defer(() -> asyncBucket.insert(doc))
                        .onErrorResumeNext(new IgnoreDocumentExists())
                        .flatMap(inserted -> Observable.defer(() -> asyncBucket.getAndLock(inserted, LOCK_TIME)))
                        .map(new ModifyDocument())
                        .flatMap(toReplace -> Observable.defer(() -> asyncBucket.replace(toReplace)))
                        // NOTE(review): retryWhen resubscribes to the whole chain above, including
                        // the insert. If the insert already succeeded, a retry presumably ends in
                        // DocumentAlreadyExistsException, which IgnoreDocumentExists turns into an
                        // empty stream - the document would then be skipped instead of retried.
                        // Confirm this is the intended behavior.
                        // NOTE(review): the delay is 100 microseconds, not milliseconds - confirm.
                        .retryWhen(anyOf(Exception.class)
                            .delay(Delay.fixed(100, TimeUnit.MICROSECONDS))
                            .max(10000)
                            .build()
                        )
                        // Keep one failing document from terminating the whole batch.
                        .onErrorResumeNext(new LastResortErrorHandler())
                )
                .count()
                .toBlocking()
                .last();
            long endTime = System.nanoTime();

            System.out.println("Completed batch of " + BATCH_SIZE
                + " with " + succeeded
                + " correct ops in " + TimeUnit.NANOSECONDS.toSeconds(endTime - startTime) + "s");
        } finally {
            // Release the resources held by the cluster connection, even if the run failed.
            cluster.disconnect();
        }
    }

    /**
     * Create Mock data to insert and modify.
     *
     * @param numDocs number of documents to create; values below one yield an empty list.
     * @return documents with the ids {@code "id::0"} to {@code "id::" + (numDocs - 1)}, each
     *         holding the content {@code {"initial": true}}.
     */
    private static List<JsonDocument> createData(int numDocs) {
        // Presized to avoid repeated array growth; clamped so a negative count cannot throw.
        List<JsonDocument> data = new ArrayList<>(Math.max(numDocs, 0));
        for (int i = 0; i < numDocs; i++) {
            data.add(JsonDocument.create("id::" + i, JsonObject.create().put("initial", true)));
        }
        return data;
    }

    /**
     * Helper function to ignore if a document exists, but pass through all other errors.
     *
     * <p>"Ignore" means the stream completes without emitting anything, so the remaining steps
     * of the flow are not executed for that document.
     */
    private static class IgnoreDocumentExists
        implements Func1<Throwable, Observable<? extends JsonDocument>> {

        /**
         * @param error the error raised by the upstream operation.
         * @return an empty stream for {@link DocumentAlreadyExistsException}, otherwise a stream
         *         that fails with the very same error.
         */
        @Override
        public Observable<? extends JsonDocument> call(Throwable error) {
            if (error instanceof DocumentAlreadyExistsException) {
                return Observable.empty();
            }
            return Observable.error(error);
        }
    }

    /**
     * Demo class to modify the document somehow.
     */
    private static class ModifyDocument implements Func1<JsonDocument, JsonDocument> {

        /**
         * Sets {@code "initial"} to {@code false} and {@code "modified"} to {@code true} on the
         * content of the given document.
         *
         * @param doc the document to modify; its content is changed in place.
         * @return the same document instance that was passed in.
         */
        @Override
        public JsonDocument call(JsonDocument doc) {
            doc.content().put("initial", false).put("modified", true);
            return doc;
        }
    }

    /**
     * Error handler which logs the error and gives the rest of the batch a chance to complete.
     */
    private static class LastResortErrorHandler
        implements Func1<Throwable, Observable<? extends JsonDocument>> {

        /**
         * @param throwable the error that ended the flow of a single document.
         * @return always an empty stream, so the failed document is not counted.
         */
        @Override
        public Observable<? extends JsonDocument> call(Throwable throwable) {
            System.err.println("Could not complete flow because of: " + throwable.getMessage());
            throwable.printStackTrace();
            return Observable.empty();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment