Created
February 24, 2016 18:58
-
-
Save jziub/e845e355a68f7bae9f7c to your computer and use it in GitHub Desktop.
couchbase v1.4.5
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.yahoo.ycsb.db; | |
import java.io.StringWriter; | |
import java.io.Writer; | |
import java.net.URI; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.Iterator; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.Properties; | |
import java.util.Set; | |
import java.util.Vector; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.couchbase.client.CouchbaseClient; | |
import com.couchbase.client.CouchbaseConnectionFactory; | |
import com.couchbase.client.CouchbaseConnectionFactoryBuilder; | |
import com.yahoo.ycsb.ByteIterator; | |
import com.yahoo.ycsb.DB; | |
import com.yahoo.ycsb.DBException; | |
import com.yahoo.ycsb.Status; | |
import com.yahoo.ycsb.StringByteIterator; | |
import com.fasterxml.jackson.core.JsonFactory; | |
import com.fasterxml.jackson.core.JsonGenerator; | |
import com.fasterxml.jackson.databind.JsonNode; | |
import com.fasterxml.jackson.databind.ObjectMapper; | |
import com.fasterxml.jackson.databind.node.ObjectNode; | |
import net.spy.memcached.FailureMode; | |
import net.spy.memcached.PersistTo; | |
import net.spy.memcached.ReplicateTo; | |
import net.spy.memcached.internal.GetFuture; | |
import net.spy.memcached.internal.OperationFuture; | |
public class CouchbaseClient1 extends DB { | |
public static final String URL_PROPERTY = "couchbase.url"; | |
public static final String BUCKET_PROPERTY = "couchbase.bucket"; | |
public static final String PASSWORD_PROPERTY = "couchbase.password"; | |
public static final String CHECKF_PROPERTY = "couchbase.checkFutures"; | |
public static final String PERSIST_PROPERTY = "couchbase.persistTo"; | |
public static final String REPLICATE_PROPERTY = "couchbase.replicateTo"; | |
public static final String JSON_PROPERTY = "couchbase.json"; | |
protected static final ObjectMapper JSON_MAPPER = new ObjectMapper(); | |
private CouchbaseClient client; | |
private PersistTo persistTo; | |
private ReplicateTo replicateTo; | |
private boolean useJson; | |
private final Logger log = LoggerFactory.getLogger(getClass()); | |
@Override | |
public void init() throws DBException { | |
Properties props = getProperties(); | |
String host = props.getProperty(URL_PROPERTY, "127.0.0.1"); | |
String bucket = props.getProperty(BUCKET_PROPERTY, "default"); | |
String password = props.getProperty(PASSWORD_PROPERTY, ""); | |
useJson = props.getProperty(JSON_PROPERTY, "true").equals("true"); | |
persistTo = parsePersistTo(props.getProperty(PERSIST_PROPERTY, "0")); | |
replicateTo = parseReplicateTo(props.getProperty(REPLICATE_PROPERTY, "0")); | |
Properties systemProperties = System.getProperties(); | |
systemProperties.put("net.spy.log.LoggerImpl", "net.spy.memcached.compat.log.SLF4JLogger"); | |
System.setProperties(systemProperties); | |
try { | |
CouchbaseConnectionFactoryBuilder builder = new CouchbaseConnectionFactoryBuilder(); | |
builder.setReadBufferSize(16384); | |
builder.setOpTimeout(5000); | |
builder.setFailureMode(FailureMode.Redistribute); | |
List<URI> servers = new ArrayList<URI>(); | |
servers.add(new URI("http://" + host + ":8091/pools")); | |
CouchbaseConnectionFactory connectionFactory = builder.buildCouchbaseConnection( | |
servers, bucket, password); | |
client = new CouchbaseClient(connectionFactory); | |
} catch (Exception e) { | |
throw new DBException("Could not create CouchbaseClient object.", e); | |
} | |
} | |
/** | |
* Parse the replicate property into the correct enum. | |
* | |
* @param property | |
* the stringified property value. | |
* @throws DBException | |
* if parsing the property did fail. | |
* @return the correct enum. | |
*/ | |
private ReplicateTo parseReplicateTo(final String property) throws DBException { | |
int value = Integer.parseInt(property); | |
switch (value) { | |
case 0: | |
return ReplicateTo.ZERO; | |
case 1: | |
return ReplicateTo.ONE; | |
case 2: | |
return ReplicateTo.TWO; | |
case 3: | |
return ReplicateTo.THREE; | |
default: | |
throw new DBException(REPLICATE_PROPERTY + " must be between 0 and 3"); | |
} | |
} | |
/** | |
* Parse the persist property into the correct enum. | |
* | |
* @param property | |
* the stringified property value. | |
* @throws DBException | |
* if parsing the property did fail. | |
* @return the correct enum. | |
*/ | |
private PersistTo parsePersistTo(final String property) throws DBException { | |
int value = Integer.parseInt(property); | |
switch (value) { | |
case 0: | |
return PersistTo.ZERO; | |
case 1: | |
return PersistTo.ONE; | |
case 2: | |
return PersistTo.TWO; | |
case 3: | |
return PersistTo.THREE; | |
case 4: | |
return PersistTo.FOUR; | |
default: | |
throw new DBException(PERSIST_PROPERTY + " must be between 0 and 4"); | |
} | |
} | |
/** | |
* Prefix the key with the given prefix, to establish a unique namespace. | |
* | |
* @param prefix | |
* the prefix to use. | |
* @param key | |
* the actual key. | |
* @return the formatted and prefixed key. | |
*/ | |
private String formatKey(final String prefix, final String key) { | |
return prefix + ":" + key; | |
} | |
/** | |
* Shutdown the client. | |
*/ | |
@Override | |
public void cleanup() { | |
if (client != null) { | |
client.shutdown(); | |
} | |
} | |
@Override | |
public Status read(final String table, final String key, final Set<String> fields, | |
final HashMap<String, ByteIterator> result) { | |
String formattedKey = formatKey(table, key); | |
try { | |
GetFuture<Object> future = client.asyncGet(formattedKey); | |
if (future.getStatus().isSuccess() && future.get() != null) { | |
decode(future.get(), fields, result); | |
return Status.OK; | |
} else { | |
return Status.ERROR; | |
} | |
} catch (Exception e) { | |
if (log.isErrorEnabled()) { | |
log.error("Could not read value for key " + formattedKey, e); | |
} | |
return Status.ERROR; | |
} | |
} | |
/** | |
* Scan is currently not implemented. | |
* | |
* @param table | |
* The name of the table | |
* @param startkey | |
* The record key of the first record to read. | |
* @param recordcount | |
* The number of records to read | |
* @param fields | |
* The list of fields to read, or null for all of them | |
* @param result | |
* A Vector of HashMaps, where each HashMap is a set field/value | |
* pairs for one record | |
* @return Status.ERROR, because not implemented yet. | |
*/ | |
@Override | |
public Status scan(final String table, final String startkey, final int recordcount, final Set<String> fields, | |
final Vector<HashMap<String, ByteIterator>> result) { | |
return Status.ERROR; | |
} | |
@Override | |
public Status update(final String table, final String key, final HashMap<String, ByteIterator> values) { | |
String formattedKey = formatKey(table, key); | |
try { | |
OperationFuture<Boolean> future = client.replace(formattedKey, encode(values), persistTo, replicateTo); | |
return future.getStatus().isSuccess() ? Status.OK: Status.ERROR; | |
} catch (Exception e) { | |
if (log.isErrorEnabled()) { | |
log.error("Could not insert value for key " + formattedKey, e); | |
} | |
return Status.ERROR; | |
} | |
} | |
@Override | |
public Status insert(final String table, final String key, final HashMap<String, ByteIterator> values) { | |
String formattedKey = formatKey(table, key); | |
try { | |
OperationFuture<Boolean> future = client.add(formattedKey, encode(values), persistTo, replicateTo); | |
return future.getStatus().isSuccess() ? Status.OK: Status.ERROR; | |
} catch (Exception e) { | |
if (log.isErrorEnabled()) { | |
log.error("Could not insert value for key " + formattedKey, e); | |
} | |
return Status.ERROR; | |
} | |
} | |
@Override | |
public Status delete(final String table, final String key) { | |
String formattedKey = formatKey(table, key); | |
try { | |
OperationFuture<Boolean> future = client.delete(formattedKey, persistTo, replicateTo); | |
return future.getStatus().isSuccess() ? Status.OK: Status.ERROR; | |
} catch (Exception e) { | |
if (log.isErrorEnabled()) { | |
log.error("Could not delete value for key " + formattedKey, e); | |
} | |
return Status.ERROR; | |
} | |
} | |
/** | |
* Decode the object from server into the storable result. | |
* | |
* @param source | |
* the loaded object. | |
* @param fields | |
* the fields to check. | |
* @param dest | |
* the result passed back to the ycsb core. | |
*/ | |
private void decode(final Object source, final Set<String> fields, final HashMap<String, ByteIterator> dest) { | |
if (useJson) { | |
try { | |
JsonNode json = JSON_MAPPER.readTree((String) source); | |
boolean checkFields = fields != null && !fields.isEmpty(); | |
for (Iterator<Map.Entry<String, JsonNode>> jsonFields = json.fields(); jsonFields.hasNext();) { | |
Map.Entry<String, JsonNode> jsonField = jsonFields.next(); | |
String name = jsonField.getKey(); | |
if (checkFields && fields.contains(name)) { | |
continue; | |
} | |
JsonNode jsonValue = jsonField.getValue(); | |
if (jsonValue != null && !jsonValue.isNull()) { | |
dest.put(name, new StringByteIterator(jsonValue.asText())); | |
} | |
} | |
} catch (Exception e) { | |
throw new RuntimeException("Could not decode JSON"); | |
} | |
} else { | |
HashMap<String, String> converted = (HashMap<String, String>) source; | |
for (Map.Entry<String, String> entry : converted.entrySet()) { | |
dest.put(entry.getKey(), new StringByteIterator(entry.getValue())); | |
} | |
} | |
} | |
/** | |
* Encode the object for couchbase storage. | |
* | |
* @param source | |
* the source value. | |
* @return the storable object. | |
*/ | |
private Object encode(final HashMap<String, ByteIterator> source) { | |
HashMap<String, String> stringMap = StringByteIterator.getStringMap(source); | |
if (!useJson) { | |
return stringMap; | |
} | |
ObjectNode node = JSON_MAPPER.createObjectNode(); | |
for (Map.Entry<String, String> pair : stringMap.entrySet()) { | |
node.put(pair.getKey(), pair.getValue()); | |
} | |
JsonFactory jsonFactory = new JsonFactory(); | |
Writer writer = new StringWriter(); | |
try { | |
JsonGenerator jsonGenerator = jsonFactory.createGenerator(writer); | |
JSON_MAPPER.writeTree(jsonGenerator, node); | |
} catch (Exception e) { | |
throw new RuntimeException("Could not encode JSON value"); | |
} | |
return writer.toString(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment