Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
One riak client shared by many threads.
/*
* This file is provided to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.basho.riak.client.itest;
import static org.junit.Assert.*;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Before;
import org.junit.Test;
import com.basho.riak.client.RiakClient;
import com.basho.riak.client.RiakConfig;
import com.basho.riak.client.RiakObject;
public class ITestDataLoad {
final String RIAK_URL = "http://127.0.0.1:8098/riak";
final String BUCKET = "test_data_load";
final int NUM_VALUES = 10;
final int VALUE_LENGTH = 512;
byte data[][] = new byte[NUM_VALUES][VALUE_LENGTH];
@Before public void setup() {
for (int i = 0; i < 10; i++) {
new Random().nextBytes(data[i]);
}
}
@Test public void concurrent_data_load() throws InterruptedException {
final int NUM_THREADS = 100;
final int NUM_OBJECTS = 2000;
final CountDownLatch startLatch = new CountDownLatch(1);
final CountDownLatch endLatch = new CountDownLatch(NUM_THREADS);
final Thread[] threads = new Thread[NUM_THREADS];
final AtomicInteger idx = new AtomicInteger(0);
final RiakConfig riakConfig = new RiakConfig(RIAK_URL);
riakConfig.setMaxConnections(100); // tune/play with this
final RiakClient riak = new RiakClient(riakConfig); // comment out and use per thread client
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(new Runnable() {
public void run() {
try {
startLatch.await();
// uncomment and compare RiakClient riak = new RiakClient(RIAK_URL);
Random rnd = new Random();
for (int i = 0; i < NUM_OBJECTS / NUM_THREADS; i++) {
String key = "data-load-" + idx.getAndIncrement();
String value = new String(data[rnd.nextInt(NUM_VALUES)]);
RiakObject o = riak.fetch(BUCKET, key).getObject();
if (o == null) {
o = new RiakObject(riak, BUCKET, key, value.getBytes());
} else {
o.setValue(value);
}
Utils.assertSuccess(o.store());
}
endLatch.countDown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
threads[i].start();
}
startLatch.countDown();
endLatch.await();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.