Skip to content

Instantly share code, notes, and snippets.

@anuraaga
Created December 16, 2016 06:32
Show Gist options
  • Save anuraaga/c6030b8158832d1deccfb01e516c036c to your computer and use it in GitHub Desktop.
Save anuraaga/c6030b8158832d1deccfb01e516c036c to your computer and use it in GitHub Desktop.
import com.google.common.base.Strings;
import com.google.devtools.cloudtrace.v1.PatchTracesRequest;
import com.google.devtools.cloudtrace.v1.Trace;
import com.google.devtools.cloudtrace.v1.TraceServiceGrpc;
import com.google.devtools.cloudtrace.v1.TraceSpan;
import com.google.devtools.cloudtrace.v1.Traces;
import com.google.protobuf.util.Timestamps;
import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler;
import com.github.kristofa.brave.FlushingSpanCollector;
import com.github.kristofa.brave.IdConversion;
import com.spotify.futures.FuturesExtra;
import com.twitter.zipkin.gen.Annotation;
import com.twitter.zipkin.gen.BinaryAnnotation;
import com.twitter.zipkin.gen.Span;
import jp.skypencil.guava.stream.GuavaCollectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import settings.MonitoringSettings;
/**
 * A brave span collector that converts the spans and posts to stackdriver trace.
 *
 * <p>Spans handed to {@link #reportSpans(List)} are grouped by trace id, converted to
 * Stackdriver {@link TraceSpan}s and sent in a single {@code PatchTraces} call. A failure of
 * that call is logged at WARN level and otherwise ignored.
 */
public class StackdriverSpanCollector extends FlushingSpanCollector {

  private static final Logger logger = LoggerFactory.getLogger(StackdriverSpanCollector.class);

  private final TraceServiceGrpc.TraceServiceFutureStub traceService;
  private final MonitoringSettings monitoringSettings;

  /**
   * Creates a collector that flushes at the interval configured in {@code monitoringSettings}.
   *
   * @param traceService stub used to send the {@code PatchTraces} requests
   * @param monitoringSettings source of the reporting interval and the Google project id
   */
  public StackdriverSpanCollector(
      TraceServiceGrpc.TraceServiceFutureStub traceService,
      MonitoringSettings monitoringSettings) {
    super(
        new EmptySpanCollectorMetricsHandler(),
        monitoringSettings.getTraceReportingIntervalSecs());
    this.traceService = traceService;
    this.monitoringSettings = monitoringSettings;
  }

  /**
   * Converts the given zipkin spans to Stackdriver traces and patches them into the configured
   * project. Does nothing when there are no spans to report.
   *
   * @param spans the spans to report
   */
  @Override
  protected void reportSpans(List<Span> spans) {
    if (spans == null || spans.isEmpty()) {
      // Nothing to send; avoid issuing an empty RPC.
      return;
    }
    List<Trace> traces = spans.stream()
        .collect(Collectors.groupingBy(Span::getTrace_id))
        .entrySet()
        .stream()
        .map(traceIdAndSpans -> toTrace(traceIdAndSpans.getKey(), traceIdAndSpans.getValue()))
        .collect(GuavaCollectors.toImmutableList());
    PatchTracesRequest request = PatchTracesRequest.newBuilder()
        .setProjectId(monitoringSettings.getGoogleProjectId())
        .setTraces(Traces.newBuilder().addAllTraces(traces))
        .build();
    FuturesExtra.addFailureCallback(
        traceService.patchTraces(request),
        t -> logger.warn("Error sending traces to stackdriver.", t));
  }

  /** Builds one Stackdriver trace from all spans that share {@code traceId}. */
  private Trace toTrace(long traceId, List<Span> spans) {
    return Trace.newBuilder()
        // The id string is left-padded with zeros to 32 characters.
        .setTraceId(Strings.padStart(IdConversion.convertToString(traceId), 32, '0'))
        .setProjectId(monitoringSettings.getGoogleProjectId())
        .addAllSpans(
            spans.stream()
                .map(StackdriverSpanCollector::toTraceSpan)
                .collect(GuavaCollectors.toImmutableList()))
        .build();
  }

  /** Converts a single zipkin span, including its labels, to a Stackdriver span. */
  private static TraceSpan toTraceSpan(Span span) {
    TraceSpan.SpanKind kind = TraceSpan.SpanKind.SPAN_KIND_UNSPECIFIED;
    String serviceName = "unknown";
    // The first core annotation found decides both the span kind and the service name.
    for (Annotation annotation : span.getAnnotations()) {
      boolean server = "sr".equals(annotation.value) || "ss".equals(annotation.value);
      boolean client = "cr".equals(annotation.value) || "cs".equals(annotation.value);
      if (!server && !client) {
        continue;
      }
      kind = server ? TraceSpan.SpanKind.RPC_SERVER : TraceSpan.SpanKind.RPC_CLIENT;
      // Keep the "unknown" default when the annotation carries no endpoint / service name;
      // a null label value would otherwise abort the whole batch.
      if (annotation.host != null && annotation.host.service_name != null) {
        serviceName = annotation.host.service_name;
      }
      break;
    }

    // Index binary annotations by key. If a key occurs more than once the first occurrence
    // wins, rather than failing the conversion on the duplicate.
    Map<String, BinaryAnnotation> annotations = span.getBinary_annotations().stream()
        .collect(Collectors.toMap(
            BinaryAnnotation::getKey,
            Function.identity(),
            (first, duplicate) -> first));

    TraceSpan.Builder traceSpan = TraceSpan.newBuilder()
        .setSpanId(span.getId())
        .setKind(kind);

    // NOTE(review): timestamp and duration are treated as optional here (boxed values that
    // may be null) - confirm against the brave version in use. Times are only set when a
    // start timestamp is present; a missing duration yields a zero-length span.
    Long startMicros = span.getTimestamp();
    if (startMicros != null) {
      Long durationMicros = span.getDuration();
      long endMicros = durationMicros != null ? startMicros + durationMicros : startMicros;
      traceSpan.setStartTime(Timestamps.fromMicros(startMicros));
      traceSpan.setEndTime(Timestamps.fromMicros(endMicros));
    }

    if (span.getParent_id() != null) {
      traceSpan.setParentSpanId(span.getParent_id());
    }

    // Cloud trace uses this legacy app-engine label for "service name".
    traceSpan.putLabels("trace.cloud.google.com/gae/app/module_version", serviceName);

    traceSpan.setName(resolveSpanName(span, annotations, traceSpan));

    maybeCopyZipkinAnnotation(
        annotations, "server.uri", "trace.cloud.google.com/http/url", traceSpan);
    maybeCopyZipkinAnnotation(
        annotations, "server.cause", "trace.cloud.google.com/stacktrace", traceSpan);
    maybeCopyZipkinAnnotation(
        annotations, "server.remote", "trace.cloud.google.com/http/source/ip", traceSpan);

    if (annotations.containsKey("server.result")) {
      // Only success / non-success is known, so it is mapped onto 200 / 500.
      String statusCode = utf8(annotations.get("server.result")).equals("success") ? "200" : "500";
      traceSpan.putLabels("trace.cloud.google.com/http/status_code", statusCode);
    }
    return traceSpan.build();
  }

  /**
   * Picks the Stackdriver span name and, for recognized HTTP spans, records the HTTP method
   * label on {@code traceSpan}.
   *
   * <p>Hackily recognizes common HTTP methods since armeria will set the method as the span
   * name, even though stackdriver wants the URI as the span name. In every other case the
   * zipkin span name is used as is.
   */
  private static String resolveSpanName(
      Span span, Map<String, BinaryAnnotation> annotations, TraceSpan.Builder traceSpan) {
    // The builder rejects null, so a missing zipkin name becomes the empty string.
    String zipkinName = Strings.nullToEmpty(span.getName());
    boolean httpMethod = zipkinName.equals("get") || zipkinName.equals("post");
    if (!httpMethod || !annotations.containsKey("server.uri")) {
      return zipkinName;
    }
    traceSpan.putLabels("trace.cloud.google.com/http/method", zipkinName.toUpperCase());
    try {
      String path = URI.create(utf8(annotations.get("server.uri"))).getPath();
      // getPath() is null for opaque URIs and empty when there is no path component.
      return Strings.isNullOrEmpty(path) ? zipkinName : path;
    } catch (IllegalArgumentException e) {
      // A malformed URI must not prevent the rest of the batch from being reported.
      logger.debug("Could not parse server.uri, keeping the zipkin span name.", e);
      return zipkinName;
    }
  }

  /**
   * Copies the binary annotation {@code zipkinKey}, decoded as UTF-8, to the label
   * {@code stackdriverKey} of {@code traceSpan}. Does nothing if the annotation is absent.
   */
  private static void maybeCopyZipkinAnnotation(
      Map<String, BinaryAnnotation> annotations,
      String zipkinKey,
      String stackdriverKey,
      TraceSpan.Builder traceSpan) {
    if (annotations.containsKey(zipkinKey)) {
      traceSpan.putLabels(stackdriverKey, utf8(annotations.get(zipkinKey)));
    }
  }

  /** Decodes the raw value of a binary annotation as a UTF-8 string. */
  private static String utf8(BinaryAnnotation annotation) {
    return new String(annotation.getValue(), StandardCharsets.UTF_8);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment