Created
August 28, 2012 16:22
-
-
Save jgonian/1ab7b9a78365bf666084 to your computer and use it in GitHub Desktop.
AXON-10: Infrastructure for replaying events.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Test | |
public void shouldInvokeReplayAnnotationsInOrder() { | |
class A { | |
@BeforeReplay | |
public void before() { | |
} | |
@AfterReplay | |
public void after() { | |
} | |
} | |
@ReplayEvents | |
class B extends A { | |
@EventHandler | |
public void handle(DomainEventMessage event) { | |
} | |
} | |
B bean = mock(B.class); | |
DomainEventMessage event = mock(DomainEventMessage.class); | |
executePostProcessor(bean, event); | |
InOrder inOrder = inOrder(bean); | |
inOrder.verify(bean).before(); | |
inOrder.verify(bean).handle(same(event)); | |
inOrder.verify(bean).after(); | |
} | |
@Test | |
public void shouldInvokeHandlerInChild() { | |
@ReplayEvents | |
class A { | |
@EventHandler | |
public void handleInParent(DomainEventMessage event) { | |
} | |
@EventHandler | |
public void handleInParent(GenericDomainEventMessage event) { | |
} | |
} | |
@ReplayEvents | |
class B extends A { | |
@EventHandler | |
public void handleInChild(DomainEventMessage event) { | |
} | |
} | |
B bean = mock(B.class); | |
GenericDomainEventMessage event = mock(GenericDomainEventMessage.class); | |
executePostProcessor(bean, event); | |
verify((A) bean, never()).handleInParent(isA(DomainEventMessage.class)); | |
verify((A) bean, never()).handleInParent(isA(GenericDomainEventMessage.class)); | |
verify(bean).handleInChild(same(event)); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.axonframework.eventhandling.annotation; | |
import java.lang.reflect.Method; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import org.axonframework.common.ReflectionUtils; | |
import org.axonframework.domain.DomainEventMessage; | |
import org.axonframework.domain.DomainEventStream; | |
import org.axonframework.eventstore.management.EventQueryService; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.beans.BeansException; | |
import org.springframework.beans.factory.config.BeanPostProcessor; | |
import org.springframework.context.ApplicationListener; | |
import org.springframework.context.event.ContextRefreshedEvent; | |
import org.springframework.util.StopWatch; | |
public class ReplayEventsAnnotationBeanPostProcessor implements BeanPostProcessor, ApplicationListener<ContextRefreshedEvent> { | |
private static final Logger LOG = LoggerFactory.getLogger(ReplayEventsAnnotationBeanPostProcessor.class); | |
private final List<ZeroArgumentsMethodInvoker> beforeReplayInvokers = new ArrayList<ZeroArgumentsMethodInvoker>(); | |
private final List<ZeroArgumentsMethodInvoker> afterReplayInvokers = new ArrayList<ZeroArgumentsMethodInvoker>(); | |
private final List<AnnotationEventHandlerInvoker> eventHandlerInvokers = new ArrayList<AnnotationEventHandlerInvoker>(); | |
private final AtomicBoolean initialized = new AtomicBoolean(); | |
private final EventQueryService eventQueryService; | |
public ReplayEventsAnnotationBeanPostProcessor(EventQueryService eventQueryService) { | |
this.eventQueryService = eventQueryService; | |
} | |
@Override | |
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { | |
return bean; | |
} | |
@Override | |
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException { | |
if (hasReplayEventsAnnotation(bean)) { | |
findBeforeAndAfterReplayAnnotations(bean); | |
eventHandlerInvokers.add(new AnnotationEventHandlerInvoker(bean)); | |
} | |
return bean; | |
} | |
private boolean hasReplayEventsAnnotation(Object bean) { | |
return ReflectionUtils.findAnnotation(bean.getClass(), ReplayEvents.class) != null; | |
} | |
private void findBeforeAndAfterReplayAnnotations(Object bean) { | |
for (Method method : ReflectionUtils.methodsOf(bean.getClass())) { | |
if (method.isAnnotationPresent(BeforeReplay.class)) { | |
beforeReplayInvokers.add(new ZeroArgumentsMethodInvoker(bean, method)); | |
} else if (method.isAnnotationPresent(AfterReplay.class)) { | |
afterReplayInvokers.add(new ZeroArgumentsMethodInvoker(bean, method)); | |
} | |
} | |
} | |
@Override | |
public void onApplicationEvent(ContextRefreshedEvent event) { | |
if (!initialized.get()) { | |
initialize(); | |
} | |
} | |
private void initialize() { | |
if (initialized.compareAndSet(false, true)) { | |
StopWatch sw = new StopWatch(); | |
sw.start(); | |
beforeReplay(); | |
replay(); | |
afterReplay(); | |
LOG.info("Done replaying of events in {}", sw.prettyPrint()); | |
} | |
} | |
private void beforeReplay() { | |
LOG.info("Executing \"before replay\" operations..."); | |
executeMethodInvokers(beforeReplayInvokers); | |
} | |
private void replay() { | |
DomainEventStream eventStream = eventQueryService.getAllEvents(); | |
LOG.info("Replaying of events started"); | |
while (eventStream.hasNext()) { | |
DomainEventMessage event = eventStream.next(); | |
for (AnnotationEventHandlerInvoker invoker : eventHandlerInvokers) { | |
invoker.invokeEventHandlerMethod(event); | |
} | |
} | |
} | |
private void afterReplay() { | |
LOG.info("Executing \"after replay\" operations..."); | |
executeMethodInvokers(afterReplayInvokers); | |
} | |
private void executeMethodInvokers(List<ZeroArgumentsMethodInvoker> invokers) { | |
for (ZeroArgumentsMethodInvoker invoker : invokers) { | |
invoker.invoke(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment