Skip to content

Instantly share code, notes, and snippets.

@paulbarbu
Created March 24, 2021 09:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save paulbarbu/50fc26d58a0d4b0f69deabd4fa0492ec to your computer and use it in GitHub Desktop.
Save paulbarbu/50fc26d58a0d4b0f69deabd4fa0492ec to your computer and use it in GitHub Desktop.
jOOQ CockroachDB test sample
package com.cockroachlabs;
import com.cockroachlabs.example.jooq.db.Tables;
import com.cockroachlabs.example.jooq.db.tables.records.AccountsRecord;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.Source;
import org.jooq.conf.RenderQuotedNames;
import org.jooq.conf.Settings;
import org.jooq.exception.DataAccessException;
import org.jooq.impl.DSL;
import org.jooq.TableRecord;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import static com.cockroachlabs.example.jooq.db.Tables.ACCOUNTS;
/**
 * Sample program that drives a CockroachDB {@code ACCOUNTS} table through jOOQ.
 *
 * <p>{@link #main(String[])} opens one JDBC connection, repopulates the table,
 * then performs a series of small transfers between neighbouring accounts,
 * printing coarse timings for each phase to {@code System.err}. Every unit of
 * work is expressed as a {@code Function<DSLContext, Long>} and executed through
 * {@link #runTransaction(DSLContext, Function)}, which owns the BEGIN / COMMIT /
 * ROLLBACK handling and the retry loop for SQLSTATE {@code 40001} errors.
 *
 * <p>A unit of work returns {@code -1} to signal "not committed, try again";
 * any other value is treated as success and committed.
 */
public class Sample {

    private static final Random RAND = new Random();

    /** When true, main() exercises only the retry path via forceRetryLogic(). */
    private static final boolean FORCE_RETRY = false;

    /** SQLSTATE that marks a transaction retry error. */
    private static final String RETRY_SQL_STATE = "40001";

    /**
     * Upper bound for the attempt counter in runTransaction(). The attempt whose
     * number equals this value aborts with a RuntimeException before running
     * the unit of work.
     */
    private static final int MAX_ATTEMPT_COUNT = 6;

    /** Number of accounts inserted by addAccounts(); main() iterates over the same range. */
    private static final int ACCOUNT_COUNT = 1000;

    // double (not float) so that nanosecond deltas keep their precision when divided.
    private static final double NANOS_PER_MS = 1_000_000.0;
    private static final double NANOS_PER_S = NANOS_PER_MS * 1000;

    /**
     * Builds a unit of work that empties the accounts table and batch-inserts
     * ACCOUNT_COUNT fresh rows.
     *
     * @return a function that always yields 1 once the batch insert has executed
     */
    private static Function<DSLContext, Long> addAccounts() {
        return ctx -> {
            ctx.delete(ACCOUNTS).execute();

            List<AccountsRecord> records = new ArrayList<>(ACCOUNT_COUNT);
            for (int i = 0; i < ACCOUNT_COUNT; i++) {
                // Second argument is used as the account ID by the lookups below;
                // the third is presumably the starting balance -- confirm against
                // the generated AccountsRecord constructor.
                records.add(new AccountsRecord(UUID.randomUUID(), (long) i, 1000L));
            }
            ctx.batchInsert(records).execute();

            long rv = 1;
            System.out.printf("APP: addAccounts() --> %d\n", rv);
            return rv;
        };
    }

    /**
     * Builds a unit of work that moves {@code amount} from one account to another.
     *
     * <p>The transfer is skipped (and 0 is returned) when the source account's
     * balance is smaller than {@code amount}.
     *
     * @param fromId ID of the account to debit
     * @param toId   ID of the account to credit; must differ from {@code fromId}
     * @param amount amount to move; must not be negative
     * @return a function yielding the transferred amount, or 0 if nothing was transferred
     * @throws IllegalArgumentException if {@code amount} is negative or both IDs are equal
     */
    private static Function<DSLContext, Long> transferFunds(long fromId, long toId, long amount) {
        // A negative amount would debit the *destination* with no balance check.
        if (amount < 0) {
            throw new IllegalArgumentException("amount must not be negative: " + amount);
        }
        // With identical IDs the two records below describe the same row and are
        // updated independently, so the last write would win and the balance
        // would grow by 'amount'.
        if (fromId == toId) {
            throw new IllegalArgumentException("fromId and toId must differ: " + fromId);
        }

        return ctx -> {
            long rv = 0;
            AccountsRecord fromAccount = ctx.fetchSingle(ACCOUNTS, ACCOUNTS.ID.eq(fromId));
            AccountsRecord toAccount = ctx.fetchSingle(ACCOUNTS, ACCOUNTS.ID.eq(toId));

            if (amount <= fromAccount.getBalance()) {
                fromAccount.setBalance(fromAccount.getBalance() - amount);
                toAccount.setBalance(toAccount.getBalance() + amount);
                ctx.batchUpdate(fromAccount, toAccount).execute();
                rv = amount;
                System.out.printf("APP: transferFunds(%d, %d, %d) --> %d\n", fromId, toId, amount, rv);
            }
            return rv;
        };
    }

