Created
February 23, 2016 17:17
-
-
Save fridgebuzz/00c6d4d645d172b111e9 to your computer and use it in GitHub Desktop.
Simple Key-Value Java Client for Riak
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class RiakKVClient | |
{ | |
/** | |
* Constructor. Use RiakProvider.getStoreClient(name) instead. | |
* @param name bucket name | |
* @param client low-level Riak Java client instance | |
*/ | |
protected RiakKVClient(final String name, final com.basho.riak.client.api.RiakClient client) | |
{ | |
this.name = name; | |
this.namespace = new Namespace(name); | |
this.client = client; | |
} | |
/** | |
* Get a value associated with a key | |
* @param key | |
* @return the value associated with the given key | |
* @throws IOException | |
*/ | |
public byte[] get(final String key) throws IOException { | |
if (key == null) | |
{ | |
throw new IllegalArgumentException("Key is required"); | |
} | |
try { | |
final FetchValue.Response response = fetchValue(key); | |
if (response.isNotFound()) | |
{ | |
return null; | |
} | |
final RiakObject riakObject = response.getValue(RiakObject.class); | |
return riakObject.getValue().getValue(); | |
} catch (ExecutionException e) { | |
throw new IOException("Riak failed to retrieve object from bucket: " + name + " with key: " + key, e); | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
} | |
return null; | |
} | |
/** | |
* Insert a value associated with a key. If a value already exists, update it. | |
* @param key the key | |
* @param value the value to store | |
* @throws IOException | |
*/ | |
public void put(final String key, final byte[] value) throws IOException { | |
if (key == null || value == null) { | |
throw new IllegalArgumentException("All parameters are required"); | |
} | |
try { | |
// fetch in order to get the causal context | |
final FetchValue.Response response = fetchMetadata(key); | |
final RiakObject storeObject = new RiakObject().setValue(BinaryValue.create(value)).setContentType("binary/octet-stream"); | |
StoreValue.Builder builder = new StoreValue.Builder(storeObject).withLocation(new Location(namespace, key)); | |
final VClock vectorClock = response.getVectorClock(); | |
if (vectorClock != null) { | |
builder = builder.withVectorClock(vectorClock); | |
} | |
final StoreValue storeValue = builder.build(); | |
client.execute(storeValue); | |
} catch (ExecutionException e) { | |
throw new IOException("Riak failed to store object in bucket: " + name + " with key: " + key, e); | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
} | |
} | |
/** | |
* Delete the value associated with the given key. | |
* @param key the key to delete | |
* @returns true if the value existed; false if o/w (and method will have no effect) | |
* @throws IOException | |
*/ | |
public boolean delete(final String key) throws IOException { | |
if (key == null) | |
{ | |
throw new IllegalArgumentException("Key is required"); | |
} | |
try { | |
// fetch in order to get the causal context | |
final FetchValue.Response response = fetchMetadata(key); | |
if (response.isNotFound()) | |
{ | |
return false; | |
} | |
DeleteValue.Builder builder = new DeleteValue.Builder(new Location(namespace, key)); | |
final VClock vectorClock = response.getVectorClock(); | |
if (vectorClock != null) { | |
builder = builder.withVClock(vectorClock); | |
} | |
final DeleteValue deleteValue = builder.build(); | |
client.execute(deleteValue); | |
return !response.isNotFound() || !response.hasValues(); | |
} catch (ExecutionException e) { | |
throw new IOException("Riak failed to store object in bucket: " + name + " with key: " + key, e); | |
} catch (InterruptedException e) { | |
Thread.currentThread().interrupt(); | |
} | |
return false; | |
} | |
private FetchValue.Response fetchMetadata(final String key) throws ExecutionException, InterruptedException | |
{ | |
return fetchResponse(key, true); | |
} | |
private FetchValue.Response fetchValue(final String key) throws ExecutionException, InterruptedException | |
{ | |
return fetchResponse(key, false); | |
} | |
private FetchValue.Response fetchResponse(final String key, boolean headOnly) throws ExecutionException, InterruptedException { | |
Location loc = new Location(namespace, key); | |
FetchValue.Builder builder = new FetchValue.Builder(loc); | |
if (headOnly) { | |
builder.withOption(FetchValue.Option.HEAD, true); | |
} | |
FetchValue fetch = builder.build(); | |
return client.execute(fetch); | |
} | |
private final Namespace namespace; | |
private final String name; | |
private final com.basho.riak.client.api.RiakClient client; | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class RiakProvider | |
{ | |
/** | |
* Constructor. | |
* | |
* @param host host for the client to connect to--usually the local one; must match Riak riak.config | |
* @param port port fot the client to connect to; must match Riak riak.config | |
* @throws UnknownHostException if the host is not found | |
*/ | |
public RiakProvider(String host, String port) throws UnknownHostException | |
{ | |
if (host.equals("-1")) | |
{ | |
throw new IllegalArgumentException("Invalid host provided: " + host); | |
} | |
int portNum; | |
try { | |
portNum = Integer.parseInt(port); | |
} | |
catch (NumberFormatException e) | |
{ | |
throw new IllegalArgumentException("Port must be a number"); | |
} | |
this.riakClient = com.basho.riak.client.api.RiakClient.newClient(portNum, host); | |
} | |
public RiakKVClient<String, byte[]> getStoreClient(String name) | |
{ | |
if (name == null) | |
{ | |
throw new IllegalArgumentException("Name is required"); | |
} | |
final RiakKVClient newClient = new RiakKVClient(name, riakClient); | |
final RiakKVClient existingClient = clientMap.putIfAbsent(name, newClient); | |
if (existingClient != null) | |
{ | |
return existingClient; | |
} | |
else | |
{ | |
final Namespace namespace = new Namespace(name); | |
final FetchBucketProperties fetchProps = new FetchBucketProperties.Builder(namespace).build(); | |
FetchBucketPropsOperation.Response fetchResponse = null; | |
try | |
{ | |
fetchResponse = riakClient.execute(fetchProps); | |
// fetching the bucket properties creates the bucket with default properties if it doesn't already exist | |
BucketProperties bp = fetchResponse.getBucketProperties(); | |
return newClient; | |
} | |
catch (ExecutionException e) | |
{ | |
throw new IllegalStateException("Riak is not available or not reachable", e); | |
} | |
catch (InterruptedException e) | |
{ | |
Thread.currentThread().interrupt(); | |
} | |
} | |
return null; | |
} | |
public void destroy() { | |
riakClient.shutdown(); | |
} | |
private final com.basho.riak.client.api.RiakClient riakClient; | |
private final ConcurrentHashMap<String, RiakKVClient> clientMap = new ConcurrentHashMap<>(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment