Last active
May 2, 2019 12:18
-
-
Save jkuipers/83747663b657c253254c51c0551f9f9c to your computer and use it in GitHub Desktop.
Spring Cloud AWS QueueMessagingTemplate subtype which adds Brave and Micrometer.io integration for sending messages
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import brave.Span; | |
import brave.Tracer; | |
import brave.Tracing; | |
import brave.propagation.Propagation; | |
import brave.propagation.TraceContext; | |
import com.amazonaws.services.sqs.AmazonSQSAsync; | |
import io.micrometer.core.instrument.MeterRegistry; | |
import io.micrometer.core.instrument.Timer; | |
import org.springframework.cloud.aws.messaging.core.QueueMessageChannel; | |
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate; | |
import org.springframework.core.NamedThreadLocal; | |
import org.springframework.messaging.Message; | |
import org.springframework.messaging.support.MessageBuilder; | |
import zipkin2.Endpoint; | |
import java.util.concurrent.TimeUnit; | |
/** | |
* Injects Brace tracing headers and records a micrometer.io metric when sending messages. | |
* Note that receiving isn't supported (i.e. it isn't instrumented). | |
*/ | |
public class InstrumentedQueueMessagingTemplate extends QueueMessagingTemplate { | |
private static final ThreadLocal<String> unresolvedDestination = | |
new NamedThreadLocal<>("QueueMessagingTemplate unresolved destination"); | |
private static final Propagation.Setter<MessageBuilder, String> SETTER = | |
(carrier, key, value) -> carrier.setHeader(key, value); | |
private static final Endpoint SQS_ENDPOINT = Endpoint.newBuilder().serviceName("sqs").build(); | |
private Tracer tracer; | |
private TraceContext.Injector<MessageBuilder> injector; | |
private MeterRegistry meterRegistry; | |
public InstrumentedQueueMessagingTemplate(AmazonSQSAsync amazonSqs, Tracing tracing, MeterRegistry meterRegistry) { | |
super(amazonSqs); | |
this.injector = tracing.propagation().injector(SETTER); | |
this.tracer = tracing.tracer(); | |
this.meterRegistry = meterRegistry; | |
} | |
@Override | |
protected void doSend(QueueMessageChannel destination, Message<?> message) { | |
Message<?> injectedMessage = injectMessage(message); | |
long startTime = System.nanoTime(); | |
try { | |
super.doSend(destination, injectedMessage); | |
} finally { | |
Timer.builder("sqs.sent.messages") | |
.tag("queue", queueName()) | |
.description("Timer of QueueMessagingTemplate send operations") | |
.register(meterRegistry) | |
.record(System.nanoTime() - startTime, TimeUnit.NANOSECONDS); | |
unresolvedDestination.remove(); | |
} | |
} | |
@Override | |
protected QueueMessageChannel resolveMessageChannelByLogicalName(String destination) { | |
unresolvedDestination.set(destination); | |
return super.resolveMessageChannelByLogicalName(destination); | |
} | |
private String queueName() { | |
String name = unresolvedDestination.get(); | |
return name == null ? "" : name; | |
} | |
private Message<?> injectMessage(Message<?> message) { | |
Span span = tracer.nextSpan() | |
.kind(Span.Kind.PRODUCER) | |
.name("send") | |
.remoteEndpoint(SQS_ENDPOINT) | |
.start(); | |
MessageBuilder messageBuilder = MessageBuilder.fromMessage(message); | |
injector.inject(span.context(), messageBuilder); | |
span.finish(); | |
return messageBuilder.build(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment