Skip to content

Instantly share code, notes, and snippets.

@osi
Created October 2, 2020 15:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save osi/99d1263e67021f898a89d66417aa3fa5 to your computer and use it in GitHub Desktop.
Save osi/99d1263e67021f898a89d66417aa3fa5 to your computer and use it in GitHub Desktop.
a R2DBC Connection Factory that gives connections a unique name and logs their creation and closure
import io.r2dbc.spi.Batch;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryMetadata;
import io.r2dbc.spi.ConnectionMetadata;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.Statement;
import io.r2dbc.spi.ValidationDepth;
import io.r2dbc.spi.Wrapped;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Scannable;
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
public class LoggingConnectionFactory implements ConnectionFactory {
private static final Logger logger = LoggerFactory.getLogger(LoggingConnectionFactory.class);
private final ConnectionFactory delegate;
private final Mono<? extends Connection> create;
private final AtomicInteger counter = new AtomicInteger();
public LoggingConnectionFactory(String name, ConnectionFactory delegate) {
this.delegate = delegate;
this.create = Mono.from(delegate.create())
.map(c -> new CloseLoggingConnection(name + " " + counter.incrementAndGet(), c))
.doOnNext(c -> logger.debug("created {}", c));
}
@Override
public Publisher<? extends Connection> create() {
return create;
}
@Override
public ConnectionFactoryMetadata getMetadata() {
return delegate.getMetadata();
}
private static class CloseLoggingConnection implements Connection, Wrapped<Connection> {
private final String name;
private final Connection delegate;
private final AtomicReference<Throwable> firstClose = new AtomicReference<>();
public CloseLoggingConnection(String name, Connection delegate) {
this.name = name;
this.delegate = delegate;
}
@Override
public Connection unwrap() {
return delegate;
}
@Override
public Publisher<Void> beginTransaction() {
return delegate.beginTransaction();
}
@Override
public Publisher<Void> close() {
return Mono.from(delegate.close())
.doOnSubscribe(s -> {
var name = Scannable.from(s).name();
var throwable = new Throwable("close initiated from " + name).fillInStackTrace();
if (firstClose.compareAndSet(null, throwable)) {
// first close
} else {
logger.error("{} closed more than once (first close)", this, firstClose.get());
logger.error("{} closed more than once (current close)", this, throwable);
}
})
.doFinally(signal -> logger.debug("closed {} ({})", this, signal));
}
@Override
public Publisher<Void> commitTransaction() {
return delegate.commitTransaction();
}
@Override
public Batch createBatch() {
return delegate.createBatch();
}
@Override
public Publisher<Void> createSavepoint(String name) {
return delegate.createSavepoint(name);
}
@Override
public Statement createStatement(String sql) {
return delegate.createStatement(sql);
}
@Override
public boolean isAutoCommit() {
return delegate.isAutoCommit();
}
@Override
public ConnectionMetadata getMetadata() {
return delegate.getMetadata();
}
@Override
public IsolationLevel getTransactionIsolationLevel() {
return delegate.getTransactionIsolationLevel();
}
@Override
public Publisher<Void> releaseSavepoint(String name) {
return delegate.releaseSavepoint(name);
}
@Override
public Publisher<Void> rollbackTransaction() {
return delegate.rollbackTransaction();
}
@Override
public Publisher<Void> rollbackTransactionToSavepoint(String name) {
return delegate.rollbackTransactionToSavepoint(name);
}
@Override
public Publisher<Void> setAutoCommit(boolean autoCommit) {
return delegate.setAutoCommit(autoCommit);
}
@Override
public Publisher<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) {
return delegate.setTransactionIsolationLevel(isolationLevel);
}
@Override
public Publisher<Boolean> validate(ValidationDepth depth) {
return delegate.validate(depth);
}
@Override
public String toString() {
return "CloseLoggingConnection{" +
"name='" + name + '\'' +
", delegate=" + delegate +
'}';
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment