Skip to content

Instantly share code, notes, and snippets.

@daschl
Created September 9, 2014 02:32
Show Gist options
  • Save daschl/86ddee7e3be795c2c2f7 to your computer and use it in GitHub Desktop.
Save daschl/86ddee7e3be795c2c2f7 to your computer and use it in GitHub Desktop.
/**
* Copyright (C) 2014 Couchbase, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING
* IN THE SOFTWARE.
*/
package com.couchbase.client.java;
import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.java.cluster.AsyncClusterManager;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.transcoder.Transcoder;
import rx.Observable;
import java.util.List;
/**
* Represents a Couchbase Server {@link AsyncCluster}.
*
* A {@link AsyncCluster} is able to open many {@link AsyncBucket}s while sharing the underlying resources (like sockets)
* very efficiently. In addition, a {@link AsyncClusterManager} is available to perform cluster-wide operations.
*
* @author Michael Nitschinger
* @since 2.0
*/
public interface AsyncCluster {
/**
* Open the default {@link AsyncBucket}.
*
* @return a {@link Observable} containing the {@link AsyncBucket} reference once opened.
*/
Observable<AsyncBucket> openBucket();
/**
* Open the given {@link AsyncBucket} without a password (if not set during creation).
*
* @param name the name of the bucket.
* @return a {@link Observable} containing the {@link AsyncBucket} reference once opened.
*/
Observable<AsyncBucket> openBucket(String name);
/**
* Open the given {@link AsyncBucket} with a password (set during creation).
*
* @param name the name of the bucket.
* @param password the password of the bucket, can be an empty string.
* @return a {@link Observable} containing the {@link AsyncBucket} reference once opened.
*/
Observable<AsyncBucket> openBucket(String name, String password);
/**
* Open the given {@link AsyncBucket} with a password and a custom list of transcoders.
*
* @param name the name of the bucket.
* @param password the password of the bucket, can be an empty string.
* @param transcoders a list of custom transcoders.
* @return a {@link Observable} containing the {@link AsyncBucket} reference once opened.
*/
Observable<AsyncBucket> openBucket(String name, String password, List<Transcoder<? extends Document, ?>> transcoders);
/**
* Returns a reference to the {@link AsyncClusterManager}.
*
* The {@link AsyncClusterManager} allows to perform cluster level management operations. It requires administrative
* credentials, which have been set during cluster configuration. Bucket level credentials are not enough to perform
* cluster-level operations.
*
* @param username privileged username.
* @param password privileged password.
* @return a {@link Observable} containing the {@link AsyncClusterManager}.
*/
Observable<AsyncClusterManager> clusterManager(String username, String password);
/**
* Disconnects from the {@link AsyncCluster} and closes all open {@link AsyncBucket}s.
*
* @return a {@link Observable} containing true if successful and failing the {@link Observable} otherwise.
*/
Observable<Boolean> disconnect();
/**
* Returns a reference to the underlying core engine.
*
* Since the {@link ClusterFacade} provides direct access to low-level semantics, no sanity checks are performed as
* with the Java SDK itself. Handle with care and only use it when absolutely needed.
*
* @return a {@link Observable} containing the core engine.
*/
Observable<ClusterFacade> core();
}
/**
* Copyright (C) 2014 Couchbase, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING
* IN THE SOFTWARE.
*/
package com.couchbase.client.java;
import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.core.CouchbaseCore;
import com.couchbase.client.core.message.CouchbaseResponse;
import com.couchbase.client.core.message.cluster.DisconnectRequest;
import com.couchbase.client.core.message.cluster.DisconnectResponse;
import com.couchbase.client.core.message.cluster.OpenBucketRequest;
import com.couchbase.client.core.message.cluster.SeedNodesRequest;
import com.couchbase.client.java.cluster.AsyncClusterManager;
import com.couchbase.client.java.cluster.AsyncCouchbaseClusterManager;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import com.couchbase.client.java.transcoder.Transcoder;
import rx.Observable;
import rx.functions.Func1;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public class AsyncCouchbaseCluster implements AsyncCluster {
private static final String DEFAULT_BUCKET = "default";
private static final String DEFAULT_HOST = "127.0.0.1";
private final ClusterFacade core;
private final CouchbaseEnvironment environment;
private final ConnectionString connectionString;
private final boolean sharedEnvironment;
public static AsyncCouchbaseCluster create() {
return create(DEFAULT_HOST);
}
public static AsyncCouchbaseCluster create(final CouchbaseEnvironment environment) {
return create(environment, DEFAULT_HOST);
}
public static AsyncCouchbaseCluster create(final String... nodes) {
return create(Arrays.asList(nodes));
}
public static AsyncCouchbaseCluster create(final List<String> nodes) {
return new AsyncCouchbaseCluster(DefaultCouchbaseEnvironment.create(), ConnectionString.fromHostnames(nodes), false);
}
public static AsyncCouchbaseCluster create(final CouchbaseEnvironment environment, final String... nodes) {
return create(environment, Arrays.asList(nodes));
}
public static AsyncCouchbaseCluster create(final CouchbaseEnvironment environment, final List<String> nodes) {
return new AsyncCouchbaseCluster(environment, ConnectionString.fromHostnames(nodes), true);
}
public static AsyncCouchbaseCluster fromConnectionString(final String connectionString) {
return new AsyncCouchbaseCluster(DefaultCouchbaseEnvironment.create(), ConnectionString.create(connectionString), false);
}
public static AsyncCouchbaseCluster fromConnectionString(final CouchbaseEnvironment environment, final String connectionString) {
return new AsyncCouchbaseCluster(environment, ConnectionString.create(connectionString), true);
}
AsyncCouchbaseCluster(final CouchbaseEnvironment environment, final ConnectionString connectionString, final boolean sharedEnvironment) {
this.sharedEnvironment = sharedEnvironment;
core = new CouchbaseCore(environment);
List<String> seedNodes = new ArrayList<String>();
for (InetSocketAddress node : connectionString.hosts()) {
seedNodes.add(node.getHostName());
}
if (seedNodes.isEmpty()) {
seedNodes.add(DEFAULT_HOST);
}
SeedNodesRequest request = new SeedNodesRequest(seedNodes);
core.send(request).toBlocking().single();
this.environment = environment;
this.connectionString = connectionString;
}
@Override
public Observable<AsyncBucket> openBucket() {
return openBucket(DEFAULT_BUCKET);
}
@Override
public Observable<AsyncBucket> openBucket(final String name) {
return openBucket(name, null);
}
@Override
public Observable<AsyncBucket> openBucket(final String name, final String pass) {
return openBucket(name, pass, null);
}
@Override
public Observable<AsyncBucket> openBucket(final String name, String pass,
final List<Transcoder<? extends Document, ?>> transcoders) {
final String password = pass == null ? "" : pass;
final List<Transcoder<? extends Document, ?>> trans = transcoders == null
? new ArrayList<Transcoder<? extends Document, ?>>() : transcoders;
return core
.send(new OpenBucketRequest(name, password))
.map(new Func1<CouchbaseResponse, AsyncBucket>() {
@Override
public AsyncBucket call(CouchbaseResponse response) {
return new AsyncCouchbaseBucket(core, name, password, trans);
}
});
}
@Override
public Observable<Boolean> disconnect() {
return core
.<DisconnectResponse>send(new DisconnectRequest())
.flatMap(new Func1<DisconnectResponse, Observable<Boolean>>() {
@Override
public Observable<Boolean> call(DisconnectResponse disconnectResponse) {
return sharedEnvironment ? Observable.just(true) : environment.shutdown();
}
});
}
@Override
public Observable<AsyncClusterManager> clusterManager(final String username, final String password) {
return Observable.just((AsyncClusterManager) AsyncCouchbaseClusterManager.create(username, password, connectionString,
environment, core));
}
@Override
public Observable<ClusterFacade> core() {
return Observable.just(core);
}
}
/**
* Copyright (C) 2014 Couchbase, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING
* IN THE SOFTWARE.
*/
package com.couchbase.client.java;
import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.java.cluster.AsyncClusterManager;
import com.couchbase.client.java.cluster.ClusterManager;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.transcoder.Transcoder;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* The {@link Cluster} provides synchronous cluster level access and acts as the entry point
* for all Couchbase Server related operations.
*
* @author Michael Nitschinger
* @since 2.0
*/
public interface Cluster {
/**
* Returns the underlying {@link AsyncCluster} object.
*
* This method can be used to switch from a synchronous to an asynchronous interface.
* @return the {@link AsyncCluster}.
*/
AsyncCluster async();
/**
* Opens the *default* bucket with _no_ password and the default connect timeout.
*
* @return the opened {@link Bucket}.
*/
Bucket openBucket();
/**
* Opens the *default* bucket with _no_ password and a custom connect timeout.
*
* @return the opened {@link Bucket}.
*/
Bucket openBucket(long timeout, TimeUnit timeUnit);
/**
* Opens the bucket with the given name, _no_ password and the default connect timeout.
*
* @return the opened {@link Bucket}.
*/
Bucket openBucket(String name);
/**
* Opens the bucket with the given name, _no_ password and a custom connect timeout.
*
* @return the opened {@link Bucket}.
*/
Bucket openBucket(String name, long timeout, TimeUnit timeUnit);
/**
* Opens the bucket with the given name, password and the default connect timeout.
*
* @return the opened {@link Bucket}.
*/
Bucket openBucket(String name, String password);
/**
* Opens the bucket with the given name, password and a custom connect timeout.
*
* @return the opened {@link Bucket}.
*/
Bucket openBucket(String name, String password, long timeout, TimeUnit timeUnit);
/**
* Opens the bucket with the given name, password, a custom list of {@link Transcoder}s and the default connect
* timeout..
*
* Every custom {@link Transcoder} will either provide new functionality or override a default one provided
* for the specific {@link Document} type.
*
* @return the opened {@link Bucket}.
*/
Bucket openBucket(String name, String password, List<Transcoder<? extends Document, ?>> transcoders);
/**
* Opens the bucket with the given name, password, a custom list of {@link Transcoder}s and a custom connect
* timeout..
*
* Every custom {@link Transcoder} will either provide new functionality or override a default one provided
* for the specific {@link Document} type.
*
* @return the opened {@link Bucket}.
*/
Bucket openBucket(String name, String password, List<Transcoder<? extends Document, ?>> transcoders,
long timeout, TimeUnit timeUnit);
/**
* Disconnects all open buckets and also shuts down all exclusively owned resources with the default disconnect
* timeout.
*
* @return true if the disconnect succeeded, false otherwise.
*/
boolean disconnect();
/**
* Disconnects all open buckets and also shuts down all exclusively owned resources with a custom disconnect
* timeout.
*
* @return true if the disconnect succeeded, false otherwise.
*/
boolean disconnect(long timeout, TimeUnit timeUnit);
/**
* Opens the {@link AsyncClusterManager} for cluster-wide management operations.
*
* The username and password provided here are not the bucket name or password, but those which allow
* cluster-wide operational access (most commonly the system wide administrator or similar level credentials).
*
* @param username the authorized username.
* @param password the password of the username.
* @return the opened {@link AsyncClusterManager}.
*/
ClusterManager clusterManager(String username, String password);
ClusterManager clusterManager(String username, String password, long timeout, TimeUnit timeUnit);
/**
* Provides direct access to the "core" engine and should be handled with care.
*
* @return the {@link ClusterFacade} to the core engine.
*/
ClusterFacade core();
}
package com.couchbase.client.java;
import com.couchbase.client.core.ClusterFacade;
import com.couchbase.client.java.cluster.AsyncClusterManager;
import com.couchbase.client.java.cluster.ClusterManager;
import com.couchbase.client.java.cluster.CouchbaseClusterManager;
import com.couchbase.client.java.document.Document;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import com.couchbase.client.java.transcoder.Transcoder;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.functions.Func1;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class CouchbaseCluster implements Cluster {
private static final String DEFAULT_BUCKET = "default";
private static final String DEFAULT_HOST = "127.0.0.1";
private final AsyncCouchbaseCluster asyncCluster;
private final List<Bucket> openBuckets;
private final CouchbaseEnvironment environment;
public static CouchbaseCluster create() {
return create(DEFAULT_HOST);
}
public static CouchbaseCluster create(final CouchbaseEnvironment environment) {
return create(environment, DEFAULT_HOST);
}
public static CouchbaseCluster create(final String... nodes) {
return create(Arrays.asList(nodes));
}
public static CouchbaseCluster create(final List<String> nodes) {
return new CouchbaseCluster(DefaultCouchbaseEnvironment.create(), ConnectionString.fromHostnames(nodes), false);
}
public static CouchbaseCluster create(final CouchbaseEnvironment environment, final String... nodes) {
return create(environment, Arrays.asList(nodes));
}
public static CouchbaseCluster create(final CouchbaseEnvironment environment, final List<String> nodes) {
return new CouchbaseCluster(environment, ConnectionString.fromHostnames(nodes), true);
}
public static CouchbaseCluster fromConnectionString(final String connectionString) {
return new CouchbaseCluster(DefaultCouchbaseEnvironment.create(), ConnectionString.create(connectionString), false);
}
public static CouchbaseCluster fromConnectionString(final CouchbaseEnvironment environment, final String connectionString) {
return new CouchbaseCluster(environment, ConnectionString.create(connectionString), true);
}
CouchbaseCluster(final CouchbaseEnvironment environment, final ConnectionString connectionString, final boolean sharedEnvironment) {
this.asyncCluster = new AsyncCouchbaseCluster(environment, connectionString, sharedEnvironment);
this.openBuckets = Collections.synchronizedList(new ArrayList<Bucket>());
this.environment = environment;
}
@Override
public AsyncCluster async() {
return asyncCluster;
}
@Override
public Bucket openBucket() {
return openBucket(DEFAULT_BUCKET);
}
@Override
public Bucket openBucket(final String name) {
return openBucket(name, null);
}
@Override
public Bucket openBucket(final String name, final String password) {
return openBucket(name, password, null);
}
@Override
public Bucket openBucket(final String name, final String password,
final List<Transcoder<? extends Document, ?>> transcoders) {
return openBucket(name, password, transcoders, environment.connectTimeout(), TimeUnit.MILLISECONDS);
}
@Override
public boolean disconnect() {
return disconnect(environment.disconnectTimeout(), TimeUnit.MILLISECONDS);
}
@Override
public ClusterManager clusterManager(final String username, final String password) {
return clusterManager(username, password, environment.connectTimeout(), TimeUnit.MILLISECONDS);
}
@Override
public ClusterFacade core() {
return asyncCluster.core().toBlocking().single();
}
@Override
public Bucket openBucket(long timeout, TimeUnit timeUnit) {
return openBucket(DEFAULT_BUCKET, timeout, timeUnit);
}
@Override
public Bucket openBucket(String name, long timeout, TimeUnit timeUnit) {
return openBucket(name, null, timeout, timeUnit);
}
@Override
public Bucket openBucket(String name, String password, long timeout, TimeUnit timeUnit) {
return openBucket(name, password, null, timeout, timeUnit);
}
@Override
public Bucket openBucket(String name, String password, List<Transcoder<? extends Document, ?>> transcoders, long timeout, TimeUnit timeUnit) {
return asyncCluster
.openBucket(name, password, transcoders)
.map(new Func1<AsyncBucket, Bucket>() {
@Override
public Bucket call(AsyncBucket asyncBucket) {
return new CouchbaseBucket(asyncBucket);
}
})
.doOnNext(new Action1<Bucket>() {
@Override
public void call(Bucket bucket) {
openBuckets.add(bucket);
}
})
.timeout(timeout, timeUnit)
.toBlocking()
.single();
}
@Override
public boolean disconnect(long timeout, TimeUnit timeUnit) {
return asyncCluster
.disconnect()
.doOnCompleted(new Action0() {
@Override
public void call() {
openBuckets.clear();
}
})
.timeout(timeout, timeUnit)
.toBlocking()
.single();
}
@Override
public ClusterManager clusterManager(String username, String password, long timeout, TimeUnit timeUnit) {
return asyncCluster
.clusterManager(username, password)
.map(new Func1<AsyncClusterManager, ClusterManager>() {
@Override
public ClusterManager call(AsyncClusterManager asyncClusterManager) {
return new CouchbaseClusterManager();
}
})
.timeout(timeout, timeUnit)
.toBlocking()
.single();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment