Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save AlejandroRivera/381cd2121462bd3d90f1e5253899c876 to your computer and use it in GitHub Desktop.
Save AlejandroRivera/381cd2121462bd3d90f1e5253899c876 to your computer and use it in GitHub Desktop.
MongoDB Zipkin/Brave integration
package com.example.tracing.db.mongo;
import com.example.tracing.MongoDbConstants;
import com.github.kristofa.brave.Brave;
import com.github.kristofa.brave.LocalTracer;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.mongodb.MongoQueryEventSubscriber;
import com.mongodb.ReadPreference;
import com.mongodb.operation.ReadOperation;
import com.mongodb.operation.WriteOperation;
import org.apache.commons.lang3.time.StopWatch;
import java.util.concurrent.TimeUnit;
@Singleton
public class DistributedTracingMongoSubscriber implements MongoQueryEventSubscriber {
private Brave tracer;
@Inject
public DistributedTracingMongoSubscriber(Brave tracer) {
this.tracer = tracer;
}
@Override
public <T> void afterReadError(ReadOperation<T> operation, ReadPreference readPreference,
RuntimeException ex, StopWatch stopWatch) {
recordReadOperation(operation, readPreference, stopWatch);
}
@Override
public <T> void afterReadSuccess(ReadOperation<T> operation, ReadPreference readPreference, T result, StopWatch stopWatch) {
recordReadOperation(operation, readPreference, stopWatch);
}
private <T> void recordReadOperation(ReadOperation<T> operation, ReadPreference readPreference, StopWatch stopWatch) {
if (!isTracingEnabled()) {
return;
}
LocalTracer localTracer = tracer.localTracer();
long startTimeInMicros = TimeUnit.MILLISECONDS.toMicros(stopWatch.getStartTime());
localTracer.startNewSpan(MongoDbConstants.COMPONENT_NAME, MongoDbConstants.READ_OPERATION_NAME, startTimeInMicros);
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_OPERATION_ANNOTATION, operation.getClass().getSimpleName());
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_READ_PREFERENCE_ANNOTATION, readPreference.getName());
TracingInfo info = TracingInfo.fromReadOperation(operation);
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_DB_ANNOTATION, info.getDb());
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_COLLECTION_ANNOTATION, info.getCollection());
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_FILTER_ANNOTATION, info.getFilter());
localTracer.finishSpan(TimeUnit.NANOSECONDS.toMicros(stopWatch.getNanoTime()));
}
@Override
public <T> void afterWriteError(WriteOperation<T> operation, RuntimeException ex, StopWatch stopWatch) {
recordWriteOperation(operation, stopWatch);
}
@Override
public <T> void afterWriteSuccess(WriteOperation<T> operation, T result, StopWatch stopWatch) {
recordWriteOperation(operation, stopWatch);
}
private <T> void recordWriteOperation(WriteOperation<T> operation, StopWatch stopWatch) {
if (!isTracingEnabled()) {
return;
}
LocalTracer localTracer = tracer.localTracer();
long startTimeInMicros = TimeUnit.MILLISECONDS.toMicros(stopWatch.getStartTime());
localTracer.startNewSpan(MongoDbConstants.COMPONENT_NAME, MongoDbConstants.WRITE_OPERATION_NAME, startTimeInMicros);
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_OPERATION_ANNOTATION, operation.getClass().getSimpleName());
TracingInfo info = TracingInfo.fromWriteOperation(operation);
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_DB_ANNOTATION, info.getDb());
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_COLLECTION_ANNOTATION, info.getCollection());
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_WRITE_CONCERN_ANNOTATION, info.getWriteConcern());
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_FILTER_ANNOTATION, info.getFilter());
localTracer.finishSpan(TimeUnit.NANOSECONDS.toMicros(stopWatch.getNanoTime()));
}
private boolean isTracingEnabled() {
return Boolean.TRUE.equals(tracer.serverSpanThreadBinder().getCurrentServerSpan().getSample());
}
}
interface MongoDbConstants {
String COMPONENT_NAME = "MongoDB";
String READ_OPERATION_NAME = "db_read";
String WRITE_OPERATION_NAME = "db_write";
String QUERY_DB_ANNOTATION = "query.db";
String QUERY_COLLECTION_ANNOTATION = "query.collection";
String QUERY_OPERATION_ANNOTATION = "query.operation";
String QUERY_FILTER_ANNOTATION = "query.filter";
String QUERY_READ_PREFERENCE_ANNOTATION = "query.read_preference";
String QUERY_WRITE_CONCERN_ANNOTATION = "query.write_concern";
}
package com.mongodb;
import com.mongodb.operation.ReadOperation;
import com.mongodb.operation.WriteOperation;
import org.apache.commons.lang3.time.StopWatch;
public interface MongoQueryEventSubscriber {
default <T> void beforeExecuteRead(ReadOperation<T> operation, ReadPreference readPreference){
}
default <T> void afterReadSuccess(ReadOperation<T> operation, ReadPreference readPreference,
T result, StopWatch stopWatch){
}
default <T> void afterReadError(ReadOperation<T> operation, ReadPreference readPreference,
RuntimeException ex, StopWatch stopWatch) {
}
default <T> void beforeExecuteWrite(WriteOperation<T> operation) {
}
default <T> void afterWriteSuccess(WriteOperation<T> operation, T result, StopWatch stopWatch) {
}
default <T> void afterWriteError(WriteOperation<T> operation, RuntimeException ex, StopWatch stopWatch) {
}
}
package com.mongodb;
import com.mongodb.operation.OperationExecutor;
import com.mongodb.operation.ReadOperation;
import com.mongodb.operation.WriteOperation;
import org.apache.commons.lang3.time.StopWatch;
import java.util.List;
import java.util.Set;
import javax.inject.Provider;
/**
* A wrapper that allows to register subscribers to query events.
*
* <p>This class needed to be in the package {@code com.mongodb} because it needed to expose methods that were package-private,
* such as {@link #createOperationExecutor()}
*
* @see ObservableOperationExecutor
* @see MongoQueryEventSubscriber
*/
public class ObservableMongoClientWrapper extends MongoClient {
private final Provider<Set<MongoQueryEventSubscriber>> operationSubscribers;
public ObservableMongoClientWrapper(ServerAddress serverAddress,
MongoClientOptions mongoClientOptions,
Provider<Set<MongoQueryEventSubscriber>> operationSubscribers) {
super(serverAddress, mongoClientOptions);
this.operationSubscribers = operationSubscribers;
}
public <T> ObservableMongoClientWrapper(ServerAddress serverAddress,
List<MongoCredential> ts,
MongoClientOptions mongoClientOptions,
Provider<Set<MongoQueryEventSubscriber>> operationSubscribers) {
super(serverAddress, ts, mongoClientOptions);
this.operationSubscribers = operationSubscribers;
}
public ObservableMongoClientWrapper(List<ServerAddress> servers,
MongoClientOptions mongoClientOptions,
Provider<Set<MongoQueryEventSubscriber>> operationSubscribers) {
super(servers, mongoClientOptions);
this.operationSubscribers = operationSubscribers;
}
public ObservableMongoClientWrapper(List<ServerAddress> servers,
List<MongoCredential> ts,
MongoClientOptions mongoClientOptions,
Provider<Set<MongoQueryEventSubscriber>> operationSubscribers) {
super(servers, ts, mongoClientOptions);
this.operationSubscribers = operationSubscribers;
}
/**
* Creates an OperationExecutor that will notify subscribers of events that happen in the original one.
*/
@Override
public OperationExecutor createOperationExecutor() {
OperationExecutor originalExecutor = super.createOperationExecutor();
return new ObservableOperationExecutor(originalExecutor, operationSubscribers.get());
}
/**
* This method is overriden to make it public since without it testing (mock/spy) won't work.
*/
@Override
public <T> T execute(ReadOperation<T> operation, ReadPreference readPreference) {
return super.execute(operation, readPreference);
}
/**
* This method is overriden to make it public since without it testing (mock/spy) won't work.
*/
@Override
public <T> T execute(WriteOperation<T> operation) {
return super.execute(operation);
}
private static class ObservableOperationExecutor implements OperationExecutor {
private final OperationExecutor executor;
private final Set<MongoQueryEventSubscriber> subscribers;
ObservableOperationExecutor(OperationExecutor executor,
Set<MongoQueryEventSubscriber> subscribers) {
this.executor = executor;
this.subscribers = subscribers;
}
@Override
public <T> T execute(ReadOperation<T> operation, ReadPreference readPreference) {
StopWatch stopWatch = new StopWatch();
try {
subscribers.forEach(subscriber -> subscriber.beforeExecuteRead(operation, readPreference));
T result;
result = timedReadExecute(operation, readPreference, stopWatch);
subscribers.forEach(subscriber -> subscriber.afterReadSuccess(operation, readPreference, result, stopWatch));
return result;
} catch (RuntimeException e) {
subscribers.forEach(subscriber -> subscriber.afterReadError(operation, readPreference, e, stopWatch));
throw e;
}
}
@Override
public <T> T execute(WriteOperation<T> operation) {
StopWatch stopWatch = new StopWatch();
try {
subscribers.forEach(subscriber -> subscriber.beforeExecuteWrite(operation));
T result = timedWriteExecute(operation, stopWatch);
subscribers.forEach(subscriber -> subscriber.afterWriteSuccess(operation, result, stopWatch));
return result;
} catch (RuntimeException e) {
subscribers.forEach(subscriber -> subscriber.afterWriteError(operation, e, stopWatch));
throw e;
}
}
private <T> T timedReadExecute(ReadOperation<T> operation, ReadPreference readPreference, StopWatch stopWatch) {
T result;
try {
stopWatch.start();
result = executor.execute(operation, readPreference);
} finally {
stopWatch.stop();
}
return result;
}
private <T> T timedWriteExecute(WriteOperation<T> operation, StopWatch stopWatch) {
T result;
try {
stopWatch.start();
result = executor.execute(operation);
} finally {
stopWatch.stop();
}
return result;
}
}
}
package com.example.tracing.db.mongo;
import com.mongodb.operation.BaseWriteOperation;
import com.mongodb.operation.CountOperation;
import com.mongodb.operation.DeleteOperation;
import com.mongodb.operation.DistinctOperation;
import com.mongodb.operation.FindAndDeleteOperation;
import com.mongodb.operation.FindAndReplaceOperation;
import com.mongodb.operation.FindAndUpdateOperation;
import com.mongodb.operation.FindOperation;
import com.mongodb.operation.GroupOperation;
import com.mongodb.operation.MapReduceWithInlineResultsOperation;
import com.mongodb.operation.ReadOperation;
import com.mongodb.operation.WriteOperation;
class TracingInfo {
private static final String EMPTY = "N/A";
private String filter;
private String collection;
private String db;
private String writeConcern;
private TracingInfo(String filter, String collection, String db) {
this.filter = filter;
this.collection = collection;
this.db = db;
}
private TracingInfo(String filter, String collection, String databaseName, String writeConcern) {
this(filter, collection, databaseName);
this.writeConcern = writeConcern;
}
public String getFilter() {
return filter;
}
public String getCollection() {
return collection;
}
public String getDb() {
return db;
}
public String getWriteConcern() {
return writeConcern;
}
static TracingInfo fromReadOperation(ReadOperation operation) {
String filter = EMPTY;
String collection = EMPTY;
String db = EMPTY;
if (operation instanceof FindOperation) {
filter = ((FindOperation) operation).getFilter().toString();
collection = ((FindOperation) operation).getNamespace().getCollectionName();
db = ((FindOperation) operation).getNamespace().getDatabaseName();
} else if (operation instanceof CountOperation) {
filter = ((CountOperation) operation).getFilter().toString();
} else if (operation instanceof DistinctOperation<?>) {
filter = ((DistinctOperation) operation).getFilter().toString();
} else if (operation instanceof GroupOperation<?>) {
filter = ((GroupOperation) operation).getFilter().toString();
} else if (operation instanceof MapReduceWithInlineResultsOperation<?>) {
filter = ((MapReduceWithInlineResultsOperation) operation).getFilter().toString();
}
return new TracingInfo(filter, collection, db);
}
static TracingInfo fromWriteOperation(WriteOperation operation) {
String collection = EMPTY;
String writeConcern = EMPTY;
String filter = EMPTY;
String databaseName = EMPTY;
if (operation instanceof BaseWriteOperation) {
collection = ((BaseWriteOperation) operation).getNamespace().getCollectionName();
writeConcern = ((BaseWriteOperation) operation).getWriteConcern().toString();
databaseName = ((BaseWriteOperation) operation).getNamespace().getDatabaseName();
}
if (operation instanceof DeleteOperation) {
filter = ((DeleteOperation) operation).getDeleteRequests().iterator().next().getFilter().toString();
} else if (operation instanceof FindAndDeleteOperation) {
filter = ((FindAndDeleteOperation) operation).getFilter().toString();
collection = ((FindAndDeleteOperation) operation).getNamespace().getCollectionName();
} else if (operation instanceof FindAndUpdateOperation) {
filter = ((FindAndUpdateOperation) operation).getFilter().toString();
collection = ((FindAndUpdateOperation) operation).getNamespace().getCollectionName();
} else if (operation instanceof FindAndReplaceOperation) {
filter = ((FindAndReplaceOperation) operation).getFilter().toString();
collection = ((FindAndReplaceOperation) operation).getNamespace().getCollectionName();
}
return new TracingInfo(filter, collection, databaseName, writeConcern);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment