Skip to content

Instantly share code, notes, and snippets.

@xkrt
Created April 24, 2017 11:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save xkrt/9405a2eeb98a56288b7c5a7d817097b4 to your computer and use it in GitHub Desktop.
Save xkrt/9405a2eeb98a56288b7c5a7d817097b4 to your computer and use it in GitHub Desktop.
Kudu java InsertLoadgen issue
package org.kududb.examples.loadgen;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
/**
 * Small multi-threaded insert benchmark for a Kudu table.
 *
 * <p>{@link #main(String[])} (re)creates a two-column table ({@code key INT32},
 * {@code value STRING}), then starts the requested number of threads. Each
 * thread runs {@link #runLoad(String, String)}, which opens its own client,
 * inserts {@link #INSERTS_PER_THREAD} rows of random data and prints the
 * elapsed time and insert rate.
 */
public class InsertLoadgen {

  /** Number of rows each load thread attempts to insert. */
  private static final int INSERTS_PER_THREAD = 50_000;

  /** Fills one column of a row with random data matching the column's type. */
  private static class RandomDataGenerator {
    private final Random rng;
    private final int index;
    private final Type type;

    /**
     * Instantiate a random data generator for a specific field.
     *
     * @param index The numerical index of the column in the row schema
     * @param type The type of the data at index {@code index}
     */
    public RandomDataGenerator(int index, Type type) {
      this.rng = new Random();
      this.index = index;
      this.type = type;
    }

    /**
     * Add random data to the given row for the column at index {@code index}
     * of type {@code type}.
     *
     * @param row The row to add the field to
     * @throws UnsupportedOperationException if the column type is not handled
     */
    void generateColumnData(PartialRow row) {
      switch (type) {
        case INT8:
          row.addByte(index, (byte) rng.nextInt(Byte.MAX_VALUE));
          return;
        case INT16:
          row.addShort(index, (short) rng.nextInt(Short.MAX_VALUE));
          return;
        case INT32:
          row.addInt(index, rng.nextInt(Integer.MAX_VALUE));
          return;
        case INT64:
        // TODO: when building against kudu-client 0.10, replace
        // UNIXTIME_MICROS below with TIMESTAMP.
        case UNIXTIME_MICROS:
          // Both 64-bit column types are populated through addLong.
          row.addLong(index, rng.nextLong());
          return;
        case BINARY:
          byte[] bytes = new byte[16];
          rng.nextBytes(bytes);
          row.addBinary(index, bytes);
          return;
        case STRING:
          row.addString(index, UUID.randomUUID().toString());
          return;
        case BOOL:
          row.addBoolean(index, rng.nextBoolean());
          return;
        case FLOAT:
          row.addFloat(index, rng.nextFloat());
          return;
        case DOUBLE:
          row.addDouble(index, rng.nextDouble());
          return;
        default:
          throw new UnsupportedOperationException("Unknown type " + type);
      }
    }
  }

  /**
   * Inserts {@link #INSERTS_PER_THREAD} random rows into {@code tableName}
   * using a dedicated client and prints the elapsed time and insert rate.
   *
   * <p>Failures are printed to stderr rather than propagated, so the calling
   * thread always terminates normally. The timing line is printed only if the
   * insert loop was actually reached, and the rate is computed from the number
   * of inserts that were applied, not the number that was planned.
   *
   * @param masterHost address of the Kudu master to connect to
   * @param tableName name of an existing table to load
   */
  public static void runLoad(String masterHost, String tableName) {
    KuduClient client = new KuduClient.KuduClientBuilder(masterHost).build();
    long startMs = -1; // stays negative until the timed section begins
    int applied = 0;
    try {
      KuduTable table = client.openTable(tableName);
      Schema schema = table.getSchema();
      int columnCount = schema.getColumnCount();
      List<RandomDataGenerator> generators = new ArrayList<>(columnCount);
      for (int i = 0; i < columnCount; i++) {
        generators.add(new RandomDataGenerator(i, schema.getColumnByIndex(i).getType()));
      }

      KuduSession session = client.newSession();
      session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);

      startMs = System.currentTimeMillis();
      for (int j = 0; j < INSERTS_PER_THREAD; ++j) {
        Insert insert = table.newInsert();
        PartialRow row = insert.getRow();
        for (RandomDataGenerator generator : generators) {
          generator.generateColumnData(row);
        }
        session.apply(insert);
        applied++;
      }

      // In background-flush mode apply() does not report per-row failures to
      // the caller, so push out whatever is still buffered and look at the
      // session's collected errors before trusting the throughput figure.
      session.flush();
      int failed = session.countPendingErrors();
      if (failed > 0) {
        System.err.println(failed + " of " + applied + " inserts reported errors");
      }
      session.close();
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      try {
        client.shutdown();
      } catch (Exception e) {
        e.printStackTrace();
      }
      if (startMs >= 0) {
        long deltaMs = System.currentTimeMillis() - startMs;
        // Clamp to 1 ms so a very fast run cannot divide by zero.
        long insertsPerSec = applied * 1000L / Math.max(1L, deltaMs);
        System.out.println("Duration: " + deltaMs + " ms, " + insertsPerSec + " inserts/sec");
      }
    }
  }

  /**
   * Drops {@code tableName} if it exists and creates it again with the sample
   * schema ({@code key INT32} primary key, {@code value STRING}), range
   * partitioned on {@code key}. The client used for this is shut down before
   * returning.
   */
  private static void recreateTable(String masterHost, String tableName) throws Exception {
    // table schema taken from https://github.com/cloudera/kudu-examples/blob/master/java/java-sample/src/main/java/org/kududb/examples/sample/Sample.java
    KuduClient client = new KuduClient.KuduClientBuilder(masterHost).build();
    try {
      if (client.tableExists(tableName)) {
        client.deleteTable(tableName);
      }
      List<ColumnSchema> columns = new ArrayList<>(2);
      columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32)
          .key(true)
          .build());
      columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING)
          .build());
      List<String> rangeKeys = new ArrayList<>();
      rangeKeys.add("key");
      Schema schema = new Schema(columns);
      client.createTable(tableName, schema,
          new CreateTableOptions().setRangePartitionColumns(rangeKeys));
    } finally {
      client.shutdown();
    }
  }

  /**
   * Parses the {@code num_threads} argument, exiting with status 1 and a
   * message on stderr when it is not a non-negative integer.
   */
  private static int parseThreadCount(String raw) {
    try {
      int parsed = Integer.parseInt(raw);
      if (parsed >= 0) {
        return parsed;
      }
    } catch (NumberFormatException ignored) {
      // Reported together with the negative case below.
    }
    System.err.println("num_threads must be a non-negative integer, got: " + raw);
    System.exit(1);
    return -1; // unreachable; keeps the compiler satisfied
  }

  /**
   * Entry point: {@code InsertLoadgen kudu_master_host kudu_table num_threads}.
   *
   * <p>Recreates the target table, starts {@code num_threads} load threads and
   * waits for all of them to finish.
   *
   * @param args master host, table name and thread count, in that order
   * @throws Exception if the table cannot be recreated
   */
  public static void main(String[] args) throws Exception {
    if (args.length != 3) {
      System.err.println("Usage: InsertLoadgen kudu_master_host kudu_table num_threads");
      System.exit(1);
    }
    final String masterHost = args[0];
    final String tableName = args[1];
    int numThreads = parseThreadCount(args[2]);

    recreateTable(masterHost, tableName);

    final CountDownLatch latch = new CountDownLatch(numThreads);
    for (int i = 0; i < numThreads; i++) {
      Thread worker = new Thread(new Runnable() {
        @Override
        public void run() {
          try {
            runLoad(masterHost, tableName);
          } finally {
            // Count down even if the load dies, so main never waits forever.
            latch.countDown();
          }
        }
      });
      worker.start();
    }

    try {
      latch.await();
    } catch (InterruptedException e) {
      // Restore the interrupt flag for whoever is running us.
      Thread.currentThread().interrupt();
      e.printStackTrace();
    }
  }
}
kudu-client-ver perf
0.10 Duration: 626 ms, 79872 inserts/sec
1.0.0 Duration: 622 ms, 80385 inserts/sec
1.0.1 Duration: 630 ms, 79365 inserts/sec
1.1.0 Duration: 11703 ms, 4272 inserts/sec
1.3.1 Duration: 12317 ms, 4059 inserts/sec
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment