Created
September 9, 2014 02:32
-
-
Save daschl/86ddee7e3be795c2c2f7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright (C) 2014 Couchbase, Inc. | |
* | |
* Permission is hereby granted, free of charge, to any person obtaining a copy | |
* of this software and associated documentation files (the "Software"), to deal | |
* in the Software without restriction, including without limitation the rights | |
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
* copies of the Software, and to permit persons to whom the Software is | |
* furnished to do so, subject to the following conditions: | |
* | |
* The above copyright notice and this permission notice shall be included in | |
* all copies or substantial portions of the Software. | |
* | |
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING | |
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING | |
* IN THE SOFTWARE. | |
*/ | |
package com.couchbase.client.java; | |
import com.couchbase.client.core.ClusterFacade; | |
import com.couchbase.client.java.cluster.AsyncClusterManager; | |
import com.couchbase.client.java.document.Document; | |
import com.couchbase.client.java.transcoder.Transcoder; | |
import rx.Observable; | |
import java.util.List; | |
/** | |
* Represents a Couchbase Server {@link AsyncCluster}. | |
* | |
* A {@link AsyncCluster} is able to open many {@link AsyncBucket}s while sharing the underlying resources (like sockets) | |
* very efficiently. In addition, a {@link AsyncClusterManager} is available to perform cluster-wide operations. | |
* | |
* @author Michael Nitschinger | |
* @since 2.0 | |
*/ | |
public interface AsyncCluster { | |
/** | |
* Open the default {@link AsyncBucket}. | |
* | |
* @return a {@link Observable} containing the {@link AsyncBucket} reference once opened. | |
*/ | |
Observable<AsyncBucket> openBucket(); | |
/** | |
* Open the given {@link AsyncBucket} without a password (if not set during creation). | |
* | |
* @param name the name of the bucket. | |
* @return a {@link Observable} containing the {@link AsyncBucket} reference once opened. | |
*/ | |
Observable<AsyncBucket> openBucket(String name); | |
/** | |
* Open the given {@link AsyncBucket} with a password (set during creation). | |
* | |
* @param name the name of the bucket. | |
* @param password the password of the bucket, can be an empty string. | |
* @return a {@link Observable} containing the {@link AsyncBucket} reference once opened. | |
*/ | |
Observable<AsyncBucket> openBucket(String name, String password); | |
/** | |
* Open the given {@link AsyncBucket} with a password and a custom list of transcoders. | |
* | |
* @param name the name of the bucket. | |
* @param password the password of the bucket, can be an empty string. | |
* @param transcoders a list of custom transcoders. | |
* @return a {@link Observable} containing the {@link AsyncBucket} reference once opened. | |
*/ | |
Observable<AsyncBucket> openBucket(String name, String password, List<Transcoder<? extends Document, ?>> transcoders); | |
/** | |
* Returns a reference to the {@link AsyncClusterManager}. | |
* | |
* The {@link AsyncClusterManager} allows to perform cluster level management operations. It requires administrative | |
* credentials, which have been set during cluster configuration. Bucket level credentials are not enough to perform | |
* cluster-level operations. | |
* | |
* @param username privileged username. | |
* @param password privileged password. | |
* @return a {@link Observable} containing the {@link AsyncClusterManager}. | |
*/ | |
Observable<AsyncClusterManager> clusterManager(String username, String password); | |
/** | |
* Disconnects from the {@link AsyncCluster} and closes all open {@link AsyncBucket}s. | |
* | |
* @return a {@link Observable} containing true if successful and failing the {@link Observable} otherwise. | |
*/ | |
Observable<Boolean> disconnect(); | |
/** | |
* Returns a reference to the underlying core engine. | |
* | |
* Since the {@link ClusterFacade} provides direct access to low-level semantics, no sanity checks are performed as | |
* with the Java SDK itself. Handle with care and only use it when absolutely needed. | |
* | |
* @return a {@link Observable} containing the core engine. | |
*/ | |
Observable<ClusterFacade> core(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright (C) 2014 Couchbase, Inc. | |
* | |
* Permission is hereby granted, free of charge, to any person obtaining a copy | |
* of this software and associated documentation files (the "Software"), to deal | |
* in the Software without restriction, including without limitation the rights | |
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
* copies of the Software, and to permit persons to whom the Software is | |
* furnished to do so, subject to the following conditions: | |
* | |
* The above copyright notice and this permission notice shall be included in | |
* all copies or substantial portions of the Software. | |
* | |
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING | |
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING | |
* IN THE SOFTWARE. | |
*/ | |
package com.couchbase.client.java; | |
import com.couchbase.client.core.ClusterFacade; | |
import com.couchbase.client.core.CouchbaseCore; | |
import com.couchbase.client.core.message.CouchbaseResponse; | |
import com.couchbase.client.core.message.cluster.DisconnectRequest; | |
import com.couchbase.client.core.message.cluster.DisconnectResponse; | |
import com.couchbase.client.core.message.cluster.OpenBucketRequest; | |
import com.couchbase.client.core.message.cluster.SeedNodesRequest; | |
import com.couchbase.client.java.cluster.AsyncClusterManager; | |
import com.couchbase.client.java.cluster.AsyncCouchbaseClusterManager; | |
import com.couchbase.client.java.document.Document; | |
import com.couchbase.client.java.env.CouchbaseEnvironment; | |
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment; | |
import com.couchbase.client.java.transcoder.Transcoder; | |
import rx.Observable; | |
import rx.functions.Func1; | |
import java.net.InetSocketAddress; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.List; | |
public class AsyncCouchbaseCluster implements AsyncCluster { | |
private static final String DEFAULT_BUCKET = "default"; | |
private static final String DEFAULT_HOST = "127.0.0.1"; | |
private final ClusterFacade core; | |
private final CouchbaseEnvironment environment; | |
private final ConnectionString connectionString; | |
private final boolean sharedEnvironment; | |
public static AsyncCouchbaseCluster create() { | |
return create(DEFAULT_HOST); | |
} | |
public static AsyncCouchbaseCluster create(final CouchbaseEnvironment environment) { | |
return create(environment, DEFAULT_HOST); | |
} | |
public static AsyncCouchbaseCluster create(final String... nodes) { | |
return create(Arrays.asList(nodes)); | |
} | |
public static AsyncCouchbaseCluster create(final List<String> nodes) { | |
return new AsyncCouchbaseCluster(DefaultCouchbaseEnvironment.create(), ConnectionString.fromHostnames(nodes), false); | |
} | |
public static AsyncCouchbaseCluster create(final CouchbaseEnvironment environment, final String... nodes) { | |
return create(environment, Arrays.asList(nodes)); | |
} | |
public static AsyncCouchbaseCluster create(final CouchbaseEnvironment environment, final List<String> nodes) { | |
return new AsyncCouchbaseCluster(environment, ConnectionString.fromHostnames(nodes), true); | |
} | |
public static AsyncCouchbaseCluster fromConnectionString(final String connectionString) { | |
return new AsyncCouchbaseCluster(DefaultCouchbaseEnvironment.create(), ConnectionString.create(connectionString), false); | |
} | |
public static AsyncCouchbaseCluster fromConnectionString(final CouchbaseEnvironment environment, final String connectionString) { | |
return new AsyncCouchbaseCluster(environment, ConnectionString.create(connectionString), true); | |
} | |
AsyncCouchbaseCluster(final CouchbaseEnvironment environment, final ConnectionString connectionString, final boolean sharedEnvironment) { | |
this.sharedEnvironment = sharedEnvironment; | |
core = new CouchbaseCore(environment); | |
List<String> seedNodes = new ArrayList<String>(); | |
for (InetSocketAddress node : connectionString.hosts()) { | |
seedNodes.add(node.getHostName()); | |
} | |
if (seedNodes.isEmpty()) { | |
seedNodes.add(DEFAULT_HOST); | |
} | |
SeedNodesRequest request = new SeedNodesRequest(seedNodes); | |
core.send(request).toBlocking().single(); | |
this.environment = environment; | |
this.connectionString = connectionString; | |
} | |
@Override | |
public Observable<AsyncBucket> openBucket() { | |
return openBucket(DEFAULT_BUCKET); | |
} | |
@Override | |
public Observable<AsyncBucket> openBucket(final String name) { | |
return openBucket(name, null); | |
} | |
@Override | |
public Observable<AsyncBucket> openBucket(final String name, final String pass) { | |
return openBucket(name, pass, null); | |
} | |
@Override | |
public Observable<AsyncBucket> openBucket(final String name, String pass, | |
final List<Transcoder<? extends Document, ?>> transcoders) { | |
final String password = pass == null ? "" : pass; | |
final List<Transcoder<? extends Document, ?>> trans = transcoders == null | |
? new ArrayList<Transcoder<? extends Document, ?>>() : transcoders; | |
return core | |
.send(new OpenBucketRequest(name, password)) | |
.map(new Func1<CouchbaseResponse, AsyncBucket>() { | |
@Override | |
public AsyncBucket call(CouchbaseResponse response) { | |
return new AsyncCouchbaseBucket(core, name, password, trans); | |
} | |
}); | |
} | |
@Override | |
public Observable<Boolean> disconnect() { | |
return core | |
.<DisconnectResponse>send(new DisconnectRequest()) | |
.flatMap(new Func1<DisconnectResponse, Observable<Boolean>>() { | |
@Override | |
public Observable<Boolean> call(DisconnectResponse disconnectResponse) { | |
return sharedEnvironment ? Observable.just(true) : environment.shutdown(); | |
} | |
}); | |
} | |
@Override | |
public Observable<AsyncClusterManager> clusterManager(final String username, final String password) { | |
return Observable.just((AsyncClusterManager) AsyncCouchbaseClusterManager.create(username, password, connectionString, | |
environment, core)); | |
} | |
@Override | |
public Observable<ClusterFacade> core() { | |
return Observable.just(core); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright (C) 2014 Couchbase, Inc. | |
* | |
* Permission is hereby granted, free of charge, to any person obtaining a copy | |
* of this software and associated documentation files (the "Software"), to deal | |
* in the Software without restriction, including without limitation the rights | |
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
* copies of the Software, and to permit persons to whom the Software is | |
* furnished to do so, subject to the following conditions: | |
* | |
* The above copyright notice and this permission notice shall be included in | |
* all copies or substantial portions of the Software. | |
* | |
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING | |
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALING | |
* IN THE SOFTWARE. | |
*/ | |
package com.couchbase.client.java; | |
import com.couchbase.client.core.ClusterFacade; | |
import com.couchbase.client.java.cluster.AsyncClusterManager; | |
import com.couchbase.client.java.cluster.ClusterManager; | |
import com.couchbase.client.java.document.Document; | |
import com.couchbase.client.java.transcoder.Transcoder; | |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* The {@link Cluster} provides synchronous cluster level access and acts as the entry point | |
* for all Couchbase Server related operations. | |
* | |
* @author Michael Nitschinger | |
* @since 2.0 | |
*/ | |
public interface Cluster { | |
/** | |
* Returns the underlying {@link AsyncCluster} object. | |
* | |
* This method can be used to switch from a synchronous to an asynchronous interface. | |
* @return the {@link AsyncCluster}. | |
*/ | |
AsyncCluster async(); | |
/** | |
* Opens the *default* bucket with _no_ password and the default connect timeout. | |
* | |
* @return the opened {@link Bucket}. | |
*/ | |
Bucket openBucket(); | |
/** | |
* Opens the *default* bucket with _no_ password and a custom connect timeout. | |
* | |
* @return the opened {@link Bucket}. | |
*/ | |
Bucket openBucket(long timeout, TimeUnit timeUnit); | |
/** | |
* Opens the bucket with the given name, _no_ password and the default connect timeout. | |
* | |
* @return the opened {@link Bucket}. | |
*/ | |
Bucket openBucket(String name); | |
/** | |
* Opens the bucket with the given name, _no_ password and a custom connect timeout. | |
* | |
* @return the opened {@link Bucket}. | |
*/ | |
Bucket openBucket(String name, long timeout, TimeUnit timeUnit); | |
/** | |
* Opens the bucket with the given name, password and the default connect timeout. | |
* | |
* @return the opened {@link Bucket}. | |
*/ | |
Bucket openBucket(String name, String password); | |
/** | |
* Opens the bucket with the given name, password and a custom connect timeout. | |
* | |
* @return the opened {@link Bucket}. | |
*/ | |
Bucket openBucket(String name, String password, long timeout, TimeUnit timeUnit); | |
/** | |
* Opens the bucket with the given name, password, a custom list of {@link Transcoder}s and the default connect | |
* timeout.. | |
* | |
* Every custom {@link Transcoder} will either provide new functionality or override a default one provided | |
* for the specific {@link Document} type. | |
* | |
* @return the opened {@link Bucket}. | |
*/ | |
Bucket openBucket(String name, String password, List<Transcoder<? extends Document, ?>> transcoders); | |
/** | |
* Opens the bucket with the given name, password, a custom list of {@link Transcoder}s and a custom connect | |
* timeout.. | |
* | |
* Every custom {@link Transcoder} will either provide new functionality or override a default one provided | |
* for the specific {@link Document} type. | |
* | |
* @return the opened {@link Bucket}. | |
*/ | |
Bucket openBucket(String name, String password, List<Transcoder<? extends Document, ?>> transcoders, | |
long timeout, TimeUnit timeUnit); | |
/** | |
* Disconnects all open buckets and also shuts down all exclusively owned resources with the default disconnect | |
* timeout. | |
* | |
* @return true if the disconnect succeeded, false otherwise. | |
*/ | |
boolean disconnect(); | |
/** | |
* Disconnects all open buckets and also shuts down all exclusively owned resources with a custom disconnect | |
* timeout. | |
* | |
* @return true if the disconnect succeeded, false otherwise. | |
*/ | |
boolean disconnect(long timeout, TimeUnit timeUnit); | |
/** | |
* Opens the {@link AsyncClusterManager} for cluster-wide management operations. | |
* | |
* The username and password provided here are not the bucket name or password, but those which allow | |
* cluster-wide operational access (most commonly the system wide administrator or similar level credentials). | |
* | |
* @param username the authorized username. | |
* @param password the password of the username. | |
* @return the opened {@link AsyncClusterManager}. | |
*/ | |
ClusterManager clusterManager(String username, String password); | |
ClusterManager clusterManager(String username, String password, long timeout, TimeUnit timeUnit); | |
/** | |
* Provides direct access to the "core" engine and should be handled with care. | |
* | |
* @return the {@link ClusterFacade} to the core engine. | |
*/ | |
ClusterFacade core(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.couchbase.client.java; | |
import com.couchbase.client.core.ClusterFacade; | |
import com.couchbase.client.java.cluster.AsyncClusterManager; | |
import com.couchbase.client.java.cluster.ClusterManager; | |
import com.couchbase.client.java.cluster.CouchbaseClusterManager; | |
import com.couchbase.client.java.document.Document; | |
import com.couchbase.client.java.env.CouchbaseEnvironment; | |
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment; | |
import com.couchbase.client.java.transcoder.Transcoder; | |
import rx.functions.Action0; | |
import rx.functions.Action1; | |
import rx.functions.Func1; | |
import java.util.ArrayList; | |
import java.util.Arrays; | |
import java.util.Collections; | |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
public class CouchbaseCluster implements Cluster { | |
private static final String DEFAULT_BUCKET = "default"; | |
private static final String DEFAULT_HOST = "127.0.0.1"; | |
private final AsyncCouchbaseCluster asyncCluster; | |
private final List<Bucket> openBuckets; | |
private final CouchbaseEnvironment environment; | |
public static CouchbaseCluster create() { | |
return create(DEFAULT_HOST); | |
} | |
public static CouchbaseCluster create(final CouchbaseEnvironment environment) { | |
return create(environment, DEFAULT_HOST); | |
} | |
public static CouchbaseCluster create(final String... nodes) { | |
return create(Arrays.asList(nodes)); | |
} | |
public static CouchbaseCluster create(final List<String> nodes) { | |
return new CouchbaseCluster(DefaultCouchbaseEnvironment.create(), ConnectionString.fromHostnames(nodes), false); | |
} | |
public static CouchbaseCluster create(final CouchbaseEnvironment environment, final String... nodes) { | |
return create(environment, Arrays.asList(nodes)); | |
} | |
public static CouchbaseCluster create(final CouchbaseEnvironment environment, final List<String> nodes) { | |
return new CouchbaseCluster(environment, ConnectionString.fromHostnames(nodes), true); | |
} | |
public static CouchbaseCluster fromConnectionString(final String connectionString) { | |
return new CouchbaseCluster(DefaultCouchbaseEnvironment.create(), ConnectionString.create(connectionString), false); | |
} | |
public static CouchbaseCluster fromConnectionString(final CouchbaseEnvironment environment, final String connectionString) { | |
return new CouchbaseCluster(environment, ConnectionString.create(connectionString), true); | |
} | |
CouchbaseCluster(final CouchbaseEnvironment environment, final ConnectionString connectionString, final boolean sharedEnvironment) { | |
this.asyncCluster = new AsyncCouchbaseCluster(environment, connectionString, sharedEnvironment); | |
this.openBuckets = Collections.synchronizedList(new ArrayList<Bucket>()); | |
this.environment = environment; | |
} | |
@Override | |
public AsyncCluster async() { | |
return asyncCluster; | |
} | |
@Override | |
public Bucket openBucket() { | |
return openBucket(DEFAULT_BUCKET); | |
} | |
@Override | |
public Bucket openBucket(final String name) { | |
return openBucket(name, null); | |
} | |
@Override | |
public Bucket openBucket(final String name, final String password) { | |
return openBucket(name, password, null); | |
} | |
@Override | |
public Bucket openBucket(final String name, final String password, | |
final List<Transcoder<? extends Document, ?>> transcoders) { | |
return openBucket(name, password, transcoders, environment.connectTimeout(), TimeUnit.MILLISECONDS); | |
} | |
@Override | |
public boolean disconnect() { | |
return disconnect(environment.disconnectTimeout(), TimeUnit.MILLISECONDS); | |
} | |
@Override | |
public ClusterManager clusterManager(final String username, final String password) { | |
return clusterManager(username, password, environment.connectTimeout(), TimeUnit.MILLISECONDS); | |
} | |
@Override | |
public ClusterFacade core() { | |
return asyncCluster.core().toBlocking().single(); | |
} | |
@Override | |
public Bucket openBucket(long timeout, TimeUnit timeUnit) { | |
return openBucket(DEFAULT_BUCKET, timeout, timeUnit); | |
} | |
@Override | |
public Bucket openBucket(String name, long timeout, TimeUnit timeUnit) { | |
return openBucket(name, null, timeout, timeUnit); | |
} | |
@Override | |
public Bucket openBucket(String name, String password, long timeout, TimeUnit timeUnit) { | |
return openBucket(name, password, null, timeout, timeUnit); | |
} | |
@Override | |
public Bucket openBucket(String name, String password, List<Transcoder<? extends Document, ?>> transcoders, long timeout, TimeUnit timeUnit) { | |
return asyncCluster | |
.openBucket(name, password, transcoders) | |
.map(new Func1<AsyncBucket, Bucket>() { | |
@Override | |
public Bucket call(AsyncBucket asyncBucket) { | |
return new CouchbaseBucket(asyncBucket); | |
} | |
}) | |
.doOnNext(new Action1<Bucket>() { | |
@Override | |
public void call(Bucket bucket) { | |
openBuckets.add(bucket); | |
} | |
}) | |
.timeout(timeout, timeUnit) | |
.toBlocking() | |
.single(); | |
} | |
@Override | |
public boolean disconnect(long timeout, TimeUnit timeUnit) { | |
return asyncCluster | |
.disconnect() | |
.doOnCompleted(new Action0() { | |
@Override | |
public void call() { | |
openBuckets.clear(); | |
} | |
}) | |
.timeout(timeout, timeUnit) | |
.toBlocking() | |
.single(); | |
} | |
@Override | |
public ClusterManager clusterManager(String username, String password, long timeout, TimeUnit timeUnit) { | |
return asyncCluster | |
.clusterManager(username, password) | |
.map(new Func1<AsyncClusterManager, ClusterManager>() { | |
@Override | |
public ClusterManager call(AsyncClusterManager asyncClusterManager) { | |
return new CouchbaseClusterManager(); | |
} | |
}) | |
.timeout(timeout, timeUnit) | |
.toBlocking() | |
.single(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment