Skip to content

Instantly share code, notes, and snippets.

@daschl
Last active March 25, 2020 14:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save daschl/5715418e6801c5684144079b74bda879 to your computer and use it in GitHub Desktop.
Save daschl/5715418e6801c5684144079b74bda879 to your computer and use it in GitHub Desktop.
package org.testcontainers.couchbase;
import com.github.dockerjava.api.command.InspectContainerResponse;
import com.github.dockerjava.api.model.ContainerNetwork;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.JsonNode;
import org.testcontainers.shaded.com.fasterxml.jackson.databind.ObjectMapper;
import org.testcontainers.shaded.okhttp3.Credentials;
import org.testcontainers.shaded.okhttp3.FormBody;
import org.testcontainers.shaded.okhttp3.OkHttpClient;
import org.testcontainers.shaded.okhttp3.Request;
import org.testcontainers.shaded.okhttp3.RequestBody;
import org.testcontainers.shaded.okhttp3.Response;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
/**
 * Testcontainers container for a single Couchbase Server node.
 *
 * <p>After the Docker container has started, {@link #containerIsStarted(InspectContainerResponse)}
 * talks to the node's HTTP endpoints to rename the node, enable the selected services, create the
 * admin user, register the mapped host ports as external alternate addresses and create the
 * configured buckets.
 *
 * <p>All configuration methods ({@code withUsername}, {@code withPassword}, {@code withBucket},
 * {@code enabledServices}) must be called before the container is running; afterwards they throw
 * {@link IllegalStateException}.
 */
public class CouchbaseContainer extends GenericContainer<CouchbaseContainer> {

    /** Shared JSON parser used to inspect the node status response. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /** Shared HTTP client for all setup requests; uses a one minute read timeout. */
    private static final OkHttpClient HTTP_CLIENT = new OkHttpClient()
        .newBuilder()
        .readTimeout(Duration.ofMinutes(1))
        .build();

    private static final int MGMT_PORT = 8091;
    private static final int MGMT_SSL_PORT = 18091;
    private static final int VIEW_PORT = 8092;
    private static final int VIEW_SSL_PORT = 18092;
    private static final int QUERY_PORT = 8093;
    private static final int QUERY_SSL_PORT = 18093;
    private static final int SEARCH_PORT = 8094;
    private static final int SEARCH_SSL_PORT = 18094;
    private static final int KV_PORT = 11210;
    private static final int KV_SSL_PORT = 11207;

    /** Image tag used by the no-arg constructor. */
    public static final String VERSION = "6.5.0";

    /** Image name prefix (including the trailing colon) used by the no-arg constructor. */
    public static final String DOCKER_IMAGE_NAME = "couchbase/server:";

    private String username = "Administrator";
    private String password = "password";
    private Set<Service> enabledServices = EnumSet.allOf(Service.class);
    private final List<BucketDefinition> buckets = new ArrayList<>();

    /**
     * Creates a container from the default image ({@link #DOCKER_IMAGE_NAME} + {@link #VERSION}).
     */
    public CouchbaseContainer() {
        this(DOCKER_IMAGE_NAME + VERSION);
    }

    /**
     * Creates a container from a custom image.
     *
     * @param imageName the full docker image name, passed unchanged to {@link GenericContainer}.
     */
    public CouchbaseContainer(final String imageName) {
        super(imageName);
    }

    /**
     * @return the admin username used for authenticated requests.
     */
    public String username() {
        return username;
    }

    /**
     * Sets the admin username.
     *
     * @param username the username to create and authenticate with.
     * @return this container, for chaining.
     * @throws IllegalStateException if the container is already running.
     */
    public CouchbaseContainer withUsername(final String username) {
        checkNotRunning();
        this.username = username;
        return this;
    }

    /**
     * @return the admin password used for authenticated requests.
     */
    public String password() {
        return password;
    }

    /**
     * Sets the admin password.
     *
     * @param password the password to create and authenticate with.
     * @return this container, for chaining.
     * @throws IllegalStateException if the container is already running.
     */
    public CouchbaseContainer withPassword(final String password) {
        checkNotRunning();
        this.password = password;
        return this;
    }

    /**
     * Registers a bucket that is created once the node is ready.
     *
     * @param bucketDefinition the bucket to create; must not be {@code null}.
     * @return this container, for chaining.
     * @throws IllegalStateException if the container is already running.
     * @throws IllegalArgumentException if {@code bucketDefinition} is {@code null}.
     */
    public CouchbaseContainer withBucket(final BucketDefinition bucketDefinition) {
        checkNotRunning();
        if (bucketDefinition == null) {
            // Fail here instead of with an opaque NullPointerException during startup.
            throw new IllegalArgumentException("Bucket definition must not be null");
        }
        this.buckets.add(bucketDefinition);
        return this;
    }

    /**
     * Replaces the set of services that are enabled on the node (all services by default).
     *
     * @param enabled the services to enable; at least one is required.
     * @return this container, for chaining.
     * @throws IllegalStateException if the container is already running.
     * @throws IllegalArgumentException if no service is given.
     */
    public CouchbaseContainer enabledServices(final Service... enabled) {
        checkNotRunning();
        if (enabled == null || enabled.length == 0) {
            // EnumSet.copyOf rejects an empty non-EnumSet collection with a generic message;
            // report the actual problem instead.
            throw new IllegalArgumentException("At least one service must be enabled");
        }
        this.enabledServices = EnumSet.copyOf(Arrays.asList(enabled));
        return this;
    }

    /**
     * Guards the configuration methods against modification of a running container.
     */
    private void checkNotRunning() {
        if (isRunning()) {
            throw new IllegalStateException("Setter can only be called before the container is running");
        }
    }

    /**
     * Runs the node setup sequence once the docker container is up. The order matters: the admin
     * user is created by {@link #addAdminUser()} before any authenticated request is sent.
     */
    @Override
    protected void containerIsStarted(final InspectContainerResponse containerInfo) {
        waitUntilNodeIsOnline();
        renameNode();
        initializeServices();
        addAdminUser();
        configureExternalPorts();
        if (enabledServices.contains(Service.INDEX)) {
            configureIndexer();
        }
        waitUntilNodeIsReady();
        createBuckets();
    }

    /**
     * Sets the index storage mode to {@code memory_optimized}.
     */
    private void configureIndexer() {
        final RequestBody body = new FormBody.Builder()
            .add("storageMode", "memory_optimized")
            .build();
        executeSetupRequest("Configuring the indexer", MGMT_PORT, "/settings/indexes", "POST", body, true);
    }

    /**
     * Blocks until {@code /pools} on the management port answers with HTTP 200.
     */
    private void waitUntilNodeIsOnline() {
        new HttpWaitStrategy()
            .forPort(MGMT_PORT)
            .forPath("/pools")
            .forStatusCode(200)
            .waitUntilReady(this);
    }

    /**
     * Renames the node to the container's internal IP address.
     */
    private void renameNode() {
        final RequestBody body = new FormBody.Builder()
            .add("hostname", getInternalIpAddress())
            .build();
        executeSetupRequest("Renaming the node", MGMT_PORT, "/node/controller/rename", "POST", body, false);
    }

    /**
     * Looks up the container's IP address on the first docker network that reports one.
     *
     * @return the internal IP address of the container.
     * @throws IllegalStateException if no network with an IP address is attached.
     */
    private String getInternalIpAddress() {
        final Map<String, ContainerNetwork> networks = getContainerInfo().getNetworkSettings().getNetworks();
        if (networks != null) {
            for (ContainerNetwork network : networks.values()) {
                final String ipAddress = network.getIpAddress();
                // Skip networks without an address instead of passing null/"" to the rename call.
                if (ipAddress != null && !ipAddress.isEmpty()) {
                    return ipAddress;
                }
            }
        }
        throw new IllegalStateException("No network available to extract the internal IP from!");
    }

    /**
     * Enables the selected services on the node, sent as a comma separated identifier list.
     */
    private void initializeServices() {
        final String services = enabledServices.stream()
            .map(service -> service.identifier)
            .collect(Collectors.joining(","));
        final RequestBody body = new FormBody.Builder()
            .add("services", services)
            .build();
        executeSetupRequest(
            "Enabling services", MGMT_PORT, "/node/controller/setupServices", "POST", body, false
        );
    }

    /**
     * Creates the admin user from the configured username and password.
     */
    private void addAdminUser() {
        final RequestBody body = new FormBody.Builder()
            .add("username", username())
            .add("password", password())
            .add("port", Integer.toString(MGMT_PORT))
            .build();
        executeSetupRequest("Configuring the admin user", MGMT_PORT, "/settings/web", "POST", body, false);
    }

    /**
     * Registers the host address and the mapped host ports of every enabled service as the node's
     * external alternate addresses. The view ({@code capi}) ports are registered together with the
     * KV ports.
     */
    private void configureExternalPorts() {
        final FormBody.Builder builder = new FormBody.Builder();
        builder.add("hostname", getContainerIpAddress());
        builder.add("mgmt", Integer.toString(getMappedPort(MGMT_PORT)));
        builder.add("mgmtSSL", Integer.toString(getMappedPort(MGMT_SSL_PORT)));
        if (enabledServices.contains(Service.KV)) {
            builder.add("kv", Integer.toString(getMappedPort(KV_PORT)));
            builder.add("kvSSL", Integer.toString(getMappedPort(KV_SSL_PORT)));
            builder.add("capi", Integer.toString(getMappedPort(VIEW_PORT)));
            builder.add("capiSSL", Integer.toString(getMappedPort(VIEW_SSL_PORT)));
        }
        if (enabledServices.contains(Service.QUERY)) {
            builder.add("n1ql", Integer.toString(getMappedPort(QUERY_PORT)));
            builder.add("n1qlSSL", Integer.toString(getMappedPort(QUERY_SSL_PORT)));
        }
        if (enabledServices.contains(Service.SEARCH)) {
            builder.add("fts", Integer.toString(getMappedPort(SEARCH_PORT)));
            builder.add("ftsSSL", Integer.toString(getMappedPort(SEARCH_SSL_PORT)));
        }
        executeSetupRequest(
            "Configuring external ports",
            MGMT_PORT,
            "/node/controller/setupAlternateAddresses/external",
            "PUT",
            builder.build(),
            true
        );
    }

    /**
     * Blocks until {@code /pools/default} answers with HTTP 200 and reports the first node's
     * status as {@code healthy}.
     */
    private void waitUntilNodeIsReady() {
        new HttpWaitStrategy()
            .forPath("/pools/default")
            .forPort(MGMT_PORT)
            .withBasicCredentials(username(), password())
            .forStatusCode(200)
            .forResponsePredicate(response -> {
                try {
                    // ofNullable: depending on the Jackson version readTree may return null for
                    // empty content, which must count as "not ready" rather than raise an NPE.
                    return Optional.ofNullable(MAPPER.readTree(response))
                        .map(n -> n.at("/nodes/0/status"))
                        .map(JsonNode::asText)
                        .map("healthy"::equals)
                        .orElse(false);
                } catch (IOException e) {
                    logger().error("Unable to parse response {}", response, e);
                    return false;
                }
            })
            .waitUntilReady(this);
    }

    /**
     * Creates every registered bucket and, if requested by the bucket definition, a primary index
     * on it. The primary index is skipped when the query service is not enabled, since the query
     * endpoint cannot be reached in that case.
     */
    private void createBuckets() {
        for (BucketDefinition bucket : buckets) {
            final RequestBody bucketBody = new FormBody.Builder()
                .add("name", bucket.name)
                .add("ramQuotaMB", Integer.toString(bucket.quota))
                .build();
            executeSetupRequest(
                "Creating bucket " + bucket.name, MGMT_PORT, "/pools/default/buckets", "POST", bucketBody, true
            );

            if (!bucket.queryPrimaryIndex) {
                continue;
            }
            if (!enabledServices.contains(Service.QUERY)) {
                logger().info(
                    "Primary index creation for bucket {} ignored, since the QUERY service is not enabled",
                    bucket.name
                );
                continue;
            }
            final RequestBody indexBody = new FormBody.Builder()
                .add("statement", "CREATE PRIMARY INDEX on `" + bucket.name + "`")
                .build();
            executeSetupRequest(
                "Creating primary index on bucket " + bucket.name,
                QUERY_PORT,
                "/query/service",
                "POST",
                indexBody,
                true
            );
        }
    }

    /**
     * Performs one setup request, logs its outcome and always releases the response.
     *
     * <p>A non-successful HTTP status is logged as a warning but does not abort the startup, which
     * keeps the lenient behaviour callers of this class have relied on so far.
     *
     * @param action human readable description of the step, used for logging only.
     * @param port the container-internal port to call.
     * @param path the request path.
     * @param method the HTTP method to use.
     * @param body the request body.
     * @param auth whether to send basic auth credentials.
     */
    private void executeSetupRequest(final String action, final int port, final String path,
                                     final String method, final RequestBody body, final boolean auth) {
        // try-with-resources closes the response body so the underlying connection is not leaked.
        try (Response response = doHttpRequest(port, path, method, body, auth)) {
            if (response.isSuccessful()) {
                logger().debug("{} succeeded: {}", action, response);
            } else {
                logger().warn("{} failed: {}", action, response);
            }
        }
    }

    /**
     * Executes an HTTP request against the mapped host port of the given container port.
     *
     * <p>The caller owns the returned {@link Response} and must close it.
     *
     * @param port the container-internal port; resolved to the mapped host port.
     * @param path the request path, appended to {@code http://host:port}.
     * @param method the HTTP method; ignored (GET is used) when {@code body} is {@code null}.
     * @param body the request body, or {@code null} for a GET request.
     * @param auth whether to add a basic auth header with the configured credentials.
     * @return the response of the executed call.
     * @throws RuntimeException if the request could not be built or executed.
     */
    private Response doHttpRequest(final int port, final String path, final String method,
                                   final RequestBody body, final boolean auth) {
        try {
            Request.Builder requestBuilder = new Request.Builder()
                .url("http://" + getContainerIpAddress() + ":" + getMappedPort(port) + path);
            if (auth) {
                requestBuilder = requestBuilder.header("Authorization", Credentials.basic(username(), password()));
            }
            if (body == null) {
                requestBuilder = requestBuilder.get();
            } else {
                requestBuilder = requestBuilder.method(method, body);
            }
            return HTTP_CLIENT.newCall(requestBuilder.build()).execute();
        } catch (Exception ex) {
            throw new RuntimeException("Could not perform request", ex);
        }
    }

    /**
     * Services that can be enabled on the node. Each constant carries the identifier that is sent
     * to the {@code setupServices} endpoint.
     */
    public enum Service {
        KV("kv"),
        QUERY("n1ql"),
        SEARCH("fts"),
        INDEX("index");

        private final String identifier;

        Service(final String identifier) {
            this.identifier = identifier;
        }
    }

    /**
     * Describes a bucket to create at startup: its name, its RAM quota in MB (100 by default) and
     * whether a primary index should be created on it (enabled by default).
     */
    public static class BucketDefinition {

        private final String name;
        private boolean queryPrimaryIndex = true;
        private int quota = 100;

        /**
         * @param name the bucket name; must not be {@code null} or empty.
         * @throws IllegalArgumentException if {@code name} is {@code null} or empty.
         */
        public BucketDefinition(String name) {
            if (name == null || name.isEmpty()) {
                throw new IllegalArgumentException("Bucket name must not be null or empty");
            }
            this.name = name;
        }

        /**
         * Sets the RAM quota of the bucket.
         *
         * @param quota the quota in MB; must be at least 100.
         * @return this definition, for chaining.
         * @throws IllegalArgumentException if {@code quota} is below 100.
         */
        public BucketDefinition withQuota(final int quota) {
            if (quota < 100) {
                throw new IllegalArgumentException("Bucket quota cannot be less than 100MB!");
            }
            this.quota = quota;
            return this;
        }

        /**
         * Controls whether a primary index is created on the bucket after it has been created.
         *
         * @param create {@code true} to create the primary index.
         * @return this definition, for chaining.
         */
        public BucketDefinition createQueryPrimaryIndex(final boolean create) {
            this.queryPrimaryIndex = create;
            return this;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment