Created
May 11, 2015 09:28
-
-
Save daschl/4ce7a375a3aefd170b50 to your computer and use it in GitHub Desktop.
Insert, getAndLock, Replace Example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.couchbase.client.core.time.Delay; | |
import com.couchbase.client.java.AsyncBucket; | |
import com.couchbase.client.java.Bucket; | |
import com.couchbase.client.java.Cluster; | |
import com.couchbase.client.java.CouchbaseCluster; | |
import com.couchbase.client.java.document.JsonDocument; | |
import com.couchbase.client.java.document.json.JsonObject; | |
import com.couchbase.client.java.error.DocumentAlreadyExistsException; | |
import rx.Observable; | |
import rx.functions.Func1; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
import static com.couchbase.client.java.util.retry.RetryBuilder.anyOf; | |
public class UpdateExample { | |
private static final int BATCH_SIZE = 100000; | |
private static final int LOCK_TIME = 10; | |
public static void main(String... args) { | |
// Initialize the Connection and flush the bucket | |
Cluster cluster = CouchbaseCluster.create(); | |
Bucket bucket = cluster.openBucket(); | |
AsyncBucket asyncBucket = bucket.async(); | |
bucket.bucketManager().flush(); | |
// Create some sample data | |
List<JsonDocument> demoDocuments = createData(BATCH_SIZE); | |
// Perform the Insert/Update Cycles in a large batch | |
long startTime = System.nanoTime(); | |
long succeeded = Observable | |
.from(demoDocuments) | |
.flatMap(doc -> | |
Observable.defer(() -> asyncBucket.insert(doc)) | |
.onErrorResumeNext(new IgnoreDocumentExists()) | |
.flatMap(inserted -> Observable.defer(() -> asyncBucket.getAndLock(inserted, LOCK_TIME))) | |
.map(new ModifyDocument()) | |
.flatMap(toReplace -> Observable.defer(() -> asyncBucket.replace(toReplace))) | |
.retryWhen(anyOf(Exception.class) | |
.delay(Delay.fixed(100, TimeUnit.MICROSECONDS)) | |
.max(10000) | |
.build() | |
) | |
.onErrorResumeNext(new LastResortErrorHandler()) | |
) | |
.count() | |
.toBlocking() | |
.last(); | |
long endTime = System.nanoTime(); | |
System.out.println("Completed batch of " + BATCH_SIZE | |
+ " with " + succeeded | |
+ " correct ops in " + TimeUnit.NANOSECONDS.toSeconds(endTime - startTime) + "s"); | |
} | |
/** | |
* Create Mock data to insert and modify. | |
*/ | |
private static List<JsonDocument> createData(int numDocs) { | |
List<JsonDocument> data = new ArrayList<>(); | |
for (int i = 0; i < numDocs; i++) { | |
data.add(JsonDocument.create("id::" + i, JsonObject.create().put("initial", true))); | |
} | |
return data; | |
} | |
/** | |
* Helper function to ignore if a document exists, but pass through all other errors. | |
*/ | |
private static class IgnoreDocumentExists | |
implements Func1<Throwable, Observable<? extends JsonDocument>> { | |
@Override | |
public Observable<? extends JsonDocument> call(Throwable error) { | |
if (error instanceof DocumentAlreadyExistsException) { | |
return Observable.empty(); | |
} | |
return Observable.error(error); | |
} | |
} | |
/** | |
* Demo class to modify the document somehow. | |
*/ | |
private static class ModifyDocument implements Func1<JsonDocument, JsonDocument> { | |
@Override | |
public JsonDocument call(JsonDocument doc) { | |
doc.content().put("initial", false).put("modified", true); | |
return doc; | |
} | |
} | |
/** | |
* Error handler which logs the error and gives the rest of the batch a chance to complete. | |
*/ | |
private static class LastResortErrorHandler | |
implements Func1<Throwable, Observable<? extends JsonDocument>> { | |
@Override | |
public Observable<? extends JsonDocument> call(Throwable throwable) { | |
System.err.println("Could not complete flow because of: " + throwable.getMessage()); | |
throwable.printStackTrace(); | |
return Observable.empty(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment