Skip to content

Instantly share code, notes, and snippets.

@ksauzz
Created September 5, 2013 03:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ksauzz/6445675 to your computer and use it in GitHub Desktop.
Save ksauzz/6445675 to your computer and use it in GitHub Desktop.
riak-java-client v1.4.0 2i sample
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;
import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.IRiakObject;
import com.basho.riak.client.IndexEntry;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;
import com.basho.riak.client.builders.RiakObjectBuilder;
import com.basho.riak.client.cap.VClock;
import com.basho.riak.client.convert.ConversionException;
import com.basho.riak.client.convert.Converter;
import com.basho.riak.client.convert.NoKeySpecifiedException;
import com.basho.riak.client.convert.RiakIndex;
import com.basho.riak.client.convert.RiakIndexConverter;
import com.basho.riak.client.convert.RiakKey;
import com.basho.riak.client.query.StreamingOperation;
import com.basho.riak.client.query.indexes.BinIndex;
import com.basho.riak.client.query.indexes.IntIndex;
import com.basho.riak.client.query.indexes.RiakIndexes;
import com.basho.riak.client.raw.config.Configuration;
import com.basho.riak.client.raw.http.HTTPClientConfig;
import com.basho.riak.client.raw.pbc.PBClientConfig;
public class Riak2iSample {
// Connection-pool and timeout settings handed to the PBClientConfig builder in main().
// Upper bound of the connection pool (passed to withPoolSize).
private static final int MAX_CONNECTION_SIZE = 10;
// Initial size of the connection pool (passed to withInitialPoolSize).
private static final int INIT_CONNECTION_SIZE = 10;
// Socket buffer size in kilobytes (passed to withSocketBufferSizeKb).
private static final int BUFFER_KB = 16;
// Idle-connection TTL in milliseconds (passed to withIdleConnectionTTLMillis).
private static final int IDLE_CONN_TIMEOUT_MIL = 2000;
// Connection timeout in milliseconds (passed to withConnectionTimeoutMillis).
private static final int CONNECTION_TIMEOUT_MIL = 2000;
// Request timeout in milliseconds (passed to withRequestTimeoutMillis).
private static final int REQUEST_TIMEOUT_MIL = 2000;
// Name of the bucket the sample creates, and the bucket PersonConverter writes objects into.
private static final String BUCKET_NAME = "person";
/**
 * Runs the secondary-index (2i) sample against a Riak node at 127.0.0.1:8087 over
 * protocol buffers: stores seven {@link Person} objects, queries them by the
 * "division" binary index and the "age" integer index (exact match, range and
 * streaming), logs every hit, then deletes every key in the bucket.
 *
 * @param args unused
 * @throws Exception if any Riak operation or conversion fails
 */
public static void main(String[] args) throws Exception {
    // Alternatives: PBClientConfig.defaults() or HTTPClientConfig.defaults().
    Configuration conf = new PBClientConfig.Builder()
        .withHost("127.0.0.1")
        .withPort(8087)
        .withConnectionTimeoutMillis(CONNECTION_TIMEOUT_MIL)
        .withIdleConnectionTTLMillis(IDLE_CONN_TIMEOUT_MIL)
        .withSocketBufferSizeKb(BUFFER_KB)
        .withRequestTimeoutMillis(REQUEST_TIMEOUT_MIL)
        .withInitialPoolSize(INIT_CONNECTION_SIZE)
        .withPoolSize(MAX_CONNECTION_SIZE)
        .build();
    IRiakClient client = RiakFactory.newClient(conf);
    try {
        Bucket bucket = client.createBucket(BUCKET_NAME).execute();
        // One converter instance is shared by every store/fetch below.
        PersonConverter converter = new PersonConverter();

        Person[] people = {
            new Person("alice", "HR", 24),
            new Person("bob", "Dev", 28),
            new Person("clare", "HR", 30),
            new Person("dizzy", "Dev", 33),
            new Person("eric", "Dev", 27),
            new Person("flora", "Dev", 27),
            new Person("greg", "Board", 35),
        };
        for (Person person : people) {
            bucket.store(person).withConverter(converter).execute();
        }

        // Fetch only the Dev division.
        List<String> devKeys = bucket.fetchIndex(BinIndex.named("division")).withValue("Dev").execute();
        logPeople(bucket, converter, "Dev: ", devKeys);

        // Fetch only the HR division.
        List<String> hrKeys = bucket.fetchIndex(BinIndex.named("division")).withValue("HR").execute();
        logPeople(bucket, converter, "HR: ", hrKeys);

        // Use a range query to fetch only the keys whose index value lies in the given range.
        List<String> ageKeys = bucket.fetchIndex(IntIndex.named("age")).from(27).to(29).execute();
        logPeople(bucket, converter, "Range from 27 to 29: ", ageKeys);

        // Streaming - fetch only the Dev division.
        StreamingOperation<IndexEntry> streamDev = bucket.fetchIndex(BinIndex.named("division")).withValue("Dev").executeStreaming();
        for (IndexEntry entry : streamDev) {
            logPerson(bucket, converter, "Streaming... Dev: ", entry.getObjectKey());
        }

        // Clean up: remove every key in the bucket (not only the ones stored above).
        for (String key : bucket.keys()) {
            bucket.delete(key).execute();
        }
    } finally {
        // Always release the client's connections, even when an operation above fails.
        client.shutdown();
    }
}

/** Fetches and logs the person stored under each of the given keys, in iteration order. */
private static void logPeople(Bucket bucket, PersonConverter converter, String label, Iterable<String> keys)
        throws Exception {
    for (String key : keys) {
        logPerson(bucket, converter, label, key);
    }
}

/** Fetches the person stored under {@code key} and logs it prefixed with {@code label}. */
private static void logPerson(Bucket bucket, PersonConverter converter, String label, String key)
        throws Exception {
    // The fetch API takes a domain object carrying the key in its @RiakKey field.
    Person probe = new Person();
    probe.setName(key);
    Person fetched = bucket.fetch(probe).withConverter(converter).execute();
    // Concatenation (rather than fetched.toString()) prints "null" instead of throwing
    // if the fetch yields no object — presumably possible when a key vanished after
    // the index query; confirm against the client documentation.
    log(label + fetched);
}
/** Writes the given message to standard output, followed by a line break. */
private static void log(String log) {
    final java.io.PrintStream console = System.out;
    console.println(log);
}
/**
 * Converts between {@link Person} and {@link IRiakObject}.
 *
 * <p>The object value is the Java-serialized form of the {@code Person}; the
 * secondary indexes are taken from its {@code @RiakIndex}-annotated fields via
 * {@link RiakIndexConverter}.
 *
 * <p>NOTE(review): {@code Person.division} is {@code transient}, so it is not part
 * of the serialized value and {@link #toDomain} does not restore it from the
 * object's indexes; a fetched {@code Person} therefore has a {@code null} division.
 */
public static class PersonConverter implements Converter<Person> {
    private final RiakIndexConverter<Person> indexConverter = new RiakIndexConverter<Person>();

    /**
     * Builds the Riak object for a person, keyed by the person's name.
     *
     * @param domainObject the person to store; its name is used as the key
     * @param vclock the vector clock to attach to the built object
     * @return a Riak object in {@code BUCKET_NAME} carrying the serialized value and indexes
     * @throws NoKeySpecifiedException if the person's name is {@code null}
     * @throws ConversionException if serialization fails
     */
    @Override
    public IRiakObject fromDomain(Person domainObject, VClock vclock)
            throws ConversionException {
        if (domainObject.getName() == null) {
            throw new NoKeySpecifiedException(domainObject);
        }
        byte[] value = serialize(domainObject);
        return RiakObjectBuilder.newBuilder(BUCKET_NAME, domainObject.getName())
            .withValue(value)
            .withIndexes(indexConverter.getIndexes(domainObject))
            .withVClock(vclock)
            .build();
    }

    // Use whichever serializer you prefer: msgpack, Protocol Buffers, Kryo, Thrift, etc.
    private byte[] serialize(Person domainObject) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try {
            ObjectOutputStream out = new ObjectOutputStream(buffer);
            try {
                out.writeObject(domainObject);
            } finally {
                // Closing flushes anything ObjectOutputStream still buffers, so the
                // byte array read below is guaranteed to be complete.
                out.close();
            }
        } catch (IOException ie) {
            throw new ConversionException(ie);
        }
        return buffer.toByteArray();
    }

    /**
     * Rebuilds a person from the serialized value of a Riak object.
     *
     * <p>NOTE(security): this uses Java native deserialization; it should only be
     * applied to values this application wrote itself.
     *
     * @param riakObject the fetched object whose value holds a serialized {@code Person}
     * @return the deserialized person
     * @throws ConversionException if the value cannot be read or deserialized
     */
    @Override
    public Person toDomain(IRiakObject riakObject)
            throws ConversionException {
        try {
            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(riakObject.getValue()));
            try {
                return (Person) in.readObject();
            } finally {
                in.close();
            }
        } catch (Exception e) {
            // Any failure (I/O, missing class, bad cast, null value) surfaces as a ConversionException.
            throw new ConversionException(e);
        }
    }
}
/**
 * Sample domain object stored in Riak.
 *
 * <p>The name doubles as the Riak key; division and age are exposed as secondary
 * indexes named "division" and "age".
 */
public static class Person implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Person's name; annotated as the Riak key. */
    @RiakKey
    private String name;

    /** Indexed as "division". Values used only as an index are marked transient. */
    @RiakIndex(name="division")
    private transient String division;

    /** Indexed as "age"; also part of the serialized value. */
    @RiakIndex(name="age")
    private int age;

    /** Creates an empty person; fields are filled in through the setters. */
    public Person() {}

    /** Creates a fully populated person. */
    public Person(String name, String division, int age) {
        this.age = age;
        this.division = division;
        this.name = name;
    }

    public String getName() {
        return this.name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getDivision() {
        return this.division;
    }

    public void setDivision(String division) {
        this.division = division;
    }

    public int getAge() {
        return this.age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    /** Renders the name and age; the division is not included. */
    @Override
    public String toString() {
        return "name:" + name + ", " + "age:" + age;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment