Skip to content

Instantly share code, notes, and snippets.

@daschl
Last active November 23, 2019 16:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daschl/e32e05e6abc31e450f67c23fe30c3826 to your computer and use it in GitHub Desktop.
Save daschl/e32e05e6abc31e450f67c23fe30c3826 to your computer and use it in GitHub Desktop.
package org.testcontainers.couchbase;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import org.junit.ClassRule;
import org.junit.Test;
public class BaseCouchbaseContainerTest {
@ClassRule
public static CouchbaseContainer container = new CouchbaseContainer();
@Test
public void shouldInsertAndGet() {
CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder()
.bootstrapCarrierDirectPort(container.getKvPort())
.bootstrapHttpDirectPort(container.getHttpPort())
.build();
Cluster cluster = CouchbaseCluster.create(env);
cluster.authenticate(container.getUsername(), container.getPassword());
Bucket bucket = cluster.openBucket("foobar");
bucket.insert(JsonDocument.create("foo", JsonObject.empty()));
System.err.println(bucket.get("foo"));
cluster.disconnect();
env.shutdown();
}
}
package org.testcontainers.couchbase;
import java.util.EnumSet;
import java.util.Set;
public class ClusterSpec {
private final Set<Service> enabledServices;
private final int memoryQuota;
public Set<Service> getEnabledServices() {
return enabledServices;
}
public int getMemoryQuota() {
return memoryQuota;
}
private ClusterSpec(Builder builder) {
this.enabledServices = builder.enabledServices;
this.memoryQuota = builder.memoryQuota;
}
public static ClusterSpec.Builder builder() {
return new Builder();
}
public static ClusterSpec fromDefaults() {
return builder().build();
}
public static class Builder {
private Set<Service> enabledServices = EnumSet.of(Service.KV);
private int memoryQuota = 300;
public Builder enabledServices(Set<Service> enabled) {
this.enabledServices = enabled;
return this;
}
public Builder memoryQuota(int memoryQuota) {
this.memoryQuota = memoryQuota;
return this;
}
public ClusterSpec build() {
return new ClusterSpec(this);
}
}
enum Service {
KV("kv");
private String identifier;
Service(String identifier) {
this.identifier = identifier;
}
public String identifier() {
return identifier;
}
}
}
/*
* Copyright (c) 2019 Couchbase, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.testcontainers.couchbase;
import org.testcontainers.containers.ContainerLaunchException;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.JsonNode;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.testcontainers.shaded.org.apache.commons.io.IOUtils;
import org.testcontainers.utility.ThrowingFunction;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Base64;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.net.HttpURLConnection.HTTP_OK;
public class CouchbaseContainer<SELF extends CouchbaseContainer<SELF>> extends GenericContainer<SELF> {
public static final ObjectMapper MAPPER = new ObjectMapper();
private static final AtomicInteger INSTANCE_COUNT = new AtomicInteger(0);
private static final String STATIC_CONFIG = "/opt/couchbase/etc/couchbase/static_config";
private static final String CAPI_CONFIG = "/opt/couchbase/etc/couchdb/default.d/capi.ini";
private static final String CACHED_CONFIG = "/opt/couchbase/var/lib/couchbase/config/config.dat";
private static final String IMAGE = "couchbase/server";
private static final String VERSION = "enterprise-6.0.3";
private int containerPortOffset;
private ClusterSpec clusterSpec = ClusterSpec.fromDefaults();
public CouchbaseContainer() {
this(IMAGE + ":" + VERSION);
}
public CouchbaseContainer(final String imageName) {
super(imageName);
containerPortOffset = INSTANCE_COUNT.getAndIncrement();
withNetwork(Network.SHARED);
Arrays.stream(Ports.values()).map(p -> p.getOriginalPort(containerPortOffset)).forEach(this::addExposedPort);
setWaitStrategy(new HttpWaitStrategy()
.forPort(Ports.REST.getOriginalPort(containerPortOffset))
.forPath("/pools")
.forStatusCode(200)
);
}
public int getKvPort() {
return getMappedPort(Ports.MEMCACHED.getOriginalPort(containerPortOffset));
}
public int getHttpPort() {
return getMappedPort(Ports.REST.getOriginalPort(containerPortOffset));
}
@Override
protected void doStart() {
super.doStart();
try {
initializeCluster();
createBuckets();
} catch (Exception ex) {
throw new ContainerLaunchException("Could not launch couchbase container", ex);
}
}
String urlBase;
String clusterUsername = "Administrator";
String clusterPassword = "password";
public String getUsername() {
return clusterUsername;
}
public String getPassword() {
return clusterPassword;
}
private void initializeCluster() throws Exception {
urlBase = String.format("http://%s:%s", getContainerIpAddress(), getMappedPort(Ports.REST.getOriginalPort(containerPortOffset)));
String poolURL = "/pools/default";
String poolPayload = "memoryQuota="
+ URLEncoder.encode(Integer.toString(clusterSpec.getMemoryQuota()), "UTF-8");
String setupServicesURL = "/node/controller/setupServices";
StringBuilder servicePayloadBuilder = new StringBuilder();
for (ClusterSpec.Service service : clusterSpec.getEnabledServices()) {
servicePayloadBuilder.append(service.identifier()).append(",");
}
String setupServiceContent = "services=" + URLEncoder.encode(servicePayloadBuilder.toString(), "UTF-8");
String webSettingsURL = "/settings/web";
String webSettingsContent = "username=" + URLEncoder.encode(clusterUsername, "UTF-8") + "&password=" + URLEncoder.encode(clusterPassword, "UTF-8") + "&port=8091";
callCouchbaseRestAPI(poolURL, poolPayload);
callCouchbaseRestAPI(setupServicesURL, setupServiceContent);
callCouchbaseRestAPI(webSettingsURL, webSettingsContent);
createNodeWaitStrategy().waitUntilReady(this);
}
private void createBuckets() {
}
private HttpWaitStrategy createNodeWaitStrategy() {
return new HttpWaitStrategy()
.forPath("/pools/default/")
.withBasicCredentials(clusterUsername, clusterPassword)
.forPort(Ports.REST.getOriginalPort(containerPortOffset))
.forStatusCode(HTTP_OK)
.forResponsePredicate(response -> {
try {
return Optional.of(MAPPER.readTree(response))
.map(n -> n.at("/nodes/0/status"))
.map(JsonNode::asText)
.map("healthy"::equals)
.orElse(false);
} catch (IOException e) {
//logger().error("Unable to parse response {}", response);
return false;
}
});
}
public void callCouchbaseRestAPI(String url, String payload) throws IOException {
String fullUrl = urlBase + url;
HttpURLConnection httpConnection = (HttpURLConnection) ((new URL(fullUrl).openConnection()));
httpConnection.setDoOutput(true);
httpConnection.setRequestMethod("POST");
httpConnection.setRequestProperty("Content-Type",
"application/x-www-form-urlencoded");
String encoded = Base64.getEncoder().encodeToString((clusterUsername + ":" + clusterPassword).getBytes(StandardCharsets.UTF_8));
httpConnection.setRequestProperty("Authorization", "Basic " + encoded);
DataOutputStream out = new DataOutputStream(httpConnection.getOutputStream());
out.writeBytes(payload);
out.flush();
httpConnection.getResponseCode();
}
@Override
protected void containerIsCreated(String containerId) {
patchConfig(STATIC_CONFIG, this::addMappedPorts);
// capi needs a special configuration, see https://developer.couchbase.com/documentation/server/current/install/install-ports.html
patchConfig(CAPI_CONFIG, this::replaceCapiPort);
copyFileToContainer(Transferable.of(new byte[]{}), CACHED_CONFIG);
}
private void patchConfig(String configLocation, ThrowingFunction<String, String> patchFunction) {
String patchedConfig = copyFileFromContainer(configLocation,
inputStream -> patchFunction.apply(IOUtils.toString(inputStream, StandardCharsets.UTF_8)));
copyFileToContainer(Transferable.of(patchedConfig.getBytes(StandardCharsets.UTF_8)), configLocation);
}
private String addMappedPorts(String originalConfig) {
String portConfig = Stream.of(Ports.values())
.map(port -> String.format("{%s, %d}.", port.name, port.getOriginalPort(containerPortOffset)))
.collect(Collectors.joining("\n"));
return String.format("%s\n%s", originalConfig, portConfig);
}
private String replaceCapiPort(String originalConfig) {
return Arrays.stream(originalConfig.split("\n"))
.map(s -> (s.matches("port\\s*=\\s*" + Ports.CAPI.getOriginalPort())) ? "port = " + Ports.CAPI.getOriginalPort(containerPortOffset) : s)
.collect(Collectors.joining("\n"));
}
private enum Ports {
REST("rest_port", 8091),
CAPI("capi_port", 8092),
QUERY("query_port", 8093),
FTS("fts_http_port", 8094),
CBAS("cbas_http_port", 8095),
EVENTING("eventing_http_port", 8096),
MEMCACHED_SSL("memcached_ssl_port", 11207),
MEMCACHED("memcached_port", 11210),
REST_SSL("ssl_rest_port", 18091),
CAPI_SSL("ssl_capi_port", 18092),
QUERY_SSL("ssl_query_port", 18093),
FTS_SSL("fts_ssl_port", 18094),
CBAS_SSL("cbas_ssl_port", 18095),
EVENTING_SSL("eventing_ssl_port", 18096);
private final String name;
private final int originalPort;
Ports(String name, int originalPort) {
this.name = name;
this.originalPort = originalPort;
}
public int getOriginalPort(int offset) {
return originalPort + offset;
}
public int getOriginalPort() {
return originalPort;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment