Created
April 24, 2017 11:33
-
-
Save xkrt/9405a2eeb98a56288b7c5a7d817097b4 to your computer and use it in GitHub Desktop.
Kudu java InsertLoadgen issue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.kududb.examples.loadgen; | |
import org.apache.kudu.ColumnSchema; | |
import org.apache.kudu.Schema; | |
import org.apache.kudu.Type; | |
import org.apache.kudu.client.*; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.Random; | |
import java.util.UUID; | |
import java.util.concurrent.CountDownLatch; | |
public class InsertLoadgen { | |
private static class RandomDataGenerator { | |
private final Random rng; | |
private final int index; | |
private final Type type; | |
/** | |
* Instantiate a random data generator for a specific field. | |
* @param index The numerical index of the column in the row schema | |
* @param type The type of the data at index {@code index} | |
*/ | |
public RandomDataGenerator(int index, Type type) { | |
this.rng = new Random(); | |
this.index = index; | |
this.type = type; | |
} | |
/** | |
* Add random data to the given row for the column at index {@code index} | |
* of type {@code type} | |
* @param row The row to add the field to | |
*/ | |
void generateColumnData(PartialRow row) { | |
switch (type) { | |
case INT8: | |
row.addByte(index, (byte) rng.nextInt(Byte.MAX_VALUE)); | |
return; | |
case INT16: | |
row.addShort(index, (short)rng.nextInt(Short.MAX_VALUE)); | |
return; | |
case INT32: | |
row.addInt(index, rng.nextInt(Integer.MAX_VALUE)); | |
return; | |
case INT64: | |
// todo replace UNIXTIME_MICROS with for kudu-client 0.10 | |
//case TIMESTAMP: | |
case UNIXTIME_MICROS: | |
row.addLong(index, rng.nextLong()); | |
return; | |
case BINARY: | |
byte bytes[] = new byte[16]; | |
rng.nextBytes(bytes); | |
row.addBinary(index, bytes); | |
return; | |
case STRING: | |
row.addString(index, UUID.randomUUID().toString()); | |
return; | |
case BOOL: | |
row.addBoolean(index, rng.nextBoolean()); | |
return; | |
case FLOAT: | |
row.addFloat(index, rng.nextFloat()); | |
return; | |
case DOUBLE: | |
row.addDouble(index, rng.nextDouble()); | |
return; | |
default: | |
throw new UnsupportedOperationException("Unknown type " + type); | |
} | |
} | |
} | |
public static void runLoad(String masterHost, String tableName) { | |
KuduClient client = new KuduClient.KuduClientBuilder(masterHost).build(); | |
int count = (int)50e3; | |
long startMs = 0; | |
try { | |
KuduTable table = client.openTable(tableName); | |
Schema schema = table.getSchema(); | |
List<RandomDataGenerator> generators = new ArrayList<>(schema.getColumnCount()); | |
for (int i = 0; i < schema.getColumnCount(); i++) { | |
generators.add(new RandomDataGenerator(i, schema.getColumnByIndex(i).getType())); | |
} | |
KuduSession session = client.newSession(); | |
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND); | |
startMs = System.currentTimeMillis(); | |
for (int j = 0; j < count; ++j) { | |
Insert insert = table.newInsert(); | |
PartialRow row = insert.getRow(); | |
for (int i = 0; i < schema.getColumnCount(); i++) { | |
generators.get(i).generateColumnData(row); | |
} | |
session.apply(insert); | |
} | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} finally { | |
try { | |
client.shutdown(); | |
long endMs = System.currentTimeMillis(); | |
long deltaMs = endMs - startMs; | |
System.out.println("Duration: " + deltaMs + " ms, " + ((int)((float)count/deltaMs*1000)) + " inserts/sec"); | |
} catch (Exception e) { | |
e.printStackTrace(); | |
} | |
} | |
} | |
public static void main(String[] args) throws Exception { | |
if (args.length != 3) { | |
System.err.println("Usage: InsertLoadgen kudu_master_host kudu_table num_threads"); | |
System.exit(1); | |
} | |
final String masterHost = args[0]; | |
final String tableName = args[1]; | |
int numThreads = Integer.parseInt(args[2]); | |
// table schema taken from https://github.com/cloudera/kudu-examples/blob/master/java/java-sample/src/main/java/org/kududb/examples/sample/Sample.java | |
KuduClient client = new KuduClient.KuduClientBuilder(masterHost).build(); | |
if (client.tableExists(tableName)) | |
client.deleteTable(tableName); | |
List<ColumnSchema> columns = new ArrayList(2); | |
columns.add(new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32) | |
.key(true) | |
.build()); | |
columns.add(new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING) | |
.build()); | |
List<String> rangeKeys = new ArrayList<>(); | |
rangeKeys.add("key"); | |
Schema schema = new Schema(columns); | |
client.createTable(tableName, schema, new CreateTableOptions().setRangePartitionColumns(rangeKeys)); | |
final CountDownLatch latch = new CountDownLatch(numThreads); | |
List<Thread> threads = new ArrayList<>(numThreads); | |
for (int i = 0; i < numThreads; i++) { | |
threads.add(new Thread(new Runnable() { | |
public void run() { | |
runLoad(masterHost, tableName); | |
latch.countDown(); | |
} | |
})); | |
threads.get(i).start(); | |
} | |
try { | |
latch.await(); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
kudu-client-ver perf
0.10 Duration: 626 ms, 79872 inserts/sec
1.0.0 Duration: 622 ms, 80385 inserts/sec | |
1.0.1 Duration: 630 ms, 79365 inserts/sec | |
1.1.0 Duration: 11703 ms, 4272 inserts/sec | |
1.3.1 Duration: 12317 ms, 4059 inserts/sec |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment