@daschl
Created August 18, 2014 09:05
1. Getting Started

This getting started guide is a foundational introduction to the Couchbase Java SDK. You learn how to install it properly and write a basic sample application that highlights important aspects. At the end of this guide you will be able to explore further documentation on your own or proceed with a tutorial where a full-blown, production-grade application is built.

2.a Installation

Currently, the Java SDK is in a beta release state, so you need to include the Couchbase Maven Repository if you want to pull it in automatically. Here are the typical pom.xml entries that you can copy and paste:

<dependencies>
    <dependency>
        <groupId>com.couchbase.client</groupId>
        <artifactId>couchbase-client</artifactId>
        <version>2.0.0-beta</version>
    </dependency>
</dependencies>

<repositories>
    <repository>
        <id>couchbase</id>
        <name>couchbase repository</name>
        <url>http://files.couchbase.com/maven2</url>
    </repository>
</repositories>

You can also download an archive which has all JARs and dependencies included, but in general using a package manager is strongly recommended. Once the final version is released, it will also be distributed through Maven Central.

Importing the dependency also pulls in the following transitive dependencies:

- core-io: our internal core library, which abstracts lots of Couchbase-specific behavior in a language-neutral way.
- rxjava: a foundational library to build reactive and asynchronous applications.

Note that at least Java 6 is required, but any newer version is also supported (including the newly released Java 8).

2.b Hello Couchbase Tutorial

Now that you have added the SDK dependency to your project, you can proceed and implement the classic "Hello World" example. Along the way, some important aspects are covered that will help you later.

The first thing you need to do is connect to the cluster:

Cluster cluster = CouchbaseCluster.create();

With no arguments provided, the client logically binds to a cluster where at least one node is reachable on localhost. This is a reasonable default to get started, but you can also pass in more or different seed nodes like this:

Cluster cluster = CouchbaseCluster.create("192.168.56.101", "192.168.56.102");

You do not need to pass in all nodes of the cluster, just a few seed nodes so that the client is able to establish initial contact. The actual process of connecting to a bucket (that is, opening sockets and everything related) happens when you call the openBucket method:

Observable<Bucket> bucketObservable = cluster.openBucket();

This will connect to the default bucket and return an Observable<Bucket>. If you are not familiar with asynchronous programming, be patient: it will become clear very soon. For now, just think of the Observable as the asynchronous cousin of an Iterable which will eventually reference our Bucket.

At the end of your code you need to shut down the client to free all resources (sockets, threads, ...), otherwise your app keeps running (because some threads are required to be non-daemon threads):

cluster.disconnect().toBlocking().single();

This disconnects all buckets and frees the associated resources.

One way to get access to our Bucket object (so that you can perform data operations) is to block the current thread and wait until it is ready to be used:

Bucket bucket = cluster.openBucket().toBlocking().single();

If you are curious: the Observable is converted into a BlockingObservable and the code waits until a single value (our Bucket) is ready to be used. Observables can also contain more than one value, which will come in handy later.

You now have a reference to the Bucket, so what are you waiting for? The SDK comes with built-in handling for JSON documents, which you can utilize right away. First, create a JsonObject which contains information for a user.

JsonObject user = JsonObject.empty()
    .put("firstname", "Walter")
    .put("lastname", "White")
    .put("job", "chemistry teacher")
    .put("age", 50);

A JsonObject works very much like a Map, but it is designed to only let you insert values that can be stored as valid JSON (including nested objects and arrays). The resulting document will look like:

{
	"firstname":"Walter",
	"job":"chemistry teacher",
	"age":50,
	"lastname":"White"
}
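
To make the idea of "only values that can be stored as valid JSON" more concrete, here is a minimal sketch in plain Java of a map wrapper that rejects unsupported types on insertion. The class name JsonLikeObject and its type check are hypothetical and exist only for illustration; this is not the SDK's JsonObject implementation, and the sketch does not validate the elements of a list.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: this is NOT the SDK's JsonObject.
class JsonLikeObject {
    private final Map<String, Object> content = new HashMap<String, Object>();

    // Accept only values that map directly onto JSON types.
    static boolean isJsonValue(Object value) {
        return value == null
            || value instanceof String
            || value instanceof Number
            || value instanceof Boolean
            || value instanceof JsonLikeObject   // nested object
            || value instanceof List;            // array (elements are not checked in this sketch)
    }

    // Returns this so that calls can be chained.
    JsonLikeObject put(String name, Object value) {
        if (!isJsonValue(value)) {
            throw new IllegalArgumentException("Unsupported type for a JSON value: " + value.getClass());
        }
        content.put(name, value);
        return this;
    }

    Object get(String name) {
        return content.get(name);
    }

    public static void main(String[] args) {
        JsonLikeObject user = new JsonLikeObject().put("firstname", "Walter").put("age", 50);
        System.out.println(user.get("age"));
        try {
            user.put("thread", new Thread());    // not representable as JSON
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

Rejecting the value at insertion time, rather than at serialization time, means the error surfaces at the line that caused it.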

To store the document, you can use the upsert method on the bucket. Note that since a Document on the server has more properties than just the content, you need to give it at least a unique document ID (like walter). The container for all this information is called a Document and since you are dealing with JSON you need to create a JsonDocument:

JsonDocument doc = JsonDocument.create("walter", user);
Observable<JsonDocument> response = bucket.upsert(doc);

The Document is automatically converted into JSON and stored on the cluster. If the document (identified by its unique ID) already exists, it is replaced.

If you run the command as shown above, there is a good chance that the Document is not stored on the server. This is because upsert is an asynchronous operation as well and if your main thread doesn't wait for a response and just quits, the SDK doesn't get a chance to actually do its network calls. One way to mitigate this is to use toBlocking().single() again to make your current thread wait on the result:

// Wait until the document is stored
JsonDocument stored = response.toBlocking().single();

// Print out the document ID that got stored
System.out.println("Stored: " + stored.id());

Note that stored is not the same Document which you passed into the upsert method. Once a successful response arrives from the server, a new one is created and populated with original and new values to properly reflect the new document state.

If you replace upsert with insert and try to insert the same document twice (with the same ID), you see the following:

Exception in thread "main" com.couchbase.client.java.error.DocumentAlreadyExistsException
	at com.couchbase.client.java.CouchbaseBucket$12.call(CouchbaseBucket.java:247)
	at com.couchbase.client.java.CouchbaseBucket$12.call(CouchbaseBucket.java:243)
	at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
	at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
	at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:96)
	at com.couchbase.client.core.ResponseHandler.onEvent(ResponseHandler.java:77)
	at com.couchbase.client.core.ResponseHandler.onEvent(ResponseHandler.java:22)
	at com.couchbase.client.deps.com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

This provides two important clues:

- `insert`, unlike `upsert`, fails if the document already exists on the server (very much like the SQL `INSERT`
  statement).
- If you block on an `Observable` it can (and will) throw Exceptions in failure cases.
  Error handling is an important part of an application and will be covered extensively throughout the documentation.

Not surprisingly, you can also retrieve a Document from the database. You do this by providing its ID:

Observable<JsonDocument> response = bucket.get("walter");
JsonDocument walter = response.toBlocking().single();
System.out.println("Found: " + walter);

This prints:

Found: JsonDocument{id='walter', cas=3077475790214, expiry=0, content={"firstname":"Walter","job":"chemistry teacher","age":50,"lastname":"White"}}

If you want to print only the age, you can reach into the content (much like you would access a Map):

System.out.println("Age: " + walter.content().getInt("age"));

Now that you are an expert in storing and retrieving documents, you can combine both commands to implement something that is needed very often: loading a document, modifying its content and eventually storing the modified document.

First, here is one of the synchronous ways to do it:

String id = "walter";
JsonDocument loaded = bucket.get(id).toBlocking().singleOrDefault(null);
if (loaded == null) {
    System.err.println("Document not found!");
} else {
    loaded.content().put("age", 52);
    JsonDocument updated = bucket.replace(loaded).toBlocking().single();
    System.out.println("Updated: " + updated.id());
}

The code uses the get method to load the Document and waits until it arrives from the server (returning null as the default value if it does not exist). If it is not null, the content is modified and the document is stored again through the replace method. You can think of replace as the opposite of insert: if the document does not already exist, the call will fail.

If single() is used instead of singleOrDefault(), you will see an exception if the document does not exist (you can try that by using a different document ID than walter):

Exception in thread "main" java.util.NoSuchElementException: Sequence contains no elements
	at rx.internal.operators.OperatorSingle$1.onCompleted(OperatorSingle.java:82)
	at rx.internal.operators.OperatorMap$1.onCompleted(OperatorMap.java:43)
	at rx.internal.operators.OperatorFilter$1.onCompleted(OperatorFilter.java:42)
	at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onCompleted(SubjectSubscriptionManager.java:232)
	at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:97)
	at com.couchbase.client.core.ResponseHandler.onEvent(ResponseHandler.java:77)
	at com.couchbase.client.core.ResponseHandler.onEvent(ResponseHandler.java:22)
	at com.couchbase.client.deps.com.lmax.disruptor.BatchEventProcessor.run(BatchEventProcessor.java:128)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Since the Document is not found, the Observable has nothing it can return to you.
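
Since an Observable can be thought of as the asynchronous cousin of an Iterable, the difference between the two blocking calls can be sketched on a plain Iterable. The helpers below are hypothetical and only mimic the behavior described above (fail on an empty sequence versus fall back to a default); they are not RxJava code, and rejecting a second element is an assumption of this sketch.

```java
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical helpers that mimic the blocking semantics on a plain Iterable.
class SingleSemantics {

    // Like single(): exactly one element is expected.
    static <T> T single(Iterable<T> source) {
        Iterator<T> it = source.iterator();
        if (!it.hasNext()) {
            throw new NoSuchElementException("Sequence contains no elements");
        }
        T value = it.next();
        if (it.hasNext()) {
            throw new IllegalArgumentException("Sequence contains too many elements");
        }
        return value;
    }

    // Like singleOrDefault(): an empty sequence yields the fallback instead of throwing.
    static <T> T singleOrDefault(Iterable<T> source, T fallback) {
        return source.iterator().hasNext() ? single(source) : fallback;
    }

    public static void main(String[] args) {
        List<String> found = Collections.singletonList("walter");
        List<String> missing = Collections.emptyList();

        String present = single(found);
        String absent = singleOrDefault(missing, null);
        System.out.println(present + " / " + absent);
        try {
            single(missing);
        } catch (NoSuchElementException e) {
            System.out.println("Failed: " + e.getMessage());
        }
    }
}
```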

The code shown above is completely synchronous, which means that your main thread waits the whole time until a response comes back from the server. It waits for network IO to happen when it could instead perform valuable work. This is where the whole idea of asynchronous application flows becomes important. Instead of waiting and "pulling" the data out of the SDK, you can just keep going forward in your application flow and let the SDK notify you once it is done with its work.

Observables are not just containers, they provide a large range of methods and functionality to create, combine and transform asynchronous workflows and make them look synchronous (while they aren't). This is important, because other approaches involve "callback hell" and complicated handling of futures, especially if you want to "chain" more than one asynchronous operation together.

Here is the very same example, but completely non-blocking:

bucket
    .get("walter")
    .flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(final JsonDocument loaded) {
            loaded.content().put("age", 52);
            return bucket.replace(loaded);
        }
    })
    .subscribe(new Action1<JsonDocument>() {
        @Override
        public void call(final JsonDocument updated) {
            System.out.println("Updated: " + updated.id());
        }
    });

Without going into too much depth about how Observables work, it is not too hard to guess what is happening. The document is loaded through the get method and once it is done, the flatMap method is executed. This method changes the content and calls the replace method. Once the replace is done, the Subscriber is called and the line is printed. Note that you do not need to check for null here because the rest of the chain is simply not executed if the document is not found in the first place.

The style shown above is Java 6/7 with anonymous classes. If you are lucky enough to already use Java 8, you can replace the same code with lambdas to make it much nicer and more succinct:

bucket
    .get("walter")
    .flatMap(loaded -> {
        loaded.content().put("age", 52);
        return bucket.replace(loaded);
    })
    .subscribe(updated -> System.out.println("Updated: " + updated.id()));

Note that since this flow is asynchronous, every "callback" runs in a thread different from your "main" one. Since this one-off example is different from long-running server applications (which are not shut down after one operation), it is a good idea to synchronize and wait until the last operation is finished.
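
The effect of callbacks running on another thread can be observed with the JDK alone. In the following sketch a single-thread executor stands in for the SDK's internal threads (an assumption made for illustration; the SDK manages its own threads), and the "callback" simply reports the name of the thread it runs on.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: a single-thread executor stands in for the SDK's internal threads.
class CallbackThreadDemo {

    // Runs a "callback" on a worker thread and returns the name of that thread.
    static String callbackThreadName() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            return worker.submit(new Callable<String>() {
                @Override
                public String call() {
                    return Thread.currentThread().getName();
                }
            }).get();   // block only to collect the result for this demo
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();   // otherwise the non-daemon worker keeps the JVM alive
        }
    }

    public static void main(String[] args) {
        System.out.println("Main thread:     " + Thread.currentThread().getName());
        System.out.println("Callback thread: " + callbackThreadName());
    }
}
```

The two printed names differ, which is why the main thread has to be kept alive explicitly until the callback has run.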

A naive way is to add a Thread.sleep(1000) after the last call to keep the current thread alive for a second and hope that the operation is done:

bucket
    .get("walter")
    .flatMap(loaded -> {
        loaded.content().put("age", 52);
        return bucket.replace(loaded);
    })
    .subscribe(updated -> System.out.println("Updated: " + updated.id()));

Thread.sleep(1000);

Most of the time the operation completes much faster, so you are just wasting time. And in the unlikely case that it takes longer, the approach does not work properly. A much better way is to use a CountDownLatch, which ships with the JDK. One thread counts it down and the other waits until it is counted down:

final CountDownLatch latch = new CountDownLatch(1);
bucket
    .get("walter")
    .flatMap(loaded -> {
        loaded.content().put("age", 52);
        return bucket.replace(loaded);
    })
    .subscribe(updated -> {
        System.out.println("Updated: " + updated.id());
        latch.countDown();
    });

latch.await();

Unsurprisingly, this is also the preferred way to deal with asynchronous computation in unit tests and other scenarios where tight control between threads is needed.

Congratulations, you've completed your first steps towards Couchbase mastery! We recommend that you either proceed with a full-blown tutorial application or browse through the rest of the documentation as you see fit.
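
One caveat: if the subscriber is never called (for example because the document is not found), a plain latch.await() blocks forever. In tests it is therefore often safer to use the timed variant, await(timeout, unit), which returns false when the time runs out. The following JDK-only sketch shows the pattern; the class and method names are hypothetical, and a sleeping background thread stands in for the asynchronous SDK operation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Sketch: wait for an asynchronous task, but never longer than a given timeout.
class BoundedLatchWait {

    // Starts a background task and waits at most timeoutMillis for it to finish.
    // Returns true if the task signalled completion in time.
    static boolean runAndAwait(final long workMillis, long timeoutMillis) {
        final CountDownLatch latch = new CountDownLatch(1);
        Thread worker = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(workMillis);   // stands in for the asynchronous operation
                    latch.countDown();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        worker.setDaemon(true);   // a timed-out worker must not keep the JVM alive
        worker.start();
        try {
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait(10, 2000));    // work finishes well within the timeout
        System.out.println(runAndAwait(5000, 50));    // timeout elapses before the work is done
    }
}
```

A test can then assert on the returned boolean and fail fast instead of hanging the whole build.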