    /**
     * Builds a unit of work that is only used to test the retry handling when
     * FORCE_RETRY is true. It is not intended for production code.
     *
     * @return a function that executes {@code crdb_internal.force_retry('1s')},
     *         rethrows any DataAccessException it raises, and otherwise yields -1
     */
    private static Function<DSLContext, Long> forceRetryLogic() {
        return ctx -> {
            long rv = -1;
            try {
                System.out.printf("APP: testRetryLogic: BEFORE EXCEPTION\n");
                ctx.execute("SELECT crdb_internal.force_retry('1s')");
            } catch (DataAccessException e) {
                System.out.printf("APP: testRetryLogic: AFTER EXCEPTION\n");
                throw e;
            }
            return rv;
        };
    }

    /**
     * Builds a unit of work that reads one account's balance.
     *
     * @param id ID of the account to look up
     * @return a function yielding the balance of the single matching account
     */
    private static Function<DSLContext, Long> getAccountBalance(long id) {
        return ctx -> {
            AccountsRecord account = ctx.fetchSingle(ACCOUNTS, ACCOUNTS.ID.eq(id));
            long balance = account.getBalance();
            System.out.printf("APP: getAccountBalance(%d) --> %d\n", id, balance);
            return balance;
        };
    }

    /**
     * Runs {@code fn} inside a transaction and handles the transaction retry
     * logic so callers do not have to duplicate it.
     *
     * <p>Each attempt disables auto-commit on the session's connection, applies
     * {@code fn}, and commits if the result is not -1. A failure whose SQLSTATE
     * is {@code 40001} is rolled back and retried after a growing, jittered
     * sleep. Any other failure is rolled back and rethrown.
     *
     * @param session jOOQ context whose connection carries the transaction
     * @param fn      unit of work; returns -1 to request another attempt
     * @return the value produced by {@code fn} on the committed attempt
     * @throws RuntimeException if the attempt counter reaches MAX_ATTEMPT_COUNT
     */
    private static long runTransaction(DSLContext session, Function<DSLContext, Long> fn) {
        // Atomic holders because the values are written from inside the lambda.
        AtomicLong rv = new AtomicLong(0L);
        AtomicInteger attemptCount = new AtomicInteger(0);

        while (attemptCount.get() < MAX_ATTEMPT_COUNT) {
            attemptCount.incrementAndGet();

            if (attemptCount.get() > 1) {
                System.out.printf("APP: Entering retry loop again, iteration %d\n", attemptCount.get());
            }

            if (session.connectionResult(connection -> {
                connection.setAutoCommit(false);
                System.out.printf("APP: BEGIN;\n");

                if (attemptCount.get() == MAX_ATTEMPT_COUNT) {
                    String err = String.format("hit max of %s attempts, aborting", MAX_ATTEMPT_COUNT);
                    throw new RuntimeException(err);
                }

                // This block is only used to test the retry logic.
                // It is not necessary in production code. See also
                // the method 'forceRetryLogic()'.
                if (FORCE_RETRY) {
                    session.fetch("SELECT now()");
                }

                try {
                    rv.set(fn.apply(session));
                    if (rv.get() != -1) {
                        connection.commit();
                        System.out.printf("APP: COMMIT;\n");
                        return true;
                    }
                    // fn asked for another attempt without raising an error.
                    // Discard whatever it did so the next attempt does not run
                    // inside (and later commit) this attempt's transaction.
                    System.out.printf("APP: ROLLBACK;\n");
                    connection.rollback();
                } catch (DataAccessException | SQLException e) {
                    String sqlState = e instanceof SQLException
                            ? ((SQLException) e).getSQLState()
                            : ((DataAccessException) e).sqlState();

                    if (RETRY_SQL_STATE.equals(sqlState)) {
                        // Since this is a transaction retry error, we
                        // roll back the transaction and sleep a little
                        // before trying again. Each time through the
                        // loop we sleep for a little longer than the last
                        // time (A.K.A. exponential backoff).
                        System.out.printf("APP: retryable exception occurred:\n sql state = [%s]\n message = [%s]\n retry counter = %s\n", sqlState, e.getMessage(), attemptCount.get());
                        System.out.printf("APP: ROLLBACK;\n");
                        connection.rollback();
                        int sleepMillis = (int) (Math.pow(2, attemptCount.get()) * 100) + RAND.nextInt(100);
                        System.out.printf("APP: Hit 40001 transaction retry error, sleeping %s milliseconds\n", sleepMillis);
                        try {
                            Thread.sleep(sleepMillis);
                        } catch (InterruptedException interrupted) {
                            // Keep the interrupt visible to callers instead of
                            // silently dropping it; the retry loop continues.
                            Thread.currentThread().interrupt();
                        }
                        rv.set(-1L);
                    } else {
                        // Not retryable: do not leave the failed transaction
                        // open on the shared connection. A rollback failure
                        // must not mask the original error.
                        try {
                            connection.rollback();
                        } catch (SQLException rollbackFailure) {
                            e.addSuppressed(rollbackFailure);
                        }
                        throw e;
                    }
                }
                return false;
            })) {
                break;
            }
        }
        return rv.get();
    }

    /**
     * Prints the elapsed time of one phase to System.err in milliseconds and seconds.
     *
     * @param label      name of the phase being reported
     * @param startNanos System.nanoTime() reading taken before the phase
     * @param endNanos   System.nanoTime() reading taken after the phase
     */
    private static void reportElapsed(String label, long startNanos, long endNanos) {
        long elapsed = endNanos - startNanos;
        System.err.println(String.format("%s: %s ms (%s s)", label, elapsed / NANOS_PER_MS, elapsed / NANOS_PER_S));
    }

    // CREATE USER foo WITH PASSWORD '***';
    // CREATE DATABASE bank;
    // time cockroach sql --echo-sql --certs-dir=../distributed/certs --host=xyz -d bank < src/main/resources/db.sql
    // GRANT ALL ON DATABASE bank TO foo;
    // time mvn exec:java -Dexec.mainClass=com.cockroachlabs.Sample 2> times.txt
    /**
     * Connects to the database, clears and repopulates the accounts table, and
     * runs one transfer per neighbouring pair of accounts, timing each phase.
     *
     * @param args unused
     * @throws Exception if the connection cannot be opened or a statement fails
     */
    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        try (Connection connection = DriverManager.getConnection(
                "jdbc:postgresql://xyz:26257/bank?sslmode=verify-full&sslrootcert=path/certs/ca.crt&sslcert=path/certs/client.foo.crt&sslkey=path/certs/client.foo.key.pk8&ApplicationName=jooq-sample-uuid-times",
                "foo",
                "***"
        )) {
            DSLContext ctx = DSL.using(connection, SQLDialect.COCKROACHDB, new Settings()
                    .withExecuteLogging(true)
                    .withRenderQuotedNames(RenderQuotedNames.NEVER));
            long end = System.nanoTime();
            reportElapsed("init", start, end);

            start = System.nanoTime();
            ctx.deleteFrom(ACCOUNTS).execute();
            end = System.nanoTime();
            reportElapsed("delete", start, end);

            long fromAccountId = 1;
            long toAccountId = 2;
            long transferAmount = 1;

            if (FORCE_RETRY) {
                System.out.printf("APP: About to test retry logic in 'runTransaction'\n");
                runTransaction(ctx, forceRetryLogic());
            } else {
                start = System.nanoTime();
                runTransaction(ctx, addAccounts());
                long fromBalance = runTransaction(ctx, getAccountBalance(fromAccountId));
                long toBalance = runTransaction(ctx, getAccountBalance(toAccountId));
                end = System.nanoTime();
                reportElapsed("insert", start, end);

                if (fromBalance != -1 && toBalance != -1) {
                    // Success!
                    System.out.printf("APP: getAccountBalance(%d) --> %d\n", fromAccountId, fromBalance);
                    System.out.printf("APP: getAccountBalance(%d) --> %d\n", toAccountId, toBalance);
                }

                start = System.nanoTime();
                for (int i = 1; i < ACCOUNT_COUNT; i++) {
                    // Transfer transferAmount from account i to account i-1
                    long transferResult = runTransaction(ctx, transferFunds(i, i - 1, transferAmount));
                    if (transferResult != -1) {
                        // Success!
                        System.out.printf("APP: transferFunds(%d, %d, %d) --> %d \n", i, i - 1, transferAmount, transferResult);
                        long fromBalanceAfter = runTransaction(ctx, getAccountBalance(i));
                        long toBalanceAfter = runTransaction(ctx, getAccountBalance(i - 1));
                        if (fromBalanceAfter != -1 && toBalanceAfter != -1) {
                            // Success!
                            System.out.printf("APP: getAccountBalance(%d) --> %d\n", i, fromBalanceAfter);
                            System.out.printf("APP: getAccountBalance(%d) --> %d\n", i - 1, toBalanceAfter);
                        }
                    }
                }
                end = System.nanoTime();
                reportElapsed("select+update", start, end);
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment