Skip to content

Instantly share code, notes, and snippets.

@ThoughtWire
Forked from fridgebuzz/RiakKVClient.java
Created February 23, 2016 18:18
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save ThoughtWire/7381e319af09fac830b0 to your computer and use it in GitHub Desktop.
Simple Key-Value Java Client for Riak
/**
 * Simple key-value client bound to a single Riak bucket.
 *
 * <p>Keys are strings and values are raw byte arrays. Every operation is executed
 * through the low-level Riak client supplied at construction time. Failures reported
 * by Riak are surfaced as {@link IOException}; if the calling thread is interrupted
 * while waiting, the interrupt flag is restored and a
 * {@link java.io.InterruptedIOException} (a subclass of {@code IOException}) is thrown,
 * so an interrupted call can never be mistaken for "not found" or for success.
 */
public class RiakKVClient
{
    /**
     * Constructor. Use RiakProvider.getStoreClient(name) instead.
     *
     * @param name bucket name
     * @param client low-level Riak Java client instance
     */
    protected RiakKVClient(final String name, final com.basho.riak.client.api.RiakClient client)
    {
        this.name = name;
        this.namespace = new Namespace(name);
        this.client = client;
    }

    /**
     * Get the value associated with a key.
     *
     * @param key the key to look up; must not be null
     * @return the value associated with the given key, or {@code null} if the key is
     *         not found or the response resolves to no value
     * @throws IllegalArgumentException if {@code key} is null
     * @throws IOException if Riak fails to retrieve the object, or the calling thread
     *         is interrupted while waiting
     */
    public byte[] get(final String key) throws IOException {
        if (key == null)
        {
            throw new IllegalArgumentException("Key is required");
        }
        try {
            final FetchValue.Response response = fetchValue(key);
            if (response.isNotFound())
            {
                return null;
            }
            final RiakObject riakObject = response.getValue(RiakObject.class);
            if (riakObject == null)
            {
                // Defensive: a response that is not flagged "not found" may still
                // resolve to no object; treat it like a missing key instead of an NPE.
                return null;
            }
            final BinaryValue stored = riakObject.getValue();
            return stored == null ? null : stored.getValue();
        } catch (ExecutionException e) {
            throw new IOException("Riak failed to retrieve object from bucket: " + name + " with key: " + key, e);
        } catch (InterruptedException e) {
            throw interrupted("retrieving object from", key, e);
        }
    }

    /**
     * Insert a value associated with a key. If a value already exists, update it.
     *
     * @param key the key; must not be null
     * @param value the value to store; must not be null
     * @throws IllegalArgumentException if {@code key} or {@code value} is null
     * @throws IOException if Riak fails to store the object, or the calling thread
     *         is interrupted while waiting
     */
    public void put(final String key, final byte[] value) throws IOException {
        if (key == null || value == null) {
            throw new IllegalArgumentException("All parameters are required");
        }
        try {
            // fetch in order to get the causal context
            final FetchValue.Response response = fetchMetadata(key);
            final RiakObject storeObject = new RiakObject().setValue(BinaryValue.create(value)).setContentType("binary/octet-stream");
            StoreValue.Builder builder = new StoreValue.Builder(storeObject).withLocation(new Location(namespace, key));
            final VClock vectorClock = response.getVectorClock();
            if (vectorClock != null) {
                builder = builder.withVectorClock(vectorClock);
            }
            client.execute(builder.build());
        } catch (ExecutionException e) {
            throw new IOException("Riak failed to store object in bucket: " + name + " with key: " + key, e);
        } catch (InterruptedException e) {
            throw interrupted("storing object in", key, e);
        }
    }

    /**
     * Delete the value associated with the given key.
     *
     * @param key the key to delete; must not be null
     * @return true if the value existed and a delete was issued; false if the key was
     *         not found (in which case the method has no effect)
     * @throws IllegalArgumentException if {@code key} is null
     * @throws IOException if Riak fails to delete the object, or the calling thread
     *         is interrupted while waiting
     */
    public boolean delete(final String key) throws IOException {
        if (key == null)
        {
            throw new IllegalArgumentException("Key is required");
        }
        try {
            // fetch in order to get the causal context
            final FetchValue.Response response = fetchMetadata(key);
            if (response.isNotFound())
            {
                return false;
            }
            DeleteValue.Builder builder = new DeleteValue.Builder(new Location(namespace, key));
            final VClock vectorClock = response.getVectorClock();
            if (vectorClock != null) {
                builder = builder.withVClock(vectorClock);
            }
            client.execute(builder.build());
            // Reaching this point means the key was found and the delete was executed.
            return true;
        } catch (ExecutionException e) {
            throw new IOException("Riak failed to delete object from bucket: " + name + " with key: " + key, e);
        } catch (InterruptedException e) {
            throw interrupted("deleting object from", key, e);
        }
    }

    /**
     * Restores the thread's interrupt flag and builds the exception reported to the caller.
     *
     * @param action description of the interrupted operation, used in the message
     * @param key the key the operation was addressing
     * @param cause the original interruption
     * @return an {@link IOException} carrying {@code cause}, ready to be thrown
     */
    private IOException interrupted(final String action, final String key, final InterruptedException cause)
    {
        Thread.currentThread().interrupt();
        final java.io.InterruptedIOException failure = new java.io.InterruptedIOException(
                "Interrupted while " + action + " bucket: " + name + " with key: " + key);
        failure.initCause(cause);
        return failure;
    }

    /** Fetch with the HEAD option set, used by callers that only need the vector clock. */
    private FetchValue.Response fetchMetadata(final String key) throws ExecutionException, InterruptedException
    {
        return fetchResponse(key, true);
    }

    /** Fetch without the HEAD option, used when the stored value itself is needed. */
    private FetchValue.Response fetchValue(final String key) throws ExecutionException, InterruptedException
    {
        return fetchResponse(key, false);
    }

    /**
     * Execute a fetch for the given key in this client's namespace.
     *
     * @param key the key to fetch
     * @param headOnly when true, the {@code FetchValue.Option.HEAD} option is enabled
     * @return the response returned by the low-level client
     */
    private FetchValue.Response fetchResponse(final String key, boolean headOnly) throws ExecutionException, InterruptedException {
        final FetchValue.Builder builder = new FetchValue.Builder(new Location(namespace, key));
        if (headOnly) {
            builder.withOption(FetchValue.Option.HEAD, true);
        }
        return client.execute(builder.build());
    }

    private final Namespace namespace;
    private final String name;
    private final com.basho.riak.client.api.RiakClient client;
}
/**
 * Creates and caches {@link RiakKVClient} instances, one per bucket name, on top of a
 * single shared low-level Riak client.
 */
public class RiakProvider
{
    /**
     * Constructor.
     *
     * @param host host for the client to connect to--usually the local one; must match Riak riak.config
     * @param port port for the client to connect to; must match Riak riak.config
     * @throws IllegalArgumentException if the host is null or "-1", or the port is not a number
     * @throws UnknownHostException if the host is not found
     */
    public RiakProvider(String host, String port) throws UnknownHostException
    {
        // Reject null up front so a missing host is reported as a bad argument, not an NPE.
        if (host == null || host.equals("-1"))
        {
            throw new IllegalArgumentException("Invalid host provided: " + host);
        }
        int portNum;
        try {
            portNum = Integer.parseInt(port);
        }
        catch (NumberFormatException e)
        {
            throw new IllegalArgumentException("Port must be a number", e);
        }
        this.riakClient = com.basho.riak.client.api.RiakClient.newClient(portNum, host);
    }

    /**
     * Returns the key-value client for the given bucket name, creating it on first use.
     *
     * <p>The first request for a name fetches the bucket properties from Riak before the
     * client is cached, so a client is only remembered once Riak has answered for that
     * bucket. Concurrent first requests for the same name may each perform that fetch,
     * but all of them receive the same cached instance.
     *
     * @param name bucket name; must not be null
     * @return the client bound to {@code name}; never null
     * @throws IllegalArgumentException if {@code name} is null
     * @throws IllegalStateException if Riak cannot be reached, or the calling thread is
     *         interrupted while waiting (the interrupt flag is restored)
     */
    public RiakKVClient getStoreClient(String name)
    {
        if (name == null)
        {
            throw new IllegalArgumentException("Name is required");
        }
        final RiakKVClient cached = clientMap.get(name);
        if (cached != null)
        {
            return cached;
        }
        // Verify before caching: a failed or interrupted check must not leave an
        // unverified client behind for later callers to pick up.
        verifyBucket(name);
        final RiakKVClient created = new RiakKVClient(name, riakClient);
        final RiakKVClient winner = clientMap.putIfAbsent(name, created);
        return winner != null ? winner : created;
    }

    /**
     * Fetches the properties of the named bucket to confirm Riak is reachable.
     *
     * @param name bucket name
     * @throws IllegalStateException if the fetch fails or the thread is interrupted
     */
    private void verifyBucket(String name)
    {
        final FetchBucketProperties fetchProps = new FetchBucketProperties.Builder(new Namespace(name)).build();
        try
        {
            // fetching the bucket properties creates the bucket with default properties if it doesn't already exist
            riakClient.execute(fetchProps);
        }
        catch (ExecutionException e)
        {
            throw new IllegalStateException("Riak is not available or not reachable", e);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while contacting Riak for bucket: " + name, e);
        }
    }

    /** Shuts down the shared low-level Riak client. */
    public void destroy() {
        riakClient.shutdown();
    }

    private final com.basho.riak.client.api.RiakClient riakClient;
    private final ConcurrentHashMap<String, RiakKVClient> clientMap = new ConcurrentHashMap<>();
}
@fridgebuzz
Copy link

I should have mentioned that RiakProvider is meant to be a singleton. I use Spring to create a singleton bean, but you can convert it into a bona fide singleton without much work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment