Skip to content

Instantly share code, notes, and snippets.

@62mkv
Last active April 26, 2023 15:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 62mkv/779ff24367248519b467b25f60202067 to your computer and use it in GitHub Desktop.
Save 62mkv/779ff24367248519b467b25f60202067 to your computer and use it in GitHub Desktop.
Example of TracingExecutionListener, adapted from https://github.com/ttddyy/r2dbc-proxy-examples/blob/master/listener-example/src/main/java/io/r2dbc/examples/TracingExecutionListener.java to use OpenTelemetry instead of Zipkin/Brave
package org.example;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.api.trace.Span;
import io.r2dbc.proxy.core.*;
import io.r2dbc.proxy.listener.ProxyMethodExecutionListener;
import static java.util.stream.Collectors.joining;
/**
* Listener to create spans for R2DBC SPI operations.
*
* @author Tadaya Tsuyukubo
*/
public class TracingExecutionListener implements ProxyMethodExecutionListener {
private static final String TAG_CONNECTION_ID = "connectionId";
private static final String TAG_CONNECTION_CREATE_THREAD_ID = "threadIdOnCreate";
private static final String TAG_CONNECTION_CLOSE_THREAD_ID = "threadIdOnClose";
private static final String TAG_CONNECTION_CREATE_THREAD_NAME = "threadNameOnCreate";
private static final String TAG_CONNECTION_CLOSE_THREAD_NAME = "threadNameOnClose";
private static final String TAG_THREAD_ID = "threadId";
private static final String TAG_THREAD_NAME = "threadName";
private static final String TAG_QUERIES = "queries";
private static final String TAG_BATCH_SIZE = "batchSize";
private static final String TAG_QUERY_TYPE = "type";
private static final String TAG_QUERY_SUCCESS = "success";
private static final String TAG_QUERY_MAPPED_RESULT_COUNT = "mappedResultCount";
private static final String TAG_TRANSACTION_SAVEPOINT = "savepoint";
private static final String TAG_TRANSACTION_COUNT = "transactionCount";
private static final String TAG_COMMIT_COUNT = "commitCount";
private static final String TAG_ROLLBACK_COUNT = "rollbackCount";
static final String CONNECTION_SPAN_KEY = "connectionSpan";
static final String TRANSACTION_SPAN_KEY = "transactionSpan";
static final String QUERY_SPAN_KEY = "querySpan";
private final Tracer tracer;
public TracingExecutionListener(Tracer tracer) {
this.tracer = tracer;
}
@Override
public void beforeCreateOnConnectionFactory(MethodExecutionInfo methodExecutionInfo) {
Span connectionSpan = this.tracer.spanBuilder("r2dbc:connection")
.setSpanKind(SpanKind.CLIENT)
.startSpan();
// store the span for retrieval at "afterCreateOnConnectionFactory"
methodExecutionInfo.getValueStore().put("initialConnectionSpan", connectionSpan);
}
@Override
public void afterCreateOnConnectionFactory(MethodExecutionInfo methodExecutionInfo) {
// retrieve the span created at "beforeCreateOnConnectionFactory"
Span connectionSpan = methodExecutionInfo.getValueStore().get("initialConnectionSpan", Span.class);
Throwable thrown = methodExecutionInfo.getThrown();
if (thrown != null) {
connectionSpan
.recordException(thrown)
.end();
return;
}
ConnectionInfo connectionInfo = methodExecutionInfo.getConnectionInfo();
String connectionId = connectionInfo.getConnectionId();
connectionSpan
.setAttribute(TAG_CONNECTION_ID, connectionId)
.setAttribute(TAG_CONNECTION_CREATE_THREAD_ID, String.valueOf(methodExecutionInfo.getThreadId()))
.setAttribute(TAG_CONNECTION_CREATE_THREAD_NAME, methodExecutionInfo.getThreadName())
.updateName("Connection created");
// store the span in connection scoped value store
connectionInfo.getValueStore().put(CONNECTION_SPAN_KEY, connectionSpan);
}
@Override
public void afterCloseOnConnection(MethodExecutionInfo methodExecutionInfo) {
ConnectionInfo connectionInfo = methodExecutionInfo.getConnectionInfo();
String connectionId = connectionInfo.getConnectionId();
Span connectionSpan = connectionInfo.getValueStore().get(CONNECTION_SPAN_KEY, Span.class);
if (connectionSpan == null) {
return; // already closed
}
Throwable thrown = methodExecutionInfo.getThrown();
if (thrown != null) {
connectionSpan.recordException(thrown);
}
connectionSpan
.setAttribute(TAG_CONNECTION_ID, connectionId)
.setAttribute(TAG_CONNECTION_CLOSE_THREAD_ID, String.valueOf(methodExecutionInfo.getThreadId()))
.setAttribute(TAG_CONNECTION_CLOSE_THREAD_NAME, methodExecutionInfo.getThreadName())
.setAttribute(TAG_TRANSACTION_COUNT, String.valueOf(connectionInfo.getTransactionCount()))
.setAttribute(TAG_COMMIT_COUNT, String.valueOf(connectionInfo.getCommitCount()))
.setAttribute(TAG_ROLLBACK_COUNT, String.valueOf(connectionInfo.getRollbackCount()))
.end();
}
@Override
public void beforeQuery(QueryExecutionInfo queryExecutionInfo) {
String connectionId = queryExecutionInfo.getConnectionInfo().getConnectionId();
String queries = queryExecutionInfo.getQueries().stream()
.map(QueryInfo::getQuery)
.collect(joining(", "));
Span querySpan = this.tracer
.spanBuilder("r2dbc:query")
.setSpanKind(SpanKind.CLIENT)
.setAttribute(TAG_CONNECTION_ID, connectionId)
.setAttribute(TAG_QUERY_TYPE, queryExecutionInfo.getType().toString())
.setAttribute(TAG_QUERIES, queries)
.startSpan();
if (ExecutionType.BATCH == queryExecutionInfo.getType()) {
querySpan.setAttribute(TAG_BATCH_SIZE, Integer.toString(queryExecutionInfo.getBatchSize()));
}
// pass the query span to "afterQuery" method
queryExecutionInfo.getValueStore().put(QUERY_SPAN_KEY, querySpan);
}
@Override
public void afterQuery(QueryExecutionInfo queryExecutionInfo) {
Span querySpan = queryExecutionInfo.getValueStore().get(QUERY_SPAN_KEY, Span.class);
querySpan
.setAttribute(TAG_THREAD_ID, String.valueOf(queryExecutionInfo.getThreadId()))
.setAttribute(TAG_THREAD_NAME, queryExecutionInfo.getThreadName())
.setAttribute(TAG_QUERY_SUCCESS, Boolean.toString(queryExecutionInfo.isSuccess()));
Throwable thrown = queryExecutionInfo.getThrowable();
if (thrown != null) {
querySpan.recordException(thrown);
} else {
querySpan.setAttribute(TAG_QUERY_MAPPED_RESULT_COUNT, Integer.toString(queryExecutionInfo.getCurrentResultCount()));
}
querySpan.end();
}
@Override
public void beforeBeginTransactionOnConnection(MethodExecutionInfo methodExecutionInfo) {
Span transactionSpan = this.tracer.spanBuilder("r2dbc:transaction")
.setSpanKind(SpanKind.CLIENT)
.startSpan();
methodExecutionInfo.getConnectionInfo().getValueStore().put(TRANSACTION_SPAN_KEY, transactionSpan);
}
@Override
public void afterCommitTransactionOnConnection(MethodExecutionInfo methodExecutionInfo) {
ConnectionInfo connectionInfo = methodExecutionInfo.getConnectionInfo();
String connectionId = connectionInfo.getConnectionId();
Span transactionSpan = connectionInfo.getValueStore().get(TRANSACTION_SPAN_KEY, Span.class);
if (transactionSpan != null) {
transactionSpan
.updateName("Commit")
.setAttribute(TAG_CONNECTION_ID, connectionId)
.setAttribute(TAG_THREAD_ID, String.valueOf(methodExecutionInfo.getThreadId()))
.setAttribute(TAG_THREAD_NAME, methodExecutionInfo.getThreadName())
.end();
}
Span connectionSpan = connectionInfo.getValueStore().get(CONNECTION_SPAN_KEY, Span.class);
if (connectionSpan == null) {
return;
}
connectionSpan.updateName("Transaction commit");
}
@Override
public void afterRollbackTransactionOnConnection(MethodExecutionInfo methodExecutionInfo) {
ConnectionInfo connectionInfo = methodExecutionInfo.getConnectionInfo();
String connectionId = connectionInfo.getConnectionId();
Span transactionSpan = connectionInfo.getValueStore().get(TRANSACTION_SPAN_KEY, Span.class);
if (transactionSpan != null) {
transactionSpan
.updateName("Rollback")
.setAttribute(TAG_CONNECTION_ID, connectionId)
.setAttribute(TAG_THREAD_ID, String.valueOf(methodExecutionInfo.getThreadId()))
.setAttribute(TAG_THREAD_NAME, methodExecutionInfo.getThreadName())
.end();
}
Span connectionSpan = connectionInfo.getValueStore().get(CONNECTION_SPAN_KEY, Span.class);
connectionSpan.updateName("Transaction rollback");
}
@Override
public void afterRollbackTransactionToSavepointOnConnection(MethodExecutionInfo methodExecutionInfo) {
ConnectionInfo connectionInfo = methodExecutionInfo.getConnectionInfo();
String connectionId = connectionInfo.getConnectionId();
String savepoint = (String) methodExecutionInfo.getMethodArgs()[0];
Span transactionSpan = connectionInfo.getValueStore().get(TRANSACTION_SPAN_KEY, Span.class);
if (transactionSpan != null) {
transactionSpan
.updateName("Rollback to savepoint")
.setAttribute(TAG_TRANSACTION_SAVEPOINT, savepoint)
.setAttribute(TAG_CONNECTION_ID, connectionId)
.setAttribute(TAG_THREAD_ID, String.valueOf(methodExecutionInfo.getThreadId()))
.setAttribute(TAG_THREAD_NAME, methodExecutionInfo.getThreadName())
.end();
}
Span connectionSpan = connectionInfo.getValueStore().get(CONNECTION_SPAN_KEY, Span.class);
connectionSpan.updateName("Transaction rollback to savepoint");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment