Created
February 24, 2016 18:32
-
-
Save jziub/301ca614491a601dfb03 to your computer and use it in GitHub Desktop.
couchbase v2.2.3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.yahoo.ycsb.db; | |
import com.couchbase.client.java.Bucket; | |
import com.couchbase.client.java.Cluster; | |
import com.couchbase.client.java.CouchbaseCluster; | |
import com.couchbase.client.java.PersistTo; | |
import com.couchbase.client.java.ReplicateTo; | |
import com.couchbase.client.java.document.JsonDocument; | |
import com.couchbase.client.java.document.json.JsonObject; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.Properties; | |
import java.util.Set; | |
import java.util.Vector; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import com.yahoo.ycsb.ByteIterator; | |
import com.yahoo.ycsb.DB; | |
import com.yahoo.ycsb.DBException; | |
import com.yahoo.ycsb.Status; | |
import com.yahoo.ycsb.StringByteIterator; | |
public class CouchbaseClient2 extends DB { | |
public static final String URL_PROPERTY = "couchbase.url"; | |
public static final String BUCKET_PROPERTY = "couchbase.bucket"; | |
public static final String PASSWORD_PROPERTY = "couchbase.password"; | |
public static final String CHECKF_PROPERTY = "couchbase.checkFutures"; | |
public static final String PERSIST_PROPERTY = "couchbase.persistTo"; | |
public static final String REPLICATE_PROPERTY = "couchbase.replicateTo"; | |
public static final String JSON_PROPERTY = "couchbase.json"; | |
private static Cluster cluster = null; | |
private static AtomicInteger inUsed = new AtomicInteger(0); | |
private static Bucket bucketObj = null; | |
private PersistTo persistTo; | |
private ReplicateTo replicateTo; | |
private final Logger log = LoggerFactory.getLogger(getClass()); | |
@Override | |
public void init() throws DBException { | |
Properties props = getProperties(); | |
String url = props.getProperty(URL_PROPERTY, "127.0.0.1"); | |
String bucket = props.getProperty(BUCKET_PROPERTY, "default"); | |
String password = props.getProperty(PASSWORD_PROPERTY, ""); | |
persistTo = parsePersistTo(props.getProperty(PERSIST_PROPERTY, "0")); | |
replicateTo = parseReplicateTo(props.getProperty(REPLICATE_PROPERTY, "0")); | |
Properties systemProperties = System.getProperties(); | |
systemProperties.put("net.spy.log.LoggerImpl", "net.spy.memcached.compat.log.SLF4JLogger"); | |
System.setProperties(systemProperties); | |
try { | |
CouchbaseClient2.openBucket(url, bucket, password); | |
} catch (Exception e) { | |
throw new DBException("Could not create CouchbaseClient object.", e); | |
} | |
} | |
private static synchronized void openBucket(String host, String bucketName, String password) { | |
if (cluster == null) { | |
cluster = CouchbaseCluster.create(host); | |
bucketObj = cluster.openBucket(bucketName, password); | |
} | |
inUsed.incrementAndGet(); | |
} | |
/** atomic operation */ | |
private static synchronized void closeClusterInstance() { | |
int count = CouchbaseClient2.inUsed.decrementAndGet(); | |
if (count == 0) { | |
System.out.println("!!!!!! close everything "); | |
bucketObj.close(); | |
cluster.disconnect(); | |
} | |
} | |
/** | |
* Parse the replicate property into the correct enum. | |
* | |
* @param property | |
* the stringified property value. | |
* @throws DBException | |
* if parsing the property did fail. | |
* @return the correct enum. | |
*/ | |
private ReplicateTo parseReplicateTo(final String property) throws DBException { | |
int value = Integer.parseInt(property); | |
switch (value) { | |
case 0: | |
return ReplicateTo.NONE; | |
case 1: | |
return ReplicateTo.ONE; | |
case 2: | |
return ReplicateTo.TWO; | |
case 3: | |
return ReplicateTo.THREE; | |
default: | |
throw new DBException(REPLICATE_PROPERTY + " must be between 0 and 3"); | |
} | |
} | |
/** | |
* Parse the persist property into the correct enum. | |
* | |
* @param property | |
* the stringified property value. | |
* @throws DBException | |
* if parsing the property did fail. | |
* @return the correct enum. | |
*/ | |
private PersistTo parsePersistTo(final String property) throws DBException { | |
int value = Integer.parseInt(property); | |
switch (value) { | |
case 0: | |
return PersistTo.NONE; | |
case 1: | |
return PersistTo.ONE; | |
case 2: | |
return PersistTo.TWO; | |
case 3: | |
return PersistTo.THREE; | |
case 4: | |
return PersistTo.FOUR; | |
default: | |
throw new DBException(PERSIST_PROPERTY + " must be between 0 and 4"); | |
} | |
} | |
/** | |
* Prefix the key with the given prefix, to establish a unique namespace. | |
* | |
* @param prefix | |
* the prefix to use. | |
* @param key | |
* the actual key. | |
* @return the formatted and prefixed key. | |
*/ | |
private String formatKey(final String prefix, final String key) { | |
return prefix + ":" + key; | |
} | |
/** | |
* Shutdown the client. | |
*/ | |
@Override | |
public void cleanup() { | |
if (bucketObj != null) { | |
bucketObj.close(); | |
} | |
CouchbaseClient2.closeClusterInstance(); | |
} | |
@Override | |
public Status read(final String table, final String key, final Set<String> fields, | |
final HashMap<String, ByteIterator> result) { | |
String formattedKey = formatKey(table, key); | |
try { | |
JsonDocument loaded = bucketObj.get(formattedKey); | |
if (loaded == null) { | |
return Status.ERROR; | |
} else { | |
decode(loaded, fields, result); | |
return Status.OK; | |
} | |
} catch (Exception e) { | |
if (log.isErrorEnabled()) { | |
log.error("Could not read value for key " + formattedKey, e); | |
} | |
return Status.ERROR; | |
} | |
} | |
/** | |
* Scan is currently not implemented. | |
* | |
* @param table | |
* The name of the table | |
* @param startkey | |
* The record key of the first record to read. | |
* @param recordcount | |
* The number of records to read | |
* @param fields | |
* The list of fields to read, or null for all of them | |
* @param result | |
* A Vector of HashMaps, where each HashMap is a set field/value | |
* pairs for one record | |
* @return Status.ERROR, because not implemented yet. | |
*/ | |
@Override | |
public Status scan(final String table, final String startkey, final int recordcount, final Set<String> fields, | |
final Vector<HashMap<String, ByteIterator>> result) { | |
return Status.ERROR; | |
} | |
@Override | |
public Status update(final String table, final String key, final HashMap<String, ByteIterator> values) { | |
String formattedKey = formatKey(table, key); | |
try { | |
JsonDocument doc = JsonDocument.create(formattedKey, encode(values)); | |
bucketObj.upsert(doc, persistTo, replicateTo); | |
return Status.OK; | |
} catch (Exception e) { | |
if (log.isErrorEnabled()) { | |
log.error("Could not insert value for key " + formattedKey, e); | |
} | |
return Status.ERROR; | |
} | |
} | |
@Override | |
public Status insert(final String table, final String key, final HashMap<String, ByteIterator> values) { | |
String formattedKey = formatKey(table, key); | |
try { | |
JsonDocument doc = JsonDocument.create(formattedKey, encode(values)); | |
bucketObj.insert(doc, persistTo, replicateTo); | |
return Status.OK; | |
} catch (Exception e) { | |
if (log.isErrorEnabled()) { | |
// log.error("Could not insert value for key " + formattedKey, e); | |
log.error(e.getMessage(), e); | |
} | |
return Status.ERROR; | |
} | |
} | |
@Override | |
public Status delete(final String table, final String key) { | |
String formattedKey = formatKey(table, key); | |
try { | |
bucketObj.remove(formattedKey, persistTo, replicateTo); | |
return Status.OK; | |
} catch (Exception e) { | |
if (log.isErrorEnabled()) { | |
log.error("Could not delete value for key " + formattedKey, e); | |
} | |
return Status.ERROR; | |
} | |
} | |
/** | |
* Decode the object from server into the storable result. | |
* | |
* @param source | |
* the loaded object. | |
* @param fields | |
* the fields to check. | |
* @param dest | |
* the result passed back to the ycsb core. | |
*/ | |
private void decode(final JsonDocument source, final Set<String> fields, | |
final HashMap<String, ByteIterator> dest) { | |
Map<String, Object> map = source.content().toMap(); | |
fields.addAll(map.keySet()); | |
for (Map.Entry<String, Object> entry : map.entrySet()) { | |
dest.put(entry.getKey(), new StringByteIterator(entry.getValue().toString())); | |
} | |
} | |
/** | |
* Encode the object for couchbase storage. | |
* | |
* @param source | |
* the source value. | |
* @return the storable object. | |
*/ | |
private JsonObject encode(final HashMap<String, ByteIterator> source) { | |
JsonObject content = JsonObject.empty(); | |
HashMap<String, String> stringMap = StringByteIterator.getStringMap(source); | |
for (Map.Entry<String, String> pair : stringMap.entrySet()) { | |
content.put(pair.getKey(), pair.getValue()); | |
} | |
return content; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment