Skip to content

Instantly share code, notes, and snippets.

Created April 2, 2015 13:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/c663676de78a6fe14797 to your computer and use it in GitHub Desktop.
Save anonymous/c663676de78a6fe14797 to your computer and use it in GitHub Desktop.
package tv.paprika.couchbase.test;
import com.couchbase.client.core.CouchbaseException;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.PersistTo;
import com.couchbase.client.java.ReplicaMode;
import com.couchbase.client.java.ReplicateTo;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonArray;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.error.CASMismatchException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Demos what happens when the server starts dropping packets for a while.
 *
 * <p>The test resets a single document ({@link #TEST_DOC_NAME}), then appends a
 * sequence of increasing integers to the array stored under
 * {@link #TEST_DOC_ELEMENT}, one {@code replace} per integer. Every write that
 * returned without an exception is recorded as "acknowledged". At the end the
 * document is read back and the acknowledged values are compared with the values
 * actually stored, so lost acknowledged writes and stored-but-unacknowledged
 * writes can be reported.
 *
 * <p>The instance owns a cluster connection; call {@link #close()} (or use
 * try-with-resources) when done.
 */
public class CouchbaseTest1 implements AutoCloseable
{
    private static final Logger LOG = LoggerFactory.getLogger(CouchbaseTest1.class);

    /** Id of the single document all writes go to. */
    public static final String TEST_DOC_NAME = "walter";

    /** Name of the array-valued field inside the test document. */
    public static final String TEST_DOC_ELEMENT = "myvalue";

    /** Number of values appended by one {@link #run()}. */
    private static final int ELEMENTS_TO_ADD = 30000;

    /** Source of the next value to append; starts at 1. */
    final AtomicInteger nextValue = new AtomicInteger(1);
    final Bucket bucket;
    final String BUCKET = "default";
    final String PWD = "";
    final List<String> cbServers = Arrays.asList("n1", "n2", "n3", "n4", "n5");

    /** Maximum number of read-modify-write attempts per value. */
    final int numAttempts = 10;

    /** Total number of values this instance tried to append so far. */
    int total;

    /** Values whose {@code replace} call returned without throwing. */
    protected final Set<Integer> acknowledgedWrites = Collections.synchronizedSet(new HashSet<Integer>(200));

    /** Cluster handle, kept so the connection can be released in {@link #close()}. */
    private final CouchbaseCluster cluster;

    /**
     * Connects to the nodes in {@link #cbServers} and opens the test bucket.
     *
     * @throws RuntimeException if the bucket cannot be opened
     */
    public CouchbaseTest1() {
        cluster = CouchbaseCluster.create(cbServers);
        bucket = openBucket(cluster, BUCKET, PWD);
    }

    /**
     * Opens {@code bucketName} on {@code cluster}. If opening fails for any
     * reason the cluster is disconnected before the exception propagates, so a
     * failed construction does not leave a connection behind.
     *
     * @param cluster    cluster to open the bucket on
     * @param bucketName name of the bucket
     * @param pwd        bucket password
     * @return the opened bucket
     * @throws RuntimeException wrapping a {@link CouchbaseException} raised by the SDK;
     *                          other runtime exceptions propagate unchanged
     */
    private static Bucket openBucket(final CouchbaseCluster cluster, final String bucketName, final String pwd)
    {
        boolean opened = false;
        try {
            Bucket openedBucket = cluster.openBucket(bucketName, pwd);
            opened = true;
            return openedBucket;
        } catch (CouchbaseException e) {
            LOG.error("Error connecting to Couchbase: ", e);
            throw new RuntimeException(e);
        } finally {
            if (!opened) {
                disconnectQuietly(cluster);
            }
        }
    }

    /**
     * Disconnects {@code cluster}, logging instead of propagating any failure so
     * that cleanup never masks the exception that triggered it.
     */
    private static void disconnectQuietly(final CouchbaseCluster cluster) {
        try {
            cluster.disconnect();
        } catch (RuntimeException e) {
            LOG.warn("Error disconnecting from Couchbase: ", e);
        }
    }

    /**
     * Releases the cluster connection opened by the constructor. The instance
     * must not be used afterwards.
     */
    @Override
    public void close() {
        disconnectQuietly(cluster);
    }

    /**
     * Entry point: routes the spymemcached logger to SLF4J, runs the test once
     * and releases the cluster connection even if the run fails.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        System.setProperty("net.spy.log.LoggerImpl", "net.spy.memcached.compat.log.SLF4JLogger");
        try (CouchbaseTest1 test = new CouchbaseTest1()) {
            test.run();
        }
    }

    /**
     * Resets the test document, appends {@link #ELEMENTS_TO_ADD} values and
     * logs the resulting statistics.
     */
    public void run() {
        setEmptyDocument();
        appendToDocument(ELEMENTS_TO_ADD);
        collectResults();
        LOG.info("DONE!");
    }

    /**
     * Reads the document back and logs how the stored values compare with the
     * acknowledged writes: survivors (acknowledged and stored), lost
     * (acknowledged but not stored) and unacknowledged (stored but never
     * acknowledged).
     */
    private void collectResults() {
        Set<Integer> saved = new HashSet<>();
        JsonDocument document = bucket.get(TEST_DOC_NAME);
        // The element is still JSON null when no write ever succeeded, and the
        // document itself may be absent; both cases mean "nothing was saved".
        JsonArray jsonArray = (document == null) ? null : document.content().getArray(TEST_DOC_ELEMENT);
        if (jsonArray == null) {
            LOG.warn("No array found under {} in document {}; treating it as empty",
                    TEST_DOC_ELEMENT, TEST_DOC_NAME);
        } else {
            for (int i = 0; i < jsonArray.size(); i++) {
                saved.add(jsonArray.getInt(i));
            }
        }

        Set<Integer> survivors = new HashSet<>(acknowledgedWrites);
        survivors.retainAll(saved);
        Set<Integer> lost = new HashSet<>(acknowledgedWrites);
        lost.removeAll(survivors);
        Set<Integer> unacked = new HashSet<>(saved);
        unacked.removeAll(acknowledgedWrites);

        LOG.info("total: {}", total);
        LOG.info("acknowledged: {}", acknowledgedWrites.size());
        LOG.info("survivors: {}", survivors.size());
        LOG.info("acknowledged writes lost: {} ({})", lost.size(), lost.toString());
        LOG.info("unacknowledged writes: {} ({})", unacked.size(), unacked.toString());
        // Floating-point division: a zero denominator yields NaN/Infinity rather than throwing.
        LOG.info("ack rate: {}", (double) acknowledgedWrites.size() / total);
        LOG.info("loss rate: {}", (double) lost.size() / acknowledgedWrites.size());
    }

    /**
     * Tries to append <code>nextVal</code> <code>numAttempts</code> times.
     *
     * <p>Each attempt reads the document (rotating between the master and the
     * two replicas), appends the value to the array and writes the document
     * back with {@code PersistTo.TWO} / {@code ReplicateTo.TWO}. An attempt that
     * hits a CAS mismatch, finds no document to read, or fails with any other
     * runtime exception is logged and retried.
     *
     * @param nextVal     value to append
     * @param numAttempts maximum number of attempts
     * @return {@code true} if one attempt's write returned without an exception
     *         (the value is then recorded in {@link #acknowledgedWrites}),
     *         {@code false} if all attempts failed
     */
    private boolean addElementToArray(Integer nextVal, int numAttempts) {
        for (int i = 0; i < numAttempts; i++)
        {
            try
            {
                JsonDocument jsonDocument = readDocument(i, nextVal);
                if (jsonDocument == null)
                {
                    // Missing document or empty replica response: nothing to modify this round.
                    LOG.warn("No document returned for {} @{}. try", nextVal, i + 1);
                    continue;
                }
                JsonArray modifiedJsonArray = appendElement(jsonDocument, TEST_DOC_ELEMENT, nextVal);
                jsonDocument.content().put(TEST_DOC_ELEMENT, modifiedJsonArray);
                bucket.replace(jsonDocument, PersistTo.TWO, ReplicateTo.TWO);
            } catch (CASMismatchException e) {
                LOG.info("CAS Mismatch with {} @{}. try", nextVal, i+1);
                continue;
            } catch (RuntimeException e) {
                // Pass the exception itself so its type, message and stack trace are logged;
                // not every failure here is a timeout.
                LOG.error("Failed to append {} @{}. try", nextVal, i + 1, e);
                continue;
            }
            acknowledgedWrites.add(nextVal);
            LOG.info("OK, {} added.", nextVal);
            return true;
        }
        LOG.error("ERROR, failed to append {} in {} attempts", nextVal, numAttempts);
        return false;
    }

    /**
     * Reads the test document for the given attempt: attempts 0, 3, 6, ... read
     * the master copy, attempts 1, 4, 7, ... the first replica and attempts
     * 2, 5, 8, ... the second replica.
     *
     * @param attempt zero-based attempt number
     * @param nextVal value being appended, used for logging only
     * @return the document, or {@code null} if the read returned nothing
     */
    private JsonDocument readDocument(int attempt, Integer nextVal) {
        switch (attempt % 3) {
            case 0:
                LOG.info("Reading master {}...", nextVal);
                return bucket.get(TEST_DOC_NAME);
            case 1:
                LOG.info("Reading replica 1 {}...", nextVal);
                return firstOrNull(bucket.getFromReplica(TEST_DOC_NAME, ReplicaMode.FIRST));
            default:
                LOG.info("Reading replica 2 {}...", nextVal);
                return firstOrNull(bucket.getFromReplica(TEST_DOC_NAME, ReplicaMode.SECOND));
        }
    }

    /** Returns the first document of {@code documents}, or {@code null} if there is none. */
    private static JsonDocument firstOrNull(List<JsonDocument> documents) {
        return (documents == null || documents.isEmpty()) ? null : documents.get(0);
    }

    /**
     * Appends {@code elementsToAdd} consecutive values taken from
     * {@link #nextValue} and adds that count to {@link #total}.
     *
     * @param elementsToAdd number of values to append
     */
    private void appendToDocument(int elementsToAdd)
    {
        for (int i = 0; i < elementsToAdd; i++) {
            addElementToArray(nextValue.getAndIncrement(), numAttempts);
        }
        total += elementsToAdd;
    }

    /**
     * Returns the array stored under {@code element} in {@code doc} with
     * {@code nextVal} appended; a missing or null array is replaced by a new
     * empty one first.
     *
     * @param doc     document to read the array from
     * @param element name of the array field
     * @param nextVal value to append
     * @return the array including {@code nextVal}
     */
    private JsonArray appendElement(JsonDocument doc, String element, Integer nextVal) {
        JsonArray jsonArray = doc.content().getArray(element);
        if (jsonArray == null) {
            jsonArray = JsonArray.empty();
        }
        jsonArray.add(nextVal);
        return jsonArray;
    }

    /**
     * Upserts the test document with {@link #TEST_DOC_ELEMENT} set to JSON null,
     * discarding any previous content.
     *
     * @throws RuntimeException if the upsert fails
     */
    private void setEmptyDocument() {
        JsonObject user = JsonObject.empty().putNull(TEST_DOC_ELEMENT);
        try
        {
            bucket.upsert(JsonDocument.create(TEST_DOC_NAME, user));
        } catch (Exception e) {
            LOG.error("An error occured in setEmptyDocument(): ", e);
            throw new RuntimeException(e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment