Skip to content

Instantly share code, notes, and snippets.

@davemiller
Created November 14, 2013 04:19
Show Gist options
  • Save davemiller/7461268 to your computer and use it in GitHub Desktop.
Save davemiller/7461268 to your computer and use it in GitHub Desktop.
package au.com.msquare.storm.base;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import com.dynatrace.adk.DynaTraceADKFactory;
import com.dynatrace.adk.Tagging;
import java.util.List;
import java.util.Map;
/**
* User: david
* Base Tracing bolt, extend and implement executeBolt and the bolt will use the trace id and join the purepath
*/
public abstract class TraceBaseBolt extends BaseRichBolt {
private OutputCollector collector;
private Fields fields;
protected static final String TRACE_ID_FIELD = "__TRACE_ID";
public TraceBaseBolt(Fields f) {
List fieldNames = f.toList();
// always declare field for the trace id
fieldNames.add(TRACE_ID_FIELD);
this.fields = new Fields(fieldNames);
}
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
this.collector = outputCollector;
}
@Override
public final void execute(Tuple tuple) {
TracingCollector tracingCollector = createCollector( extractTraceId(tuple));
try {
executeBolt(tuple, tracingCollector);
} finally {
tracingCollector.stopTrace();
}
}
private byte[] extractTraceId(Tuple tuple) {
return tuple.getBinaryByField(TRACE_ID_FIELD);
}
private TracingCollector createCollector( byte[] traceId) {
DynaTraceADKFactory.initialize();
Tagging tagging = DynaTraceADKFactory.createTagging();
tagging.setTag(traceId); //use traceId to "join" the purepath
tagging.startServerPurePath(); //start the sub path for this bolt
byte[] newTag = tagging.getTag(); //create a new traceId which will be emitted to subsequent bolts
return new TracingCollector(collector, tagging, newTag);
}
/*
Bolts should implement executeBolt as they would a normal Bolt.execute
*/
protected abstract void executeBolt(Tuple tuple, OutputCollector collector);
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(fields);
}
}
package au.com.msquare.storm.base;
import backtype.storm.task.IOutputCollector;
import backtype.storm.task.OutputCollector;
import backtype.storm.tuple.Tuple;
import com.dynatrace.adk.Tagging;
import java.util.Collection;
import java.util.List;
/**
* User: david
*
* Storm OutputCollector wrapper that links purepaths and passes the
*/
public class TracingCollector extends OutputCollector {
private Tagging tagging;
private byte[] traceId;
public TracingCollector(IOutputCollector delegate, Tagging tagging, byte[] traceId) {
super(delegate);
this.tagging = tagging;
this.traceId = traceId;
}
private List<Object> addTraceId(List<Object> values) {
tagging.linkClientPurePath(true, traceId);
values.add(traceId);
return values;
}
private List<Object> addTraceId(Tuple tuple) {
return addTraceId(tuple.getValues());
}
public void stopTrace() {
tagging.endServerPurePath();
}
@Override
public List<Integer> emit(String streamId, Tuple anchor, List<Object> tuple) {
return super.emit(streamId, anchor, addTraceId(tuple));
}
@Override
public List<Integer> emit(String streamId, List<Object> tuple) {
return super.emit(streamId, addTraceId(tuple));
}
@Override
public List<Integer> emit(Collection<Tuple> anchors, List<Object> tuple) {
return super.emit(anchors, addTraceId(tuple));
}
@Override
public List<Integer> emit(Tuple anchor, List<Object> tuple) {
return super.emit(anchor, addTraceId(tuple));
}
/* Add other collector methods as needed */
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment