Skip to content

Instantly share code, notes, and snippets.

@broach
Created May 8, 2013 23:34
Show Gist options
  • Save broach/5544483 to your computer and use it in GitHub Desktop.
Save broach/5544483 to your computer and use it in GitHub Desktop.
Multi-threaded store example
import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.IRiakObject;
import com.basho.riak.client.RiakException;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;
import com.basho.riak.client.operations.StoreObject;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MultithreadStoreDemo
{
private static final int DEFAULT_POOL_MAX_SIZE = Runtime.getRuntime().availableProcessors() * 2;
private static final LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<Runnable>();
private static final ThreadPoolExecutor threadPool =
new ThreadPoolExecutor(DEFAULT_POOL_MAX_SIZE, DEFAULT_POOL_MAX_SIZE, 5, TimeUnit.SECONDS, workQueue);
public static void main(String[] args) throws RiakException, InterruptedException
{
byte[] data = new byte[1024];
Arrays.fill(data, (byte)1);
IRiakClient client = RiakFactory.pbcClient();
Bucket b = client.fetchBucket("test_bucket").execute();
List<FutureTask<IRiakObject>> futureList = new ArrayList<FutureTask<IRiakObject>>(100);
for (int i = 0; i < 100; i++)
{
MultiStoreCallable<IRiakObject> msc = new MultiStoreCallable<IRiakObject>(b.store(String.valueOf(i), data));
FutureTask<IRiakObject> ft = new FutureTask<IRiakObject>(msc);
futureList.add(ft);
threadPool.execute(ft);
}
int count = 0;
for (FutureTask<IRiakObject> task : futureList)
{
count++;
try
{
// Simply wait for the future to complete by calling get().
System.out.println(count + ": " + task.get());
}
catch (ExecutionException ex)
{
System.out.println(ex.getCause().getMessage());
}
}
threadPool.shutdown();
client.shutdown();
}
static class MultiStoreCallable<T> implements Callable<T>
{
private final StoreObject<T> storeObject;
public MultiStoreCallable(StoreObject<T> storeObject)
{
this.storeObject = storeObject;
}
public T call() throws Exception
{
return storeObject.execute();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment