Created
April 20, 2013 21:01
-
-
Save itaifrenkel/5427416 to your computer and use it in GitHub Desktop.
Hack to monitor Kafka 0.8 producers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import kafka.producer.KeyedMessage; | |
import kafka.producer.async.DefaultEventHandler; | |
import org.aspectj.lang.JoinPoint; | |
import org.aspectj.lang.annotation.AfterReturning; | |
import org.aspectj.lang.annotation.AfterThrowing; | |
import org.aspectj.lang.annotation.Aspect; | |
import org.aspectj.lang.annotation.Before; | |
import scala.collection.JavaConversions; | |
import scala.collection.Seq; | |
import java.util.List; | |
@Aspect | |
public class KafkaEventHandlerMonitorAspect { | |
@Before("execution(* kafka.producer.async.DefaultEventHandler.handle(..))") | |
public void onBeforeHandler(JoinPoint joinPoint) throws Throwable { | |
try { | |
KafkaEventHandlerMonitorSingleton.onBeforeHandler( | |
getDefaultEventHandler(joinPoint), | |
getKeyedMessages(joinPoint)); | |
} | |
catch (Throwable t) { | |
System.err.println(t); | |
throw t; | |
} | |
} | |
@AfterThrowing( | |
pointcut = "execution(* kafka.producer.async.DefaultEventHandler.handle(..))", | |
throwing = "error") | |
public void onAfterHandlerThrowing(JoinPoint joinPoint, Throwable error) throws Throwable { | |
try { | |
KafkaEventHandlerMonitorSingleton.onAfterHandlerThrowing( | |
getDefaultEventHandler(joinPoint), | |
getKeyedMessages(joinPoint), | |
error); | |
} | |
catch (Throwable t) { | |
System.err.println(t); | |
throw t; | |
} | |
} | |
@AfterReturning( | |
pointcut = "execution(* kafka.producer.async.DefaultEventHandler.handle(..))") | |
public void onAfterHandlerReturning(JoinPoint joinPoint) throws Throwable { | |
try { | |
KafkaEventHandlerMonitorSingleton.onAfterHandlerReturning( | |
getDefaultEventHandler(joinPoint), | |
getKeyedMessages(joinPoint)); | |
} | |
catch (Throwable t) { | |
System.err.println(t); | |
throw t; | |
} | |
} | |
private List<KeyedMessage> getKeyedMessages(JoinPoint joinPoint) { | |
return JavaConversions.asList((Seq<KeyedMessage>)joinPoint.getArgs()[0]); | |
} | |
private DefaultEventHandler getDefaultEventHandler(JoinPoint joinPoint) { | |
return (DefaultEventHandler) joinPoint.getTarget(); | |
} | |
} |
This is indeed a very helpful piece of logic, because I was looking for the same thing: how to know, in the calling thread, that an exception has occurred while sending an event to Kafka, so that the event can be backed up when that happens.
-- Do I need to make a change in DefaultEventHandler#handle() too, so that it can be intercepted from Java + AspectJ? If yes, please elaborate on the steps. I don't know much about Scala + AspectJ :)
I ask because I've been trying this piece of code and have had no success so far. My project structure is Spring + Maven + Java. Let me know your thoughts.
Thanks.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Note that this does not take into account serialization errors in the async producer, since they are never raised from any method in DefaultEventHandler. See this snippet from DefaultEventHandler:
} catch {
case t =>
producerStats.serializationErrorRate.mark()
if (isSync) {
throw t
} else {
// currently, if in async mode, we just log the serialization error. We need to revisit
// this when doing kafka-496
error("Error serializing message for topic %s".format(e.topic), t)
}
}