Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Simple Key-Value Java Client for Riak
public class RiakKVClient
{
/**
* Constructor. Use RiakProvider.getStoreClient(name) instead.
* @param name bucket name
* @param client low-level Riak Java client instance
*/
protected RiakKVClient(final String name, final com.basho.riak.client.api.RiakClient client)
{
this.name = name;
this.namespace = new Namespace(name);
this.client = client;
}
/**
* Get a value associated with a key
* @param key
* @return the value associated with the given key
* @throws IOException
*/
public byte[] get(final String key) throws IOException {
if (key == null)
{
throw new IllegalArgumentException("Key is required");
}
try {
final FetchValue.Response response = fetchValue(key);
if (response.isNotFound())
{
return null;
}
final RiakObject riakObject = response.getValue(RiakObject.class);
return riakObject.getValue().getValue();
} catch (ExecutionException e) {
throw new IOException("Riak failed to retrieve object from bucket: " + name + " with key: " + key, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return null;
}
/**
* Insert a value associated with a key. If a value already exists, update it.
* @param key the key
* @param value the value to store
* @throws IOException
*/
public void put(final String key, final byte[] value) throws IOException {
if (key == null || value == null) {
throw new IllegalArgumentException("All parameters are required");
}
try {
// fetch in order to get the causal context
final FetchValue.Response response = fetchMetadata(key);
final RiakObject storeObject = new RiakObject().setValue(BinaryValue.create(value)).setContentType("binary/octet-stream");
StoreValue.Builder builder = new StoreValue.Builder(storeObject).withLocation(new Location(namespace, key));
final VClock vectorClock = response.getVectorClock();
if (vectorClock != null) {
builder = builder.withVectorClock(vectorClock);
}
final StoreValue storeValue = builder.build();
client.execute(storeValue);
} catch (ExecutionException e) {
throw new IOException("Riak failed to store object in bucket: " + name + " with key: " + key, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
/**
* Delete the value associated with the given key.
* @param key the key to delete
* @returns true if the value existed; false if o/w (and method will have no effect)
* @throws IOException
*/
public boolean delete(final String key) throws IOException {
if (key == null)
{
throw new IllegalArgumentException("Key is required");
}
try {
// fetch in order to get the causal context
final FetchValue.Response response = fetchMetadata(key);
if (response.isNotFound())
{
return false;
}
DeleteValue.Builder builder = new DeleteValue.Builder(new Location(namespace, key));
final VClock vectorClock = response.getVectorClock();
if (vectorClock != null) {
builder = builder.withVClock(vectorClock);
}
final DeleteValue deleteValue = builder.build();
client.execute(deleteValue);
return !response.isNotFound() || !response.hasValues();
} catch (ExecutionException e) {
throw new IOException("Riak failed to store object in bucket: " + name + " with key: " + key, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return false;
}
private FetchValue.Response fetchMetadata(final String key) throws ExecutionException, InterruptedException
{
return fetchResponse(key, true);
}
private FetchValue.Response fetchValue(final String key) throws ExecutionException, InterruptedException
{
return fetchResponse(key, false);
}
private FetchValue.Response fetchResponse(final String key, boolean headOnly) throws ExecutionException, InterruptedException {
Location loc = new Location(namespace, key);
FetchValue.Builder builder = new FetchValue.Builder(loc);
if (headOnly) {
builder.withOption(FetchValue.Option.HEAD, true);
}
FetchValue fetch = builder.build();
return client.execute(fetch);
}
private final Namespace namespace;
private final String name;
private final com.basho.riak.client.api.RiakClient client;
}
public class RiakProvider
{
/**
* Constructor.
*
* @param host host for the client to connect to--usually the local one; must match Riak riak.config
* @param port port fot the client to connect to; must match Riak riak.config
* @throws UnknownHostException if the host is not found
*/
public RiakProvider(String host, String port) throws UnknownHostException
{
if (host.equals("-1"))
{
throw new IllegalArgumentException("Invalid host provided: " + host);
}
int portNum;
try {
portNum = Integer.parseInt(port);
}
catch (NumberFormatException e)
{
throw new IllegalArgumentException("Port must be a number");
}
this.riakClient = com.basho.riak.client.api.RiakClient.newClient(portNum, host);
}
public RiakKVClient<String, byte[]> getStoreClient(String name)
{
if (name == null)
{
throw new IllegalArgumentException("Name is required");
}
final RiakKVClient newClient = new RiakKVClient(name, riakClient);
final RiakKVClient existingClient = clientMap.putIfAbsent(name, newClient);
if (existingClient != null)
{
return existingClient;
}
else
{
final Namespace namespace = new Namespace(name);
final FetchBucketProperties fetchProps = new FetchBucketProperties.Builder(namespace).build();
FetchBucketPropsOperation.Response fetchResponse = null;
try
{
fetchResponse = riakClient.execute(fetchProps);
// fetching the bucket properties creates the bucket with default properties if it doesn't already exist
BucketProperties bp = fetchResponse.getBucketProperties();
return newClient;
}
catch (ExecutionException e)
{
throw new IllegalStateException("Riak is not available or not reachable", e);
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
}
return null;
}
public void destroy() {
riakClient.shutdown();
}
private final com.basho.riak.client.api.RiakClient riakClient;
private final ConcurrentHashMap<String, RiakKVClient> clientMap = new ConcurrentHashMap<>();
}
@fridgebuzz

This comment has been minimized.

Copy link

commented Mar 2, 2016

I should have mentioned that RiakProvider is meant to be a singleton. I use Spring to create a singleton bean, you can convert it into a bona fide singleton without much work.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.