-
-
Save paulbarbu/50fc26d58a0d4b0f69deabd4fa0492ec to your computer and use it in GitHub Desktop.
JOOQ CcockroachDB test sample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.cockroachlabs; | |
import com.cockroachlabs.example.jooq.db.Tables; | |
import com.cockroachlabs.example.jooq.db.tables.records.AccountsRecord; | |
import org.jooq.DSLContext; | |
import org.jooq.SQLDialect; | |
import org.jooq.Source; | |
import org.jooq.conf.RenderQuotedNames; | |
import org.jooq.conf.Settings; | |
import org.jooq.exception.DataAccessException; | |
import org.jooq.impl.DSL; | |
import org.jooq.TableRecord; | |
import java.io.InputStream; | |
import java.sql.Connection; | |
import java.sql.DriverManager; | |
import java.sql.SQLException; | |
import java.util.*; | |
import java.util.concurrent.atomic.AtomicInteger; | |
import java.util.concurrent.atomic.AtomicLong; | |
import java.util.function.Function; | |
import static com.cockroachlabs.example.jooq.db.Tables.ACCOUNTS; | |
public class Sample { | |
private static final Random RAND = new Random(); | |
private static final boolean FORCE_RETRY = false; | |
private static final String RETRY_SQL_STATE = "40001"; | |
private static final int MAX_ATTEMPT_COUNT = 6; | |
private static Function<DSLContext, Long> addAccounts() { | |
return ctx -> { | |
long rv = 0; | |
ctx.delete(ACCOUNTS).execute(); | |
List<AccountsRecord> r = new ArrayList<>(); | |
for(int i =0; i< 1000; i++) | |
{ | |
r.add(new AccountsRecord(UUID.randomUUID(), (long)i, 1000L)); | |
} | |
ctx.batchInsert(r).execute(); | |
rv = 1; | |
System.out.printf("APP: addAccounts() --> %d\n", rv); | |
return rv; | |
}; | |
} | |
private static Function<DSLContext, Long> transferFunds(long fromId, long toId, long amount) { | |
return ctx -> { | |
long rv = 0; | |
AccountsRecord fromAccount = ctx.fetchSingle(ACCOUNTS, ACCOUNTS.ID.eq(fromId)); | |
AccountsRecord toAccount = ctx.fetchSingle(ACCOUNTS, ACCOUNTS.ID.eq(toId)); | |
if (!(amount > fromAccount.getBalance())) { | |
fromAccount.setBalance(fromAccount.getBalance() - amount); | |
toAccount.setBalance(toAccount.getBalance() + amount); | |
ctx.batchUpdate(fromAccount, toAccount).execute(); | |
rv = amount; | |
System.out.printf("APP: transferFunds(%d, %d, %d) --> %d\n", fromId, toId, amount, rv); | |
} | |
return rv; | |
}; | |
} | |
// Test our retry handling logic if FORCE_RETRY is true. This | |
// method is only used to test the retry logic. It is not | |
// intended for production code. | |
private static Function<DSLContext, Long> forceRetryLogic() { | |
return ctx -> { | |
long rv = -1; | |
try { | |
System.out.printf("APP: testRetryLogic: BEFORE EXCEPTION\n"); | |
ctx.execute("SELECT crdb_internal.force_retry('1s')"); | |
} catch (DataAccessException e) { | |
System.out.printf("APP: testRetryLogic: AFTER EXCEPTION\n"); | |
throw e; | |
} | |
return rv; | |
}; | |
} | |
private static Function<DSLContext, Long> getAccountBalance(long id) { | |
return ctx -> { | |
AccountsRecord account = ctx.fetchSingle(ACCOUNTS, ACCOUNTS.ID.eq(id)); | |
long balance = account.getBalance(); | |
System.out.printf("APP: getAccountBalance(%d) --> %d\n", id, balance); | |
return balance; | |
}; | |
} | |
// Run SQL code in a way that automatically handles the | |
// transaction retry logic so we don't have to duplicate it in | |
// various places. | |
private static long runTransaction(DSLContext session, Function<DSLContext, Long> fn) { | |
AtomicLong rv = new AtomicLong(0L); | |
AtomicInteger attemptCount = new AtomicInteger(0); | |
while (attemptCount.get() < MAX_ATTEMPT_COUNT) { | |
attemptCount.incrementAndGet(); | |
if (attemptCount.get() > 1) { | |
System.out.printf("APP: Entering retry loop again, iteration %d\n", attemptCount.get()); | |
} | |
if (session.connectionResult(connection -> { | |
connection.setAutoCommit(false); | |
System.out.printf("APP: BEGIN;\n"); | |
if (attemptCount.get() == MAX_ATTEMPT_COUNT) { | |
String err = String.format("hit max of %s attempts, aborting", MAX_ATTEMPT_COUNT); | |
throw new RuntimeException(err); | |
} | |
// This block is only used to test the retry logic. | |
// It is not necessary in production code. See also | |
// the method 'testRetryLogic()'. | |
if (FORCE_RETRY) { | |
session.fetch("SELECT now()"); | |
} | |
try { | |
rv.set(fn.apply(session)); | |
if (rv.get() != -1) { | |
connection.commit(); | |
System.out.printf("APP: COMMIT;\n"); | |
return true; | |
} | |
} catch (DataAccessException | SQLException e) { | |
String sqlState = e instanceof SQLException ? ((SQLException) e).getSQLState() : ((DataAccessException) e).sqlState(); | |
if (RETRY_SQL_STATE.equals(sqlState)) { | |
// Since this is a transaction retry error, we | |
// roll back the transaction and sleep a little | |
// before trying again. Each time through the | |
// loop we sleep for a little longer than the last | |
// time (A.K.A. exponential backoff). | |
System.out.printf("APP: retryable exception occurred:\n sql state = [%s]\n message = [%s]\n retry counter = %s\n", sqlState, e.getMessage(), attemptCount.get()); | |
System.out.printf("APP: ROLLBACK;\n"); | |
connection.rollback(); | |
int sleepMillis = (int)(Math.pow(2, attemptCount.get()) * 100) + RAND.nextInt(100); | |
System.out.printf("APP: Hit 40001 transaction retry error, sleeping %s milliseconds\n", sleepMillis); | |
try { | |
Thread.sleep(sleepMillis); | |
} catch (InterruptedException ignored) { | |
// no-op | |
} | |
rv.set(-1L); | |
} else { | |
throw e; | |
} | |
} | |
return false; | |
})) { | |
break; | |
} | |
} | |
return rv.get(); | |
} | |
private final static float NANO2MS = 1000000; | |
private final static float NANO2S = NANO2MS*1000; | |
// CREATE USER foo WITH PASSWORD '***'; | |
// CREATE DATABASE bank; | |
// time cockroach sql --echo-sql --certs-dir=../distributed/certs --host=xyz -d bank < src/main/resources/db.sql | |
// GRANT ALL ON DATABASE bank TO foo; | |
// time mvn exec:java -Dexec.mainClass=com.cockroachlabs.Sample 2> times.txt | |
public static void main(String[] args) throws Exception { | |
long start = System.nanoTime(); | |
try (Connection connection = DriverManager.getConnection( | |
"jdbc:postgresql://xyz:26257/bank?sslmode=verify-full&sslrootcert=path/certs/ca.crt&sslcert=path/certs/client.foo.crt&sslkey=path/certs/client.foo.key.pk8&ApplicationName=jooq-sample-uuid-times", | |
"foo", | |
"***" | |
)) { | |
DSLContext ctx = DSL.using(connection, SQLDialect.COCKROACHDB, new Settings() | |
.withExecuteLogging(true) | |
.withRenderQuotedNames(RenderQuotedNames.NEVER)); | |
long end = System.nanoTime(); | |
System.err.println(String.format("init: %s ms (%s s)", (end-start)/NANO2MS, (end-start)/NANO2S)); | |
start = System.nanoTime(); | |
ctx.deleteFrom(ACCOUNTS).execute(); | |
end = System.nanoTime(); | |
System.err.println(String.format("delete: %s ms (%s s)", (end-start)/NANO2MS, (end-start)/NANO2S)); | |
long fromAccountId = 1; | |
long toAccountId = 2; | |
long transferAmount = 1; | |
if (FORCE_RETRY) { | |
System.out.printf("APP: About to test retry logic in 'runTransaction'\n"); | |
runTransaction(ctx, forceRetryLogic()); | |
} else { | |
start = System.nanoTime(); | |
runTransaction(ctx, addAccounts()); | |
long fromBalance = runTransaction(ctx, getAccountBalance(fromAccountId)); | |
long toBalance = runTransaction(ctx, getAccountBalance(toAccountId)); | |
end = System.nanoTime(); | |
System.err.println(String.format("insert: %s ms (%s s)", (end-start)/NANO2MS, (end-start)/NANO2S)); | |
if (fromBalance != -1 && toBalance != -1) { | |
// Success! | |
System.out.printf("APP: getAccountBalance(%d) --> %d\n", fromAccountId, fromBalance); | |
System.out.printf("APP: getAccountBalance(%d) --> %d\n", toAccountId, toBalance); | |
} | |
start = System.nanoTime(); | |
for(int i = 1; i<1000; i++) | |
{ | |
// Transfer $100 from account 1 to account 2 | |
long transferResult = runTransaction(ctx, transferFunds(i, i-1, transferAmount)); | |
if (transferResult != -1) { | |
// Success! | |
System.out.printf("APP: transferFunds(%d, %d, %d) --> %d \n", i, i-1, transferAmount, transferResult); | |
long fromBalanceAfter = runTransaction(ctx, getAccountBalance(i)); | |
long toBalanceAfter = runTransaction(ctx, getAccountBalance(i-1)); | |
if (fromBalanceAfter != -1 && toBalanceAfter != -1) { | |
// Success! | |
System.out.printf("APP: getAccountBalance(%d) --> %d\n", i, fromBalanceAfter); | |
System.out.printf("APP: getAccountBalance(%d) --> %d\n", i-1, toBalanceAfter); | |
} | |
} | |
} | |
end = System.nanoTime(); | |
System.err.println(String.format("select+update: %s ms (%s s)", (end-start)/NANO2MS, (end-start)/NANO2S)); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment