Skip to content

Instantly share code, notes, and snippets.

@jgonian
Created August 28, 2012 16:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jgonian/1ab7b9a78365bf666084 to your computer and use it in GitHub Desktop.
Save jgonian/1ab7b9a78365bf666084 to your computer and use it in GitHub Desktop.
AXON-10: Infrastructure for replaying events.
@Test
public void shouldInvokeReplayAnnotationsInOrder() {
class A {
@BeforeReplay
public void before() {
}
@AfterReplay
public void after() {
}
}
@ReplayEvents
class B extends A {
@EventHandler
public void handle(DomainEventMessage event) {
}
}
B bean = mock(B.class);
DomainEventMessage event = mock(DomainEventMessage.class);
executePostProcessor(bean, event);
InOrder inOrder = inOrder(bean);
inOrder.verify(bean).before();
inOrder.verify(bean).handle(same(event));
inOrder.verify(bean).after();
}
@Test
public void shouldInvokeHandlerInChild() {
@ReplayEvents
class A {
@EventHandler
public void handleInParent(DomainEventMessage event) {
}
@EventHandler
public void handleInParent(GenericDomainEventMessage event) {
}
}
@ReplayEvents
class B extends A {
@EventHandler
public void handleInChild(DomainEventMessage event) {
}
}
B bean = mock(B.class);
GenericDomainEventMessage event = mock(GenericDomainEventMessage.class);
executePostProcessor(bean, event);
verify((A) bean, never()).handleInParent(isA(DomainEventMessage.class));
verify((A) bean, never()).handleInParent(isA(GenericDomainEventMessage.class));
verify(bean).handleInChild(same(event));
}
package org.axonframework.eventhandling.annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import org.axonframework.common.ReflectionUtils;
import org.axonframework.domain.DomainEventMessage;
import org.axonframework.domain.DomainEventStream;
import org.axonframework.eventstore.management.EventQueryService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.util.StopWatch;
public class ReplayEventsAnnotationBeanPostProcessor implements BeanPostProcessor, ApplicationListener<ContextRefreshedEvent> {
private static final Logger LOG = LoggerFactory.getLogger(ReplayEventsAnnotationBeanPostProcessor.class);
private final List<ZeroArgumentsMethodInvoker> beforeReplayInvokers = new ArrayList<ZeroArgumentsMethodInvoker>();
private final List<ZeroArgumentsMethodInvoker> afterReplayInvokers = new ArrayList<ZeroArgumentsMethodInvoker>();
private final List<AnnotationEventHandlerInvoker> eventHandlerInvokers = new ArrayList<AnnotationEventHandlerInvoker>();
private final AtomicBoolean initialized = new AtomicBoolean();
private final EventQueryService eventQueryService;
public ReplayEventsAnnotationBeanPostProcessor(EventQueryService eventQueryService) {
this.eventQueryService = eventQueryService;
}
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
return bean;
}
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
if (hasReplayEventsAnnotation(bean)) {
findBeforeAndAfterReplayAnnotations(bean);
eventHandlerInvokers.add(new AnnotationEventHandlerInvoker(bean));
}
return bean;
}
private boolean hasReplayEventsAnnotation(Object bean) {
return ReflectionUtils.findAnnotation(bean.getClass(), ReplayEvents.class) != null;
}
private void findBeforeAndAfterReplayAnnotations(Object bean) {
for (Method method : ReflectionUtils.methodsOf(bean.getClass())) {
if (method.isAnnotationPresent(BeforeReplay.class)) {
beforeReplayInvokers.add(new ZeroArgumentsMethodInvoker(bean, method));
} else if (method.isAnnotationPresent(AfterReplay.class)) {
afterReplayInvokers.add(new ZeroArgumentsMethodInvoker(bean, method));
}
}
}
@Override
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!initialized.get()) {
initialize();
}
}
private void initialize() {
if (initialized.compareAndSet(false, true)) {
StopWatch sw = new StopWatch();
sw.start();
beforeReplay();
replay();
afterReplay();
LOG.info("Done replaying of events in {}", sw.prettyPrint());
}
}
private void beforeReplay() {
LOG.info("Executing \"before replay\" operations...");
executeMethodInvokers(beforeReplayInvokers);
}
private void replay() {
DomainEventStream eventStream = eventQueryService.getAllEvents();
LOG.info("Replaying of events started");
while (eventStream.hasNext()) {
DomainEventMessage event = eventStream.next();
for (AnnotationEventHandlerInvoker invoker : eventHandlerInvokers) {
invoker.invokeEventHandlerMethod(event);
}
}
}
private void afterReplay() {
LOG.info("Executing \"after replay\" operations...");
executeMethodInvokers(afterReplayInvokers);
}
private void executeMethodInvokers(List<ZeroArgumentsMethodInvoker> invokers) {
for (ZeroArgumentsMethodInvoker invoker : invokers) {
invoker.invoke();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment