Skip to content

Instantly share code, notes, and snippets.

@codefromthecrypt
Last active July 12, 2018 19:56
Show Gist options
  • Save codefromthecrypt/76d94054b77e3be338bd75424ca8ba30 to your computer and use it in GitHub Desktop.
Save codefromthecrypt/76d94054b77e3be338bd75424ca8ba30 to your computer and use it in GitHub Desktop.
Kafka one-way with Brave
/**
 * This is an example of a one-way or "messaging span", which is possible by use of the {@link
 * Span#flush()} operator.
 *
 * <p>The producing side creates a {@code CLIENT} span with one tracer, sends it as the message key
 * and flushes it. The consuming side deserializes the key with {@link SpanDeserializer}, which
 * builds its own tracer, marks the span as {@code SERVER} and flushes it as well.
 *
 * <p>Note that this uses a span as a kafka key, not because it is recommended, rather as it is
 * convenient for demonstration, since kafka doesn't have message properties.
 *
 * <p>See https://github.com/openzipkin/zipkin/issues/1243
 */
public class KafkaExampleIT {
  @Rule public KafkaJunitRule kafka = new KafkaJunitRule(EphemeralKafkaBroker.create());

  /** Sink for spans reported by the producer-side tracer and by {@link SpanDeserializer}. */
  InMemoryStorage storage = new InMemoryStorage();

  /** Producer-side tracing: service name "producer", each span is handed to {@link #storage}. */
  Tracing tracing = Tracing.newBuilder()
      .localEndpoint(Endpoint.builder().serviceName("producer").build())
      .reporter(s -> storage.spanConsumer().accept(Collections.singletonList(s)))
      .build();

  KafkaProducer<Span, String> producer;
  KafkaConsumer<Span, String> consumer;

  /** Remote endpoint set on both spans: service "kafka" at {@code kafka.helper().kafkaPort()}. */
  Endpoint kafkaEndpoint;

  /** Creates the producer, the consumer and the kafka endpoint used by the test. */
  @Before public void setup() {
    producer = kafka.helper()
        .createProducer(new SpanSerializer(tracing), new StringSerializer(), null);
    consumer = kafka.helper()
        .createConsumer(new SpanDeserializer(storage), new StringDeserializer(), null);
    kafkaEndpoint =
        Endpoint.builder().serviceName("kafka").port(kafka.helper().kafkaPort()).build();
  }

  /**
   * Sends one message whose key is a client span, then polls it back and flushes the span
   * deserialized from the key as a server span named after the message value.
   */
  @Test
  public void startWithOneTracerAndStopWithAnother() throws Exception {
    String topic = "startWithOneTracerAndStopWithAnother";

    // Nested try/finally: each client is closed exactly once, even when a step fails. On the
    // success path the order of operations is the same as before: subscribe, send, flush,
    // close producer, poll, close consumer.
    try {
      try {
        consumer.subscribe(Collections.singletonList(topic));

        Span span =
            tracing.tracer().newTrace().kind(Span.Kind.CLIENT).remoteEndpoint(kafkaEndpoint);
        // get() blocks until the send completes, so the span is flushed after the send
        producer.send(new ProducerRecord<>(topic, span, "foo")).get();
        span.flush();
      } finally {
        producer.close();
      }

      consumer.poll(500L).forEach(record -> {
        // record.key() is the span produced by SpanDeserializer from the message key
        record.key()
            .name(record.value())
            .kind(Span.Kind.SERVER)
            .remoteEndpoint(kafkaEndpoint)
            .flush();
      });
    } finally {
      consumer.close();
    }
  }
}
/**
 * This class simulates a consumer being on a different process, by not sharing a tracer.
 *
 * <p>It builds its own {@link Tracing} with service name "consumer" and reports each span to the
 * storage passed to the constructor. Keys are expected in {@link Properties} text format, which
 * is what {@code SpanSerializer} writes.
 */
final class SpanDeserializer implements Deserializer<Span> {
  final Tracing tracing;
  final TraceContext.Extractor<Properties> extractor;

  /**
   * @param storage receives every span reported by this deserializer's tracer
   */
  SpanDeserializer(InMemoryStorage storage) {
    tracing = Tracing.newBuilder()
        .localEndpoint(Endpoint.builder().serviceName("consumer").build())
        .reporter(s -> storage.spanConsumer().accept(Collections.singletonList(s)))
        .build();
    extractor = tracing.propagation().extractor(Properties::getProperty);
  }

  /** No configuration is needed. */
  @Override public void configure(Map<String, ?> map, boolean b) {
  }

  /**
   * Extract the span from the key or start a new trace if any problem.
   *
   * @param s unused
   * @param bytes the serialized key, in {@link Properties} text format
   * @return a span joined on the extracted trace context; otherwise a new trace, using the
   * extracted sampling flags when extraction succeeded but found no context
   */
  @Override public Span deserialize(String s, byte[] bytes) {
    try {
      Properties properties = new Properties();
      properties.load(new ByteArrayInputStream(bytes));
      // in Brave 4.3 this will be simplified to tracing.tracer().nextSpan(extractor, properties)
      TraceContextOrSamplingFlags result = extractor.extract(properties);
      // The tracer is obtained from the tracing field: this class declares no "tracer" field.
      return result.context() != null
          ? tracing.tracer().joinSpan(result.context())
          : tracing.tracer().newTrace(result.samplingFlags());
    } catch (RuntimeException | IOException e) {
      // Includes a null key: ByteArrayInputStream rejects null with a NullPointerException.
      return tracing.tracer().newTrace(); // return a new trace upon failure of any kind
    }
  }

  /** Nothing to release here. */
  @Override public void close() {
  }
}
/**
 * Serializes a span by injecting its trace context into a {@link Properties} object and writing
 * that object out in properties text format.
 */
final class SpanSerializer implements Serializer<Span> {
  final TraceContext.Injector<Properties> injector;

  /**
   * @param tracing supplies the propagation used to build the injector
   */
  SpanSerializer(Tracing tracing) {
    this.injector = tracing.propagation().injector(Properties::setProperty);
  }

  /** No configuration is needed. */
  @Override public void configure(Map<String, ?> map, boolean b) {
  }

  /**
   * @param s unused
   * @param span the span whose context is written
   * @return the injected properties, as stored by {@link Properties#store}
   */
  @Override public byte[] serialize(String s, Span span) {
    Properties carrier = new Properties();
    injector.inject(span.context(), carrier);
    return toBytes(carrier);
  }

  /** Nothing to release here. */
  @Override public void close() {
  }

  /** Writes the carrier to an in-memory buffer with the header comment "zipkin". */
  private static byte[] toBytes(Properties carrier) {
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    try {
      carrier.store(buffer, "zipkin");
      return buffer.toByteArray();
    } catch (IOException e) {
      throw new AssertionError(e);
    }
  }
}
@FreeSlaver
Copy link

Thanks a lot!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment