Created
November 18, 2016 15:53
-
-
Save AlejandroRivera/381cd2121462bd3d90f1e5253899c876 to your computer and use it in GitHub Desktop.
MongoDB Zipkin/Brave integration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.tracing.db.mongo; | |
import com.example.tracing.MongoDbConstants; | |
import com.github.kristofa.brave.Brave; | |
import com.github.kristofa.brave.LocalTracer; | |
import com.google.inject.Inject; | |
import com.google.inject.Singleton; | |
import com.mongodb.MongoQueryEventSubscriber; | |
import com.mongodb.ReadPreference; | |
import com.mongodb.operation.ReadOperation; | |
import com.mongodb.operation.WriteOperation; | |
import org.apache.commons.lang3.time.StopWatch; | |
import java.util.concurrent.TimeUnit; | |
@Singleton | |
public class DistributedTracingMongoSubscriber implements MongoQueryEventSubscriber { | |
private Brave tracer; | |
@Inject | |
public DistributedTracingMongoSubscriber(Brave tracer) { | |
this.tracer = tracer; | |
} | |
@Override | |
public <T> void afterReadError(ReadOperation<T> operation, ReadPreference readPreference, | |
RuntimeException ex, StopWatch stopWatch) { | |
recordReadOperation(operation, readPreference, stopWatch); | |
} | |
@Override | |
public <T> void afterReadSuccess(ReadOperation<T> operation, ReadPreference readPreference, T result, StopWatch stopWatch) { | |
recordReadOperation(operation, readPreference, stopWatch); | |
} | |
private <T> void recordReadOperation(ReadOperation<T> operation, ReadPreference readPreference, StopWatch stopWatch) { | |
if (!isTracingEnabled()) { | |
return; | |
} | |
LocalTracer localTracer = tracer.localTracer(); | |
long startTimeInMicros = TimeUnit.MILLISECONDS.toMicros(stopWatch.getStartTime()); | |
localTracer.startNewSpan(MongoDbConstants.COMPONENT_NAME, MongoDbConstants.READ_OPERATION_NAME, startTimeInMicros); | |
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_OPERATION_ANNOTATION, operation.getClass().getSimpleName()); | |
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_READ_PREFERENCE_ANNOTATION, readPreference.getName()); | |
TracingInfo info = TracingInfo.fromReadOperation(operation); | |
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_DB_ANNOTATION, info.getDb()); | |
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_COLLECTION_ANNOTATION, info.getCollection()); | |
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_FILTER_ANNOTATION, info.getFilter()); | |
localTracer.finishSpan(TimeUnit.NANOSECONDS.toMicros(stopWatch.getNanoTime())); | |
} | |
@Override | |
public <T> void afterWriteError(WriteOperation<T> operation, RuntimeException ex, StopWatch stopWatch) { | |
recordWriteOperation(operation, stopWatch); | |
} | |
@Override | |
public <T> void afterWriteSuccess(WriteOperation<T> operation, T result, StopWatch stopWatch) { | |
recordWriteOperation(operation, stopWatch); | |
} | |
private <T> void recordWriteOperation(WriteOperation<T> operation, StopWatch stopWatch) { | |
if (!isTracingEnabled()) { | |
return; | |
} | |
LocalTracer localTracer = tracer.localTracer(); | |
long startTimeInMicros = TimeUnit.MILLISECONDS.toMicros(stopWatch.getStartTime()); | |
localTracer.startNewSpan(MongoDbConstants.COMPONENT_NAME, MongoDbConstants.WRITE_OPERATION_NAME, startTimeInMicros); | |
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_OPERATION_ANNOTATION, operation.getClass().getSimpleName()); | |
TracingInfo info = TracingInfo.fromWriteOperation(operation); | |
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_DB_ANNOTATION, info.getDb()); | |
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_COLLECTION_ANNOTATION, info.getCollection()); | |
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_WRITE_CONCERN_ANNOTATION, info.getWriteConcern()); | |
localTracer.submitBinaryAnnotation(MongoDbConstants.QUERY_FILTER_ANNOTATION, info.getFilter()); | |
localTracer.finishSpan(TimeUnit.NANOSECONDS.toMicros(stopWatch.getNanoTime())); | |
} | |
private boolean isTracingEnabled() { | |
return Boolean.TRUE.equals(tracer.serverSpanThreadBinder().getCurrentServerSpan().getSample()); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Names used when reporting MongoDB operations as tracing spans: the component name, the span
 * names for reads and writes, and the keys of the binary annotations attached to each span.
 *
 * <p>Declared {@code public} because the constants are imported from another package.
 */
public interface MongoDbConstants {

  /** Component name under which the MongoDB spans are reported. */
  String COMPONENT_NAME = "MongoDB";

  /** Span name used for read operations. */
  String READ_OPERATION_NAME = "db_read";

  /** Span name used for write operations. */
  String WRITE_OPERATION_NAME = "db_write";

  /** Annotation key for the database name. */
  String QUERY_DB_ANNOTATION = "query.db";

  /** Annotation key for the collection name. */
  String QUERY_COLLECTION_ANNOTATION = "query.collection";

  /** Annotation key for the operation type. */
  String QUERY_OPERATION_ANNOTATION = "query.operation";

  /** Annotation key for the query filter. */
  String QUERY_FILTER_ANNOTATION = "query.filter";

  /** Annotation key for the read preference (read operations only). */
  String QUERY_READ_PREFERENCE_ANNOTATION = "query.read_preference";

  /** Annotation key for the write concern (write operations only). */
  String QUERY_WRITE_CONCERN_ANNOTATION = "query.write_concern";
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mongodb; | |
import com.mongodb.operation.ReadOperation; | |
import com.mongodb.operation.WriteOperation; | |
import org.apache.commons.lang3.time.StopWatch; | |
public interface MongoQueryEventSubscriber { | |
default <T> void beforeExecuteRead(ReadOperation<T> operation, ReadPreference readPreference){ | |
} | |
default <T> void afterReadSuccess(ReadOperation<T> operation, ReadPreference readPreference, | |
T result, StopWatch stopWatch){ | |
} | |
default <T> void afterReadError(ReadOperation<T> operation, ReadPreference readPreference, | |
RuntimeException ex, StopWatch stopWatch) { | |
} | |
default <T> void beforeExecuteWrite(WriteOperation<T> operation) { | |
} | |
default <T> void afterWriteSuccess(WriteOperation<T> operation, T result, StopWatch stopWatch) { | |
} | |
default <T> void afterWriteError(WriteOperation<T> operation, RuntimeException ex, StopWatch stopWatch) { | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.mongodb; | |
import com.mongodb.operation.OperationExecutor; | |
import com.mongodb.operation.ReadOperation; | |
import com.mongodb.operation.WriteOperation; | |
import org.apache.commons.lang3.time.StopWatch; | |
import java.util.List; | |
import java.util.Set; | |
import javax.inject.Provider; | |
/** | |
* A wrapper that allows to register subscribers to query events. | |
* | |
* <p>This class needed to be in the package {@code com.mongodb} because it needed to expose methods that were package-private, | |
* such as {@link #createOperationExecutor()} | |
* | |
* @see ObservableOperationExecutor | |
* @see MongoQueryEventSubscriber | |
*/ | |
public class ObservableMongoClientWrapper extends MongoClient { | |
private final Provider<Set<MongoQueryEventSubscriber>> operationSubscribers; | |
public ObservableMongoClientWrapper(ServerAddress serverAddress, | |
MongoClientOptions mongoClientOptions, | |
Provider<Set<MongoQueryEventSubscriber>> operationSubscribers) { | |
super(serverAddress, mongoClientOptions); | |
this.operationSubscribers = operationSubscribers; | |
} | |
public <T> ObservableMongoClientWrapper(ServerAddress serverAddress, | |
List<MongoCredential> ts, | |
MongoClientOptions mongoClientOptions, | |
Provider<Set<MongoQueryEventSubscriber>> operationSubscribers) { | |
super(serverAddress, ts, mongoClientOptions); | |
this.operationSubscribers = operationSubscribers; | |
} | |
public ObservableMongoClientWrapper(List<ServerAddress> servers, | |
MongoClientOptions mongoClientOptions, | |
Provider<Set<MongoQueryEventSubscriber>> operationSubscribers) { | |
super(servers, mongoClientOptions); | |
this.operationSubscribers = operationSubscribers; | |
} | |
public ObservableMongoClientWrapper(List<ServerAddress> servers, | |
List<MongoCredential> ts, | |
MongoClientOptions mongoClientOptions, | |
Provider<Set<MongoQueryEventSubscriber>> operationSubscribers) { | |
super(servers, ts, mongoClientOptions); | |
this.operationSubscribers = operationSubscribers; | |
} | |
/** | |
* Creates an OperationExecutor that will notify subscribers of events that happen in the original one. | |
*/ | |
@Override | |
public OperationExecutor createOperationExecutor() { | |
OperationExecutor originalExecutor = super.createOperationExecutor(); | |
return new ObservableOperationExecutor(originalExecutor, operationSubscribers.get()); | |
} | |
/** | |
* This method is overriden to make it public since without it testing (mock/spy) won't work. | |
*/ | |
@Override | |
public <T> T execute(ReadOperation<T> operation, ReadPreference readPreference) { | |
return super.execute(operation, readPreference); | |
} | |
/** | |
* This method is overriden to make it public since without it testing (mock/spy) won't work. | |
*/ | |
@Override | |
public <T> T execute(WriteOperation<T> operation) { | |
return super.execute(operation); | |
} | |
private static class ObservableOperationExecutor implements OperationExecutor { | |
private final OperationExecutor executor; | |
private final Set<MongoQueryEventSubscriber> subscribers; | |
ObservableOperationExecutor(OperationExecutor executor, | |
Set<MongoQueryEventSubscriber> subscribers) { | |
this.executor = executor; | |
this.subscribers = subscribers; | |
} | |
@Override | |
public <T> T execute(ReadOperation<T> operation, ReadPreference readPreference) { | |
StopWatch stopWatch = new StopWatch(); | |
try { | |
subscribers.forEach(subscriber -> subscriber.beforeExecuteRead(operation, readPreference)); | |
T result; | |
result = timedReadExecute(operation, readPreference, stopWatch); | |
subscribers.forEach(subscriber -> subscriber.afterReadSuccess(operation, readPreference, result, stopWatch)); | |
return result; | |
} catch (RuntimeException e) { | |
subscribers.forEach(subscriber -> subscriber.afterReadError(operation, readPreference, e, stopWatch)); | |
throw e; | |
} | |
} | |
@Override | |
public <T> T execute(WriteOperation<T> operation) { | |
StopWatch stopWatch = new StopWatch(); | |
try { | |
subscribers.forEach(subscriber -> subscriber.beforeExecuteWrite(operation)); | |
T result = timedWriteExecute(operation, stopWatch); | |
subscribers.forEach(subscriber -> subscriber.afterWriteSuccess(operation, result, stopWatch)); | |
return result; | |
} catch (RuntimeException e) { | |
subscribers.forEach(subscriber -> subscriber.afterWriteError(operation, e, stopWatch)); | |
throw e; | |
} | |
} | |
private <T> T timedReadExecute(ReadOperation<T> operation, ReadPreference readPreference, StopWatch stopWatch) { | |
T result; | |
try { | |
stopWatch.start(); | |
result = executor.execute(operation, readPreference); | |
} finally { | |
stopWatch.stop(); | |
} | |
return result; | |
} | |
private <T> T timedWriteExecute(WriteOperation<T> operation, StopWatch stopWatch) { | |
T result; | |
try { | |
stopWatch.start(); | |
result = executor.execute(operation); | |
} finally { | |
stopWatch.stop(); | |
} | |
return result; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.tracing.db.mongo; | |
import com.mongodb.operation.BaseWriteOperation; | |
import com.mongodb.operation.CountOperation; | |
import com.mongodb.operation.DeleteOperation; | |
import com.mongodb.operation.DistinctOperation; | |
import com.mongodb.operation.FindAndDeleteOperation; | |
import com.mongodb.operation.FindAndReplaceOperation; | |
import com.mongodb.operation.FindAndUpdateOperation; | |
import com.mongodb.operation.FindOperation; | |
import com.mongodb.operation.GroupOperation; | |
import com.mongodb.operation.MapReduceWithInlineResultsOperation; | |
import com.mongodb.operation.ReadOperation; | |
import com.mongodb.operation.WriteOperation; | |
class TracingInfo { | |
private static final String EMPTY = "N/A"; | |
private String filter; | |
private String collection; | |
private String db; | |
private String writeConcern; | |
private TracingInfo(String filter, String collection, String db) { | |
this.filter = filter; | |
this.collection = collection; | |
this.db = db; | |
} | |
private TracingInfo(String filter, String collection, String databaseName, String writeConcern) { | |
this(filter, collection, databaseName); | |
this.writeConcern = writeConcern; | |
} | |
public String getFilter() { | |
return filter; | |
} | |
public String getCollection() { | |
return collection; | |
} | |
public String getDb() { | |
return db; | |
} | |
public String getWriteConcern() { | |
return writeConcern; | |
} | |
static TracingInfo fromReadOperation(ReadOperation operation) { | |
String filter = EMPTY; | |
String collection = EMPTY; | |
String db = EMPTY; | |
if (operation instanceof FindOperation) { | |
filter = ((FindOperation) operation).getFilter().toString(); | |
collection = ((FindOperation) operation).getNamespace().getCollectionName(); | |
db = ((FindOperation) operation).getNamespace().getDatabaseName(); | |
} else if (operation instanceof CountOperation) { | |
filter = ((CountOperation) operation).getFilter().toString(); | |
} else if (operation instanceof DistinctOperation<?>) { | |
filter = ((DistinctOperation) operation).getFilter().toString(); | |
} else if (operation instanceof GroupOperation<?>) { | |
filter = ((GroupOperation) operation).getFilter().toString(); | |
} else if (operation instanceof MapReduceWithInlineResultsOperation<?>) { | |
filter = ((MapReduceWithInlineResultsOperation) operation).getFilter().toString(); | |
} | |
return new TracingInfo(filter, collection, db); | |
} | |
static TracingInfo fromWriteOperation(WriteOperation operation) { | |
String collection = EMPTY; | |
String writeConcern = EMPTY; | |
String filter = EMPTY; | |
String databaseName = EMPTY; | |
if (operation instanceof BaseWriteOperation) { | |
collection = ((BaseWriteOperation) operation).getNamespace().getCollectionName(); | |
writeConcern = ((BaseWriteOperation) operation).getWriteConcern().toString(); | |
databaseName = ((BaseWriteOperation) operation).getNamespace().getDatabaseName(); | |
} | |
if (operation instanceof DeleteOperation) { | |
filter = ((DeleteOperation) operation).getDeleteRequests().iterator().next().getFilter().toString(); | |
} else if (operation instanceof FindAndDeleteOperation) { | |
filter = ((FindAndDeleteOperation) operation).getFilter().toString(); | |
collection = ((FindAndDeleteOperation) operation).getNamespace().getCollectionName(); | |
} else if (operation instanceof FindAndUpdateOperation) { | |
filter = ((FindAndUpdateOperation) operation).getFilter().toString(); | |
collection = ((FindAndUpdateOperation) operation).getNamespace().getCollectionName(); | |
} else if (operation instanceof FindAndReplaceOperation) { | |
filter = ((FindAndReplaceOperation) operation).getFilter().toString(); | |
collection = ((FindAndReplaceOperation) operation).getNamespace().getCollectionName(); | |
} | |
return new TracingInfo(filter, collection, databaseName, writeConcern); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment