Created
April 2, 2015 13:07
-
-
Save anonymous/c663676de78a6fe14797 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package tv.paprika.couchbase.test; | |
import com.couchbase.client.core.CouchbaseException; | |
import com.couchbase.client.java.Bucket; | |
import com.couchbase.client.java.CouchbaseCluster; | |
import com.couchbase.client.java.PersistTo; | |
import com.couchbase.client.java.ReplicaMode; | |
import com.couchbase.client.java.ReplicateTo; | |
import com.couchbase.client.java.document.JsonDocument; | |
import com.couchbase.client.java.document.json.JsonArray; | |
import com.couchbase.client.java.document.json.JsonObject; | |
import com.couchbase.client.java.error.CASMismatchException; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.util.Arrays; | |
import java.util.Collections; | |
import java.util.HashSet; | |
import java.util.List; | |
import java.util.Properties; | |
import java.util.Set; | |
import java.util.concurrent.atomic.AtomicInteger; | |
/** | |
* Demos what happens when the server starts dropping packets for a while. | |
*/ | |
public class CouchbaseTest1 | |
{ | |
private static final Logger LOG = LoggerFactory.getLogger(CouchbaseTest1.class); | |
public static final String TEST_DOC_NAME = "walter"; | |
public static final String TEST_DOC_ELEMENT = "myvalue"; | |
final AtomicInteger nextValue = new AtomicInteger(1); | |
final Bucket bucket; | |
final String BUCKET = "default"; | |
final String PWD = ""; | |
final List<String> cbServers = Arrays.asList("n1", "n2", "n3", "n4", "n5"); | |
final int numAttempts = 10; | |
int total; | |
protected final Set<Integer> acknowledgedWrites = Collections.synchronizedSet(new HashSet<Integer>(200)); | |
public CouchbaseTest1() { | |
bucket = setCouchbaseBucket(cbServers, BUCKET, PWD); | |
} | |
private Bucket setCouchbaseBucket(final List<String> cbServers, final String bucketName, final String pwd) | |
{ | |
CouchbaseCluster cluster = CouchbaseCluster.create(cbServers); | |
try { | |
return cluster.openBucket(bucketName, pwd); | |
} catch (CouchbaseException e) { | |
LOG.error("Error connecting to Couchbase: ", e); | |
throw new RuntimeException(e); | |
} | |
} | |
public static void main(String[] args) { | |
Properties systemProperties = System.getProperties(); | |
systemProperties.put("net.spy.log.LoggerImpl", "net.spy.memcached.compat.log.SLF4JLogger"); | |
System.setProperties(systemProperties); | |
new CouchbaseTest1().run(); | |
} | |
public void run() { | |
setEmptyDocument(); | |
appendToDocument(30000); | |
collectResults(); | |
LOG.info("DONE!"); | |
} | |
private void collectResults() { | |
JsonArray jsonArray = bucket.get(TEST_DOC_NAME).content().getArray(TEST_DOC_ELEMENT); | |
Set<Integer> saved = new HashSet<>(); | |
for (int i = 0; i < jsonArray.size(); i++) { | |
saved.add(jsonArray.getInt(i)); | |
} | |
Set<Integer> survivors = new HashSet<>(acknowledgedWrites); | |
survivors.retainAll(saved); | |
Set<Integer> lost = new HashSet<>(acknowledgedWrites); | |
lost.removeAll(survivors); | |
Set<Integer> unacked = new HashSet<>(saved); | |
unacked.removeAll(acknowledgedWrites); | |
LOG.info("total: {}", total); | |
LOG.info("acknowledged: {}", acknowledgedWrites.size()); | |
LOG.info("survivors: {}", survivors.size()); | |
LOG.info("acknowledged writes lost: {} ({})", lost.size(), lost.toString()); | |
LOG.info("unacknowledged writes: {} ({})", unacked.size(), unacked.toString()); | |
LOG.info("ack rate: {}", (double) acknowledgedWrites.size() / total); | |
LOG.info("loss rate: {}", (double) lost.size() / acknowledgedWrites.size()); | |
} | |
/** | |
* Tries to append <code>nextVal</code> <code>numAttempts</code> times | |
*/ | |
private boolean addElementToArray(Integer nextVal, int numAttempts) { | |
for (int i = 0; i < numAttempts; i++) | |
{ | |
try | |
{ | |
JsonDocument jsonDocument = null; | |
if (i % 3 == 0) | |
{ | |
LOG.info("Reading master {}...", nextVal); | |
jsonDocument = bucket.get(TEST_DOC_NAME); | |
} | |
if (i % 3 == 1) | |
{ | |
LOG.info("Reading replica 1 {}...", nextVal); | |
jsonDocument = bucket.getFromReplica(TEST_DOC_NAME, ReplicaMode.FIRST).get(0); | |
} | |
if (i % 3 == 2) | |
{ | |
LOG.info("Reading replica 2 {}...", nextVal); | |
jsonDocument = bucket.getFromReplica(TEST_DOC_NAME, ReplicaMode.SECOND).get(0); | |
} | |
JsonArray modifiedJsonArray = appendElement(jsonDocument, TEST_DOC_ELEMENT, nextVal); | |
jsonDocument.content().put(TEST_DOC_ELEMENT, modifiedJsonArray); | |
bucket.replace(jsonDocument, PersistTo.TWO, ReplicateTo.TWO); | |
} catch (CASMismatchException e) { | |
LOG.info("CAS Mismatch with {} @{}. try", nextVal, i+1); | |
continue; | |
} catch (RuntimeException e) { | |
LOG.error("TimeoutException. Cause: {}, supressed: {}", e.getCause(), e.getSuppressed()); | |
continue; | |
} catch (Exception e) { | |
LOG.error("Error: {}", e); | |
throw e; | |
} | |
acknowledgedWrites.add(nextVal); | |
LOG.info("OK, {} added.", nextVal); | |
return true; | |
} | |
LOG.error("ERROR, failed to append {} in {} attempts", nextVal, numAttempts); | |
return false; | |
} | |
private void appendToDocument(int elementsToAdd) | |
{ | |
for (int i = 0; i < elementsToAdd; i++) { | |
addElementToArray(nextValue.getAndIncrement(), numAttempts); | |
} | |
total += elementsToAdd; | |
} | |
private JsonArray appendElement(JsonDocument doc, String element, Integer nextVal) { | |
JsonArray jsonArray = doc.content().getArray(element); | |
if (jsonArray == null) jsonArray = JsonArray.empty(); | |
jsonArray.add(nextVal); | |
return jsonArray; | |
} | |
private void setEmptyDocument() { | |
JsonObject user = JsonObject.empty().putNull(TEST_DOC_ELEMENT); | |
try | |
{ | |
bucket.upsert(JsonDocument.create(TEST_DOC_NAME, user)); | |
} catch (Exception e) { | |
LOG.error("An error occured in setEmptyDocument(): ", e); | |
throw new RuntimeException(e); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment