Created
November 14, 2013 04:19
-
-
Save davemiller/7461268 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package au.com.msquare.storm.base; | |
import backtype.storm.task.OutputCollector; | |
import backtype.storm.task.TopologyContext; | |
import backtype.storm.topology.OutputFieldsDeclarer; | |
import backtype.storm.topology.base.BaseRichBolt; | |
import backtype.storm.tuple.Fields; | |
import backtype.storm.tuple.Tuple; | |
import com.dynatrace.adk.DynaTraceADKFactory; | |
import com.dynatrace.adk.Tagging; | |
import java.util.List; | |
import java.util.Map; | |
/** | |
* User: david | |
* Base Tracing bolt, extend and implement executeBolt and the bolt will use the trace id and join the purepath | |
*/ | |
public abstract class TraceBaseBolt extends BaseRichBolt { | |
private OutputCollector collector; | |
private Fields fields; | |
protected static final String TRACE_ID_FIELD = "__TRACE_ID"; | |
public TraceBaseBolt(Fields f) { | |
List fieldNames = f.toList(); | |
// always declare field for the trace id | |
fieldNames.add(TRACE_ID_FIELD); | |
this.fields = new Fields(fieldNames); | |
} | |
@Override | |
public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) { | |
this.collector = outputCollector; | |
} | |
@Override | |
public final void execute(Tuple tuple) { | |
TracingCollector tracingCollector = createCollector( extractTraceId(tuple)); | |
try { | |
executeBolt(tuple, tracingCollector); | |
} finally { | |
tracingCollector.stopTrace(); | |
} | |
} | |
private byte[] extractTraceId(Tuple tuple) { | |
return tuple.getBinaryByField(TRACE_ID_FIELD); | |
} | |
private TracingCollector createCollector( byte[] traceId) { | |
DynaTraceADKFactory.initialize(); | |
Tagging tagging = DynaTraceADKFactory.createTagging(); | |
tagging.setTag(traceId); //use traceId to "join" the purepath | |
tagging.startServerPurePath(); //start the sub path for this bolt | |
byte[] newTag = tagging.getTag(); //create a new traceId which will be emitted to subsequent bolts | |
return new TracingCollector(collector, tagging, newTag); | |
} | |
/* | |
Bolts should implement executeBolt as they would a normal Bolt.execute | |
*/ | |
protected abstract void executeBolt(Tuple tuple, OutputCollector collector); | |
@Override | |
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { | |
outputFieldsDeclarer.declare(fields); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package au.com.msquare.storm.base; | |
import backtype.storm.task.IOutputCollector; | |
import backtype.storm.task.OutputCollector; | |
import backtype.storm.tuple.Tuple; | |
import com.dynatrace.adk.Tagging; | |
import java.util.Collection; | |
import java.util.List; | |
/** | |
* User: david | |
* | |
* Storm OutputCollector wrapper that links purepaths and passes the | |
*/ | |
public class TracingCollector extends OutputCollector { | |
private Tagging tagging; | |
private byte[] traceId; | |
public TracingCollector(IOutputCollector delegate, Tagging tagging, byte[] traceId) { | |
super(delegate); | |
this.tagging = tagging; | |
this.traceId = traceId; | |
} | |
private List<Object> addTraceId(List<Object> values) { | |
tagging.linkClientPurePath(true, traceId); | |
values.add(traceId); | |
return values; | |
} | |
private List<Object> addTraceId(Tuple tuple) { | |
return addTraceId(tuple.getValues()); | |
} | |
public void stopTrace() { | |
tagging.endServerPurePath(); | |
} | |
@Override | |
public List<Integer> emit(String streamId, Tuple anchor, List<Object> tuple) { | |
return super.emit(streamId, anchor, addTraceId(tuple)); | |
} | |
@Override | |
public List<Integer> emit(String streamId, List<Object> tuple) { | |
return super.emit(streamId, addTraceId(tuple)); | |
} | |
@Override | |
public List<Integer> emit(Collection<Tuple> anchors, List<Object> tuple) { | |
return super.emit(anchors, addTraceId(tuple)); | |
} | |
@Override | |
public List<Integer> emit(Tuple anchor, List<Object> tuple) { | |
return super.emit(anchor, addTraceId(tuple)); | |
} | |
/* Add other collector methods as needed */ | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment