Skip to content

Instantly share code, notes, and snippets.

@boristyukin
Created November 16, 2018 21:06
Show Gist options
  • Save boristyukin/8703d2c6ec55d6787843aa133920bf01 to your computer and use it in GitHub Desktop.
Save boristyukin/8703d2c6ec55d6787843aa133920bf01 to your computer and use it in GitHub Desktop.
kudu getPendingErrors issue
import org.apache.kudu.ColumnSchema
import org.apache.kudu.Schema
import org.apache.kudu.Type
import org.apache.kudu.client.AlterTableOptions
import org.apache.kudu.client.CreateTableOptions
import org.apache.kudu.client.Insert
import org.apache.kudu.client.KuduClient
import org.apache.kudu.client.KuduException
import org.apache.kudu.client.KuduPredicate
import org.apache.kudu.client.KuduPredicate.ComparisonOp
import org.apache.kudu.client.KuduScanner
import org.apache.kudu.client.KuduSession
import org.apache.kudu.client.KuduTable
import org.apache.kudu.client.PartialRow
import org.apache.kudu.client.RowResult
import org.apache.kudu.client.RowResultIterator
import org.apache.kudu.client.OperationResponse
import org.apache.kudu.client.SessionConfiguration
import java.time.Instant
// Name of the demo table; the script drops and recreates it on every run.
String tableName = 'kudu_groovy_example'
// Kudu master address(es) the client connects to.
String KUDU_MASTERS = 'localhost:7051'
/**
 * Creates the example table {@code tableName}.
 *
 * Schema:
 *   - "key"   INT32, primary key
 *   - "value" STRING, nullable
 *   - "dt_tm" UNIXTIME_MICROS, not nullable
 * Rows are distributed across 8 hash buckets on "key".
 *
 * @param client    connected Kudu client
 * @param tableName name of the table to create
 */
void createExampleTable(KuduClient client, String tableName) {
    List<ColumnSchema> cols = []
    cols << new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build()
    cols << new ColumnSchema.ColumnSchemaBuilder("value", Type.STRING).nullable(true).build()
    cols << new ColumnSchema.ColumnSchemaBuilder("dt_tm", Type.UNIXTIME_MICROS).nullable(false).build()
    Schema tableSchema = new Schema(cols)

    // Hash-partition on the primary key. Kudu also supports range partitioning, and the two
    // can be combined; see http://kudu.apache.org/docs/schema_design.html.
    final int bucketCount = 8
    CreateTableOptions options = new CreateTableOptions()
    options.addHashPartitions(["key"], bucketCount)

    client.createTable(tableName, tableSchema, options)
    println("Created table " + tableName)
}
/**
 * Inserts {@code numRows} rows into {@code tableName} through an AUTO_FLUSH_BACKGROUND session.
 *
 * Keys run from 1 to numRows (nothing is inserted when numRows < 1). Even keys get a null
 * "value"; odd keys get "value <key>". Row 2 deliberately leaves the non-nullable "dt_tm"
 * column unset, so the write of that row is expected to fail. This exercises the
 * pending-error reporting path below.
 *
 * @param client    connected Kudu client
 * @param tableName table to insert into (must already exist)
 * @param numRows   number of rows to insert
 * @throws KuduException    if opening the table, applying an operation, or closing the session fails
 * @throws RuntimeException if the session reported any pending row errors
 */
void insertRows(KuduClient client, String tableName, int numRows) throws KuduException {
    // Locals are declared explicitly; undeclared names in a script method would land in the
    // shared script binding and leak between methods.
    long start = Instant.now().toEpochMilli()
    KuduTable table = client.openTable(tableName)
    KuduSession session = client.newSession()
    session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND)
    println("Inserting $numRows rows in ${session.getFlushMode()} flush mode ...")
    try {
        // A bounded for-loop rather than `1..numRows`: a Groovy range counts *down* when
        // numRows < 1, which would insert keys 1, 0, ... instead of nothing.
        for (int i = 1; i <= numRows; i++) {
            Insert insert = table.newInsert()
            PartialRow row = insert.getRow()
            row.addInt("key", i)
            // Even-keyed rows get a null 'value'.
            if (i % 2 == 0) {
                row.setNull("value")
            } else {
                row.addString("value", "value " + i)
            }
            // Fake an error row: row 2 leaves the non-nullable 'dt_tm' column unset.
            if (i != 2) {
                // UNIXTIME_MICROS takes microseconds since the epoch.
                row.addLong("dt_tm", System.currentTimeMillis() * 1000)
            }
            println(row.toString())
            // In AUTO_FLUSH_BACKGROUND mode the response is null; per-row failures only
            // surface through the session's pending errors after a flush.
            session.apply(insert)
        }
    } finally {
        // close() flushes any buffered operations and ends the session. Doing it in finally
        // means the session is still closed if apply() throws.
        session.close()
    }

    // Writes are flushed in background threads in AUTO_FLUSH_BACKGROUND mode (the mode
    // recommended for most workloads), so failures must be collected from the pending-error
    // buffer once close() has flushed everything.
    int pendingCount = session.countPendingErrors()
    if (pendingCount != 0) {
        println("$pendingCount errors inserting rows")
        def errorStatus = session.getPendingErrors()
        def rowErrors = errorStatus.getRowErrors()
        int shown = Math.min(rowErrors.length, 10)
        println("there were errors inserting rows to Kudu")
        println("the first few errors follow:")
        for (int k = 0; k < shown; k++) {
            println(rowErrors[k])
        }
        if (errorStatus.isOverflowed()) {
            println("error buffer overflowed: some errors were discarded")
        }
        throw new RuntimeException("error inserting rows to Kudu")
    }
    long duration = Instant.now().toEpochMilli() - start
    println("$numRows rows inserted in $duration ms")
}
/**
 * Counts the rows in {@code tableName} with a full-table scan and prints the elapsed time.
 *
 * Only the first column (index 0) is projected, to keep the scan payload small.
 *
 * @param client    connected Kudu client
 * @param tableName table to count
 * @return number of rows returned by the scan
 * @throws KuduException if opening the table or scanning fails
 */
long countRows(KuduClient client, String tableName) throws KuduException {
    // Explicit locals: undeclared names would be written into the shared script binding.
    long start = Instant.now().toEpochMilli()
    KuduTable table = client.openTable(tableName)
    KuduScanner scanner = client.newScannerBuilder(table)
            .setProjectedColumnIndexes([0])
            .build()
    // long, not int: the method returns long and a large table can exceed Integer.MAX_VALUE.
    long counter = 0
    try {
        while (scanner.hasMoreRows()) {
            counter += scanner.nextRows().getNumRows()
        }
    } finally {
        // Release the scanner if nextRows() throws part-way through the scan.
        scanner.close()
    }
    long duration = Instant.now().toEpochMilli() - start
    println("Rows counted in $duration ms")
    return counter
}
/**
 * Prints every row of {@code tableName} (all columns) to stdout, one row per line.
 *
 * Rows are collected from a full-table scan first and then printed in the order the
 * scanner returned them; they are not sorted.
 *
 * @param client    connected Kudu client
 * @param tableName table to dump
 * @throws Exception if opening the table or scanning fails
 */
void scanTableToStrings(KuduClient client, String tableName) throws Exception {
    // Explicit locals: undeclared names would be written into the shared script binding.
    KuduTable table = client.openTable(tableName)
    List<String> rowStrings = []
    KuduScanner scanner = client.newScannerBuilder(table).build()
    try {
        while (scanner.hasMoreRows()) {
            for (RowResult r : scanner.nextRows()) {
                rowStrings.add(r.rowToString())
            }
        }
    } finally {
        // Release the scanner if nextRows() throws part-way through the scan.
        scanner.close()
    }
    for (String line : rowStrings) {
        println(line)
    }
}
// Init Kudu client
long start = Instant.now().toEpochMilli()
KuduClient client = new KuduClient.KuduClientBuilder(KUDU_MASTERS).build()
long duration = Instant.now().toEpochMilli() - start
println("Client connected in $duration ms")
try {
    // Start from an empty table on every run.
    if (client.tableExists(tableName)) {
        client.deleteTable(tableName)
    }
    createExampleTable(client, tableName)
    try {
        insertRows(client, tableName, 10)
    } catch (Exception e) {
        // insertRows throws when the session reports row errors (row 2 is built to fail).
        // Report it instead of silently swallowing it, then continue so the count and scan
        // below show which rows actually landed.
        println("Inserting rows failed: $e")
    }
    println("Table has ${countRows(client, tableName)} rows")
    scanTableToStrings(client, tableName)
} finally {
    // Always release the client's connections, even if a step above throws.
    client.shutdown()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment