Created
December 16, 2016 06:32
-
-
Save anuraaga/c6030b8158832d1deccfb01e516c036c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.common.base.Strings; | |
import com.google.devtools.cloudtrace.v1.PatchTracesRequest; | |
import com.google.devtools.cloudtrace.v1.Trace; | |
import com.google.devtools.cloudtrace.v1.TraceServiceGrpc; | |
import com.google.devtools.cloudtrace.v1.TraceSpan; | |
import com.google.devtools.cloudtrace.v1.Traces; | |
import com.google.protobuf.util.Timestamps; | |
import com.github.kristofa.brave.EmptySpanCollectorMetricsHandler; | |
import com.github.kristofa.brave.FlushingSpanCollector; | |
import com.github.kristofa.brave.IdConversion; | |
import com.spotify.futures.FuturesExtra; | |
import com.twitter.zipkin.gen.Annotation; | |
import com.twitter.zipkin.gen.BinaryAnnotation; | |
import com.twitter.zipkin.gen.Span; | |
import jp.skypencil.guava.stream.GuavaCollectors; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.net.URI; | |
import java.nio.charset.StandardCharsets; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.function.Function; | |
import java.util.stream.Collectors; | |
import settings.MonitoringSettings; | |
/** | |
* A brave span collector that converts the spans and posts to stackdriver trace. | |
*/ | |
public class StackdriverSpanCollector extends FlushingSpanCollector { | |
private static final Logger logger = LoggerFactory.getLogger(StackdriverSpanCollector.class); | |
private final TraceServiceGrpc.TraceServiceFutureStub traceService; | |
private final MonitoringSettings monitoringSettings; | |
public StackdriverSpanCollector( | |
TraceServiceGrpc.TraceServiceFutureStub traceService, | |
MonitoringSettings monitoringSettings) { | |
super( | |
new EmptySpanCollectorMetricsHandler(), | |
monitoringSettings.getTraceReportingIntervalSecs()); | |
this.traceService = traceService; | |
this.monitoringSettings = monitoringSettings; | |
} | |
@Override | |
protected void reportSpans(List<Span> spans) { | |
List<Trace> traces = spans.stream() | |
.collect(Collectors.groupingBy(Span::getTrace_id)) | |
.entrySet() | |
.stream() | |
.map(traceIdAndSpans -> Trace.newBuilder() | |
.setTraceId(Strings.padStart( | |
IdConversion.convertToString(traceIdAndSpans.getKey()), 32, '0')) | |
.setProjectId(monitoringSettings.getGoogleProjectId()) | |
.addAllSpans( | |
traceIdAndSpans.getValue() | |
.stream() | |
.map(span -> { | |
TraceSpan.SpanKind kind = TraceSpan.SpanKind.SPAN_KIND_UNSPECIFIED; | |
String serviceName = "unknown"; | |
for (Annotation annotation : span.getAnnotations()) { | |
if (annotation.value.equals("sr") | |
|| annotation.value.equals("ss")) { | |
kind = TraceSpan.SpanKind.RPC_SERVER; | |
serviceName = annotation.host.service_name; | |
break; | |
} else if (annotation.value.equals("cr") | |
|| annotation.value.equals("cs")) { | |
kind = TraceSpan.SpanKind.RPC_CLIENT; | |
serviceName = annotation.host.service_name; | |
break; | |
} | |
} | |
Map<String, BinaryAnnotation> annotations = | |
span.getBinary_annotations().stream() | |
.collect(GuavaCollectors.toImmutableMap( | |
BinaryAnnotation::getKey, | |
Function.identity())); | |
TraceSpan.Builder traceSpan = TraceSpan.newBuilder() | |
.setSpanId(span.getId()) | |
.setStartTime(Timestamps.fromMicros(span.getTimestamp())) | |
.setEndTime( | |
Timestamps.fromMicros(span.getTimestamp() + span.getDuration())) | |
.setKind(kind); | |
if (span.getParent_id() != null) { | |
traceSpan.setParentSpanId(span.getParent_id()); | |
} | |
// Cloud trace uses this legacy app-engine label for "service name". | |
traceSpan.putLabels( | |
"trace.cloud.google.com/gae/app/module_version", | |
serviceName); | |
final String spanName; | |
// Hackily recognize common HTTP methods since armeria will set it as the | |
// span name, even though stackdriver wants the URI as the span name. | |
if ((span.getName().equals("get") || span.getName().equals("post")) | |
&& annotations.containsKey("server.uri")) { | |
URI uri = URI.create( | |
new String( | |
annotations.get("server.uri").getValue(), | |
StandardCharsets.UTF_8)); | |
spanName = uri.getPath(); | |
traceSpan.putLabels( | |
"trace.cloud.google.com/http/method", | |
span.getName().toUpperCase()); | |
} else { | |
spanName = traceSpan.getName(); | |
} | |
traceSpan.setName(spanName); | |
maybeCopyZipkinAnnotation( | |
annotations, | |
"server.uri", | |
"trace.cloud.google.com/http/url", | |
traceSpan); | |
maybeCopyZipkinAnnotation( | |
annotations, | |
"server.cause", | |
"trace.cloud.google.com/stacktrace", | |
traceSpan); | |
maybeCopyZipkinAnnotation( | |
annotations, | |
"server.remote", | |
"trace.cloud.google.com/http/source/ip", | |
traceSpan); | |
if (annotations.containsKey("server.result")) { | |
String result = new String(annotations.get("server.result").getValue(), | |
StandardCharsets.UTF_8); | |
if (result.equals("success")) { | |
traceSpan.putLabels("trace.cloud.google.com/http/status_code", "200"); | |
} else { | |
traceSpan.putLabels("trace.cloud.google.com/http/status_code", "500"); | |
} | |
} | |
return traceSpan.build(); | |
}) | |
.collect(GuavaCollectors.toImmutableList())) | |
.build()) | |
.collect(GuavaCollectors.toImmutableList()); | |
PatchTracesRequest request = PatchTracesRequest.newBuilder() | |
.setProjectId(monitoringSettings.getGoogleProjectId()) | |
.setTraces(Traces.newBuilder().addAllTraces(traces)) | |
.build(); | |
FuturesExtra.addFailureCallback( | |
traceService.patchTraces(request), | |
t -> logger.warn("Error sending traces to stackdriver.", t)); | |
} | |
private void maybeCopyZipkinAnnotation( | |
Map<String, BinaryAnnotation> annotations, | |
String zipkinKey, | |
String stackdriverKey, | |
TraceSpan.Builder traceSpan) { | |
if (annotations.containsKey(zipkinKey)) { | |
traceSpan.putLabels( | |
stackdriverKey, | |
new String(annotations.get(zipkinKey).getValue(), | |
StandardCharsets.UTF_8)); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment