@itaifrenkel
Created April 20, 2013 21:01
Hack to monitor Kafka 0.8 producers
import kafka.producer.KeyedMessage;
import kafka.producer.async.DefaultEventHandler;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.AfterThrowing;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import scala.collection.JavaConversions;
import scala.collection.Seq;

import java.util.List;

// Intercepts DefaultEventHandler.handle(..) and forwards each batch to the monitor.
@Aspect
public class KafkaEventHandlerMonitorAspect {

    // Runs before every call to handle(..).
    @Before("execution(* kafka.producer.async.DefaultEventHandler.handle(..))")
    public void onBeforeHandler(JoinPoint joinPoint) throws Throwable {
        try {
            KafkaEventHandlerMonitorSingleton.onBeforeHandler(
                getDefaultEventHandler(joinPoint),
                getKeyedMessages(joinPoint));
        }
        catch (Throwable t) {
            // Log failures of the monitoring hook itself, then rethrow them.
            System.err.println(t);
            throw t;
        }
    }

    // Runs when handle(..) exits by throwing; "error" is the thrown exception.
    @AfterThrowing(
        pointcut = "execution(* kafka.producer.async.DefaultEventHandler.handle(..))",
        throwing = "error")
    public void onAfterHandlerThrowing(JoinPoint joinPoint, Throwable error) throws Throwable {
        try {
            KafkaEventHandlerMonitorSingleton.onAfterHandlerThrowing(
                getDefaultEventHandler(joinPoint),
                getKeyedMessages(joinPoint),
                error);
        }
        catch (Throwable t) {
            System.err.println(t);
            throw t;
        }
    }

    // Runs when handle(..) returns normally.
    @AfterReturning(
        pointcut = "execution(* kafka.producer.async.DefaultEventHandler.handle(..))")
    public void onAfterHandlerReturning(JoinPoint joinPoint) throws Throwable {
        try {
            KafkaEventHandlerMonitorSingleton.onAfterHandlerReturning(
                getDefaultEventHandler(joinPoint),
                getKeyedMessages(joinPoint));
        }
        catch (Throwable t) {
            System.err.println(t);
            throw t;
        }
    }

    // The first argument of handle(..) is the batch as a Scala Seq; expose it as a java.util.List.
    private List<KeyedMessage> getKeyedMessages(JoinPoint joinPoint) {
        return JavaConversions.asList((Seq<KeyedMessage>) joinPoint.getArgs()[0]);
    }

    // The target of the join point is the DefaultEventHandler instance being called.
    private DefaultEventHandler getDefaultEventHandler(JoinPoint joinPoint) {
        return (DefaultEventHandler) joinPoint.getTarget();
    }
}
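
The aspect above delegates to KafkaEventHandlerMonitorSingleton, which this gist does not include. The following is only a minimal sketch of what such a class could look like, assuming it does nothing more than count batches and messages. The counter names and getters are hypothetical, and the parameters are typed as Object and List<?> so the sketch compiles without the Kafka jar; the aspect's calls with a DefaultEventHandler and a List<KeyedMessage> would still resolve against these signatures.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical monitor called by KafkaEventHandlerMonitorAspect.
 * Not the author's implementation: it only keeps thread-safe counters.
 */
final class KafkaEventHandlerMonitorSingleton {

    private static final AtomicLong batchesStarted = new AtomicLong();
    private static final AtomicLong messagesSent = new AtomicLong();
    private static final AtomicLong messagesFailed = new AtomicLong();

    private KafkaEventHandlerMonitorSingleton() {
        // Static-only holder; never instantiated.
    }

    /** Called before handle(..) starts processing a batch. */
    static void onBeforeHandler(Object handler, List<?> messages) {
        batchesStarted.incrementAndGet();
    }

    /** Called when handle(..) returned normally for the batch. */
    static void onAfterHandlerReturning(Object handler, List<?> messages) {
        messagesSent.addAndGet(messages.size());
    }

    /** Called when handle(..) threw; the whole batch is counted as failed. */
    static void onAfterHandlerThrowing(Object handler, List<?> messages, Throwable error) {
        messagesFailed.addAndGet(messages.size());
    }

    static long getBatchesStarted() {
        return batchesStarted.get();
    }

    static long getMessagesSent() {
        return messagesSent.get();
    }

    static long getMessagesFailed() {
        return messagesFailed.get();
    }
}
```

Counting the whole batch as failed in onAfterHandlerThrowing is a simplification: the exception says that handle(..) gave up, not which messages in the batch were actually lost.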
@itaifrenkel
Author

Note that this does not catch serialization errors in the async producer: in async mode they are only logged and are never thrown from any method in DefaultEventHandler, so the @AfterThrowing advice never sees them. See this snippet from DefaultEventHandler:
} catch {
  case t =>
    producerStats.serializationErrorRate.mark()
    if (isSync) {
      throw t
    } else {
      // currently, if in async mode, we just log the serialization error. We need to revisit
      // this when doing kafka-496
      error("Error serializing message for topic %s".format(e.topic), t)
    }
}
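
Since the error is swallowed after the encoder throws, one possible workaround is to count failures inside the encoder itself, before DefaultEventHandler catches them. The sketch below shows that wrapping pattern only. MessageEncoder is a hypothetical stand-in with the same single-method shape as Kafka's Encoder (toBytes), used here so the example compiles without the Kafka jar; MonitoringEncoder and its counter are likewise assumptions, not part of Kafka or of this gist.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for an encoder: turns one message into bytes. */
interface MessageEncoder<T> {
    byte[] toBytes(T message);
}

/** Wraps another encoder and counts every serialization failure before rethrowing it. */
final class MonitoringEncoder<T> implements MessageEncoder<T> {

    // Shared across all instances so the count survives encoder re-creation.
    private static final AtomicLong serializationErrors = new AtomicLong();

    private final MessageEncoder<T> delegate;

    MonitoringEncoder(MessageEncoder<T> delegate) {
        this.delegate = delegate;
    }

    @Override
    public byte[] toBytes(T message) {
        try {
            return delegate.toBytes(message);
        } catch (RuntimeException e) {
            // Record the failure here, because the caller may only log it.
            serializationErrors.incrementAndGet();
            throw e;
        }
    }

    static long getSerializationErrors() {
        return serializationErrors.get();
    }
}
```

To use this idea with a real producer, the wrapper would have to implement Kafka's own Encoder type and be the class named in the producer's serializer.class setting; those details are not shown here and should be checked against the Kafka version in use.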

@quode
quode commented Jan 8, 2014

This is indeed a very helpful piece of logic. I was looking for the same thing: how to know, in the calling thread, that an exception occurred while sending an event to Kafka, so that the event can be backed up in that case.
-- Do I also need to change DefaultEventHandler#handle() in order to intercept it from Java + AspectJ? If so, please elaborate on the steps. I don't know much about Scala + AspectJ :)

I've been trying this piece of code and have had no success so far. My project structure is Spring + Maven + Java. Let me know your thoughts.

Thanks.