Skip to content

Instantly share code, notes, and snippets.

@jkuipers
Last active May 2, 2019 12:18
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jkuipers/83747663b657c253254c51c0551f9f9c to your computer and use it in GitHub Desktop.
Save jkuipers/83747663b657c253254c51c0551f9f9c to your computer and use it in GitHub Desktop.
Spring Cloud AWS QueueMessagingTemplate subtype which adds Brave and Micrometer.io integration for sending messages
import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.propagation.Propagation;
import brave.propagation.TraceContext;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import org.springframework.cloud.aws.messaging.core.QueueMessageChannel;
import org.springframework.cloud.aws.messaging.core.QueueMessagingTemplate;
import org.springframework.core.NamedThreadLocal;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import zipkin2.Endpoint;
import java.util.concurrent.TimeUnit;
/**
 * {@link QueueMessagingTemplate} subtype that injects Brave tracing headers into outgoing
 * messages and records a micrometer.io timer for every send operation.
 *
 * <p>For each send a {@code PRODUCER} span named {@code "send"} is started, its context is
 * injected into the message headers, and the span is finished once the underlying send has
 * returned or thrown, so the span covers the actual send and carries the error on failure.
 * The duration of the underlying send is recorded in the {@code sqs.sent.messages} timer,
 * tagged with the logical queue name when the destination was resolved by name, or with an
 * empty string otherwise.
 *
 * <p>Note that receiving isn't supported (i.e. it isn't instrumented).
 */
public class InstrumentedQueueMessagingTemplate extends QueueMessagingTemplate {

    /**
     * Logical destination name captured during name resolution so that {@code doSend}, which
     * only receives the resolved channel, can still tag the timer with the queue name.
     */
    private static final ThreadLocal<String> unresolvedDestination =
            new NamedThreadLocal<>("QueueMessagingTemplate unresolved destination");

    /** Writes a propagation field as a header on the message under construction. */
    private static final Propagation.Setter<MessageBuilder<?>, String> SETTER =
            (carrier, key, value) -> carrier.setHeader(key, value);

    private static final Endpoint SQS_ENDPOINT = Endpoint.newBuilder().serviceName("sqs").build();

    private final Tracer tracer;
    private final TraceContext.Injector<MessageBuilder<?>> injector;
    private final MeterRegistry meterRegistry;

    /**
     * @param amazonSqs     SQS client handed to the parent template
     * @param tracing       Brave tracing component supplying the tracer and header propagation
     * @param meterRegistry registry in which the send timer is registered
     */
    public InstrumentedQueueMessagingTemplate(AmazonSQSAsync amazonSqs, Tracing tracing, MeterRegistry meterRegistry) {
        super(amazonSqs);
        this.injector = tracing.propagation().injector(SETTER);
        this.tracer = tracing.tracer();
        this.meterRegistry = meterRegistry;
    }

    /**
     * Sends the message with tracing headers added, timing the underlying send and keeping the
     * producer span open until that send has completed.
     *
     * @param destination resolved channel to send to
     * @param message     message to send; it is copied, not modified
     */
    @Override
    protected void doSend(QueueMessageChannel destination, Message<?> message) {
        // Read and clear the thread-local before anything can throw, so a failure below can
        // never leave a stale queue name behind for the next send on this (pooled) thread.
        String queue = queueName();
        unresolvedDestination.remove();

        Span span = tracer.nextSpan()
                .kind(Span.Kind.PRODUCER)
                .name("send")
                .remoteEndpoint(SQS_ENDPOINT)
                .start();
        try {
            Message<?> injectedMessage = injectMessage(span, message);
            long startTime = System.nanoTime();
            try {
                super.doSend(destination, injectedMessage);
            } finally {
                recordSendDuration(queue, System.nanoTime() - startTime);
            }
        } catch (RuntimeException | Error e) {
            span.error(e);
            throw e;
        } finally {
            // Finish only after the send attempt so the span duration reflects the send itself.
            span.finish();
        }
    }

    /**
     * Remembers the logical destination name for the current thread and delegates resolution
     * to the parent template.
     *
     * <p>NOTE(review): if the caller fails between a successful resolution and {@code doSend}
     * (e.g. during payload conversion), the name presumably stays set until the next send on
     * this thread; that cannot be cleaned up from within this class.
     */
    @Override
    protected QueueMessageChannel resolveMessageChannelByLogicalName(String destination) {
        unresolvedDestination.set(destination);
        try {
            return super.resolveMessageChannelByLogicalName(destination);
        } catch (RuntimeException | Error e) {
            // No send will follow a failed resolution, so nobody else would clear the value.
            unresolvedDestination.remove();
            throw e;
        }
    }

    /** Returns the logical queue name captured for this thread, or {@code ""} if there is none. */
    private String queueName() {
        String name = unresolvedDestination.get();
        return name == null ? "" : name;
    }

    /** Records one send duration in the {@code sqs.sent.messages} timer for the given queue tag. */
    private void recordSendDuration(String queue, long elapsedNanos) {
        Timer.builder("sqs.sent.messages")
                .tag("queue", queue)
                .description("Timer of QueueMessagingTemplate send operations")
                .register(meterRegistry)
                .record(elapsedNanos, TimeUnit.NANOSECONDS);
    }

    /**
     * Returns a copy of the message with the span's trace context injected as headers.
     * The span's lifecycle is left to the caller.
     */
    private Message<?> injectMessage(Span span, Message<?> message) {
        MessageBuilder<?> messageBuilder = MessageBuilder.fromMessage(message);
        injector.inject(span.context(), messageBuilder);
        return messageBuilder.build();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment