Skip to content

Instantly share code, notes, and snippets.

@ben-manes
Last active December 15, 2015 20:09
Show Gist options
  • Save ben-manes/5316351 to your computer and use it in GitHub Desktop.
Save ben-manes/5316351 to your computer and use it in GitHub Desktop.
/**
* A test for the connection life-cycle management within jOOQ managed operations.
*
* @author Ben Manes (ben@addepar.com)
*/
@Guice(modules = {JooqTestModule.class, TestModule.class})
public class ConnectionLifecycleTest {
final AtomicInteger counter = new AtomicInteger();
@Inject CountingExecuteListener countingExecuteListener;
@Inject CountingDataSource countingDataSource;
@Inject TransactionOperations template;
@Inject Executor db;
@BeforeMethod
public void beforeMethod() {
countingDataSource.reset();
countingExecuteListener.reset();
}
@AfterMethod
public void afterMethod() {
db.truncate(USER).execute();
}
@Test
public void perExecutorStatement() {
insertUser("John", "Doe");
insertUser("Jane", "Doe");
assertThat(countingDataSource.getNumberOfEstablishedConnections(), is(2));
assertThat(countingExecuteListener.getStartCount(), is(4));
assertThat(countingExecuteListener.getEndCount(), is(4));
assertThat(isInTransaction(), is(false));
}
@Test(enabled = false)
public void perExecutorStatement_lazy() {
// FIXME(ben): An implicitly lazy operation does not indicate to the execute listeners that
// the execution has completed. This results in connection leak.
db.selectCount().from(USER).fetchOne();
assertThat(countingDataSource.getNumberOfEstablishedConnections(), is(1));
assertThat(countingExecuteListener.getStartCount(), is(1));
assertThat(countingExecuteListener.getEndCount(), is(1));
assertThat(isInTransaction(), is(false));
}
@Test
public void perExecutorStatement_rollback() throws Throwable {
try {
UserRecord record = newUserRecord("John", "Doe");
db.batchInsert(ImmutableList.of(record, record)).execute();
Assert.fail();
} catch (DataAccessException e) {
assertThat(countingDataSource.getNumberOfEstablishedConnections(), is(1));
assertThat(countingExecuteListener.getStartCount(), is(3));
assertThat(countingExecuteListener.getEndCount(), is(3));
assertThat(isInTransaction(), is(false));
assertThat(count(), is(0));
assertThat(isInTransaction(), is(false));
}
}
@Test(threadPoolSize = 2, invocationCount = 50)
public void concurrent() {
insertUser("John", "Doe-" + counter.incrementAndGet());
}
@Test(threadPoolSize = 2, invocationCount = 50)
public void concurrent_rollback() throws Throwable {
try {
insertUser("John", null);
Assert.fail();
} catch (DataAccessException e) {
rethrowIfNotConstraintViolation(e);
}
}
@Test
public void transactional() {
template.execute(new TransactionCallbackWithoutResult() {
@Override protected void doInTransactionWithoutResult(TransactionStatus status) {
insertUser("John", "Doe");
insertUser("Jane", "Doe");
assertThat(isInTransaction(), is(true));
}
});
assertThat(countingDataSource.getNumberOfEstablishedConnections(), is(1));
assertThat(countingExecuteListener.getStartCount(), is(4));
assertThat(countingExecuteListener.getEndCount(), is(4));
assertThat(isInTransaction(), is(false));
}
@Test
public void transactional_rollback() {
try {
template.execute(new TransactionCallbackWithoutResult() {
@Override protected void doInTransactionWithoutResult(TransactionStatus status) {
insertUser("John", "Doe");
insertUser("John", "Doe");
assertThat(isInTransaction(), is(true));
}
});
Assert.fail();
} catch (DataAccessException e) {
assertThat(count(), is(0));
}
}
/**
* Inserts a new user and fetches back the record in a single executor statement. This translates
* into multiple SQL operations where two executor life cycles are triggered, with the insert
* wrapping over the fetch's execution life cycle.
*/
private void insertUser(String firstName, String lastName) {
db.insertInto(USER).set(newUserRecord(firstName, lastName)).returning().fetchOne();
}
private UserRecord newUserRecord(String firstName, String lastName) {
UserRecord record = new UserRecord();
record.setFirstName(firstName);
record.setLastName(lastName);
return record;
}
private int count() {
return db.selectCount().from(USER).fetch().get(0).value1();
}
private boolean isInTransaction() {
return TransactionSynchronizationManager.isSynchronizationActive();
}
private void rethrowIfNotConstraintViolation(DataAccessException e) throws Throwable {
Throwable cause = e.getCause();
boolean expected = (cause instanceof SQLException) && cause.getMessage().contains("NULL");
if (!expected) {
throw cause;
}
}
public static final class TestModule extends AbstractModule {
@Override protected void configure() {
bind(DelegatingDataSource.class).to(CountingDataSource.class);
Multibinder.newSetBinder(binder(), ExecuteListener.class)
.addBinding().to(CountingExecuteListener.class);
}
}
}
/**
* A connection provider for a transactionally aware {@link DataSource}.
*
* @author Ben Manes (ben@addepar.com)
*/
final class TransactionAwareConnectionProvider implements ConnectionProvider {
private final DataSource dataSource;
@Inject
public TransactionAwareConnectionProvider(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public Connection acquire() {
try {
return dataSource.getConnection();
} catch (SQLException e) {
throw new DataAccessException("Error getting connection from data source " + dataSource, e);
}
}
@Override
public void release(Connection released) {
checkArgument(DataSourceUtils.isConnectionTransactional(released, dataSource),
"Expected the connection to be managed by the current thread's transaction");
try {
DataSourceUtils.doReleaseConnection(released, dataSource);
} catch (SQLException e) {
throw new DataAccessException("Error closing connection " + released, e);
}
}
}
/**
* An execution listener that helps manages the connection's life-cycle.
* <p>
* jOOQ does not manage a connection's life-cycle (commit, rollback, etc). The default connection
* behavior require stateful executors so that the application logic can manage the connection. If
* the application is not careful then multi-threaded usage may cause a connection leak. As an
* operation may fetch a database connection multiple times during its execution, the same
* underlying connection must be used.
* <p>
* If a transaction is active then Spring will manage the connection in a thread local and return
* the same instance throughout multiple operations. This allows multiple statements and nested
* transactions to be committed or rolled back as an atomic unit. This behavior is leveraged to
* to ensure that jOOQ's {@link Executor} is stateless by ensuring that all executions are
* transactional, even in the absence of an application-defined transaction.
*
* @author Ben Manes (ben@addepar.com)
*/
final class TransactionAwareExecutionListener extends DefaultExecuteListener {
private static final Logger logger = Logger.getLogger(TransactionAwareExecutionListener.class);
private static final ThreadLocal<TransactionStatus> transactionStatus =
new ThreadLocal<TransactionStatus>();
private static final ThreadLocal<Integer> nestedExecutions = new ThreadLocal<Integer>() {
@Override public Integer initialValue() {
return 0;
}
};
private static final long serialVersionUID = 1L;
private final PlatformTransactionManager transactionManager;
private final TransactionDefinition definition;
@Inject
public TransactionAwareExecutionListener(PlatformTransactionManager transactionManager) {
this.definition = new DefaultTransactionDefinition();
this.transactionManager = transactionManager;
}
@Override
public void start(ExecuteContext context) {
incrementNestedExecutions();
if (!isInTransaction()) {
// Begin a transaction so that execution uses a managed connection
TransactionStatus status = transactionManager.getTransaction(definition);
transactionStatus.set(status);
}
}
@Override
public void end(ExecuteContext context) {
decrementNestedExecutions();
if (isNestedExecution() || isApplicationTransaction()) {
return;
}
try {
if (isSuccessful(context)) {
transactionManager.commit(transactionStatus.get());
} else {
rollbackOnException(transactionStatus.get(), context.exception());
}
} finally {
transactionStatus.remove();
}
}
/** Perform a rollback, logging if a rollback exception occurs. */
private void rollbackOnException(TransactionStatus status, Throwable throwable) {
try {
transactionManager.rollback(status);
} catch (RuntimeException e) {
logger.error("Rollback exception supressed by application exception", e);
}
}
/** Increment the number of executions being performed on the connection. */
private void incrementNestedExecutions() {
nestedExecutions.set(nestedExecutions.get() + 1);
}
/** Decrement the number of executions being performed on the connection. */
private void decrementNestedExecutions() {
nestedExecutions.set(nestedExecutions.get() - 1);
}
/** If there are remaining executions being performed on the connection. */
private boolean isNestedExecution() {
return nestedExecutions.get() > 0;
}
/** If the current thread is in a transactional context. */
private boolean isInTransaction() {
return TransactionSynchronizationManager.isSynchronizationActive();
}
/** If the transactional context was created by the application or the listener. */
private boolean isApplicationTransaction() {
return (transactionStatus.get() == null);
}
/** If the operation was executed successfully. */
private boolean isSuccessful(ExecuteContext context) {
return (context.exception() == null);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment