Skip to content

Instantly share code, notes, and snippets.

@Dgiffone
Created September 18, 2011 22:55
Show Gist options
  • Save Dgiffone/1225688 to your computer and use it in GitHub Desktop.
Save Dgiffone/1225688 to your computer and use it in GitHub Desktop.
package a.b.c.common.service.command;
import javax.annotation.Resource;
import org.axonframework.commandhandling.CommandBus;
import org.axonframework.commandhandling.CommandCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.Message;
import org.springframework.integration.MessageRejectedException;
import org.springframework.integration.core.MessageHandler;
import org.springframework.integration.core.SubscribableChannel;
import org.springframework.integration.message.GenericMessage;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import a.b.c.common.service.Utils;
/**
 * {@link CommandService} implementation that can either hand a command straight to the
 * Axon {@link CommandBus} ({@code dispatch}) or send it through a Spring Integration
 * {@link SubscribableChannel} ({@code enqueue}).
 *
 * <p>The instance subscribes itself to the channel in {@link #afterPropertiesSet()}, so
 * messages sent on the channel come back through {@link #handleMessage(Message)} and are
 * then dispatched on the command bus.
 *
 * <p>NOTE(review): {@code afterPropertiesSet} subscribes {@code this}, and
 * {@code handleMessage} calls {@code dispatch} on {@code this}. If {@code @Transactional}
 * is applied through a Spring proxy, channel-delivered commands presumably bypass it;
 * confirm against the application context configuration.
 */
@Transactional
@Service("common.CommandService")
public class SpringIntegrationCommandBus implements CommandService, MessageHandler, InitializingBean {

    private static final Logger logger = LoggerFactory.getLogger(SpringIntegrationCommandBus.class);

    /** Channel commands are sent on by {@code enqueue}; this instance is also subscribed to it. */
    @Resource(name="commandChannel")
    private SubscribableChannel channel;

    /** Axon command bus every command is eventually dispatched on. */
    @Autowired
    private CommandBus commandBus;

    /** Creates the {@link Dispatch} attached to a command before it is sent on the channel. */
    @Autowired
    private DispatchFactory factory;

    /**
     * Subscribes this handler to the command channel once all dependencies are injected.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        channel.subscribe(this);
    }

    /**
     * Dispatches the command directly on the command bus, without going through the channel.
     *
     * @param command the command to dispatch
     */
    @Override
    public void dispatch(Command command) {
        logger.info("In memory dispatching command {}-{}", command.getClass().getSimpleName(), command.getCommandId());
        commandBus.dispatch(command);
        logger.debug("dispatched");
    }

    /**
     * Dispatches the command directly on the command bus, passing the given callback
     * wrapped in a {@link CallbackWrapper}.
     *
     * @param command  the command to dispatch
     * @param callback the callback handed to the command bus through the wrapper
     * @param <R>      the callback's type parameter
     */
    @Override
    public <R> void dispatch(Command command, SyncCommandCallback<R> callback) {
        CommandCallback<R> commandCallback = new CallbackWrapper<R>(callback);
        // Explicit Object[] keeps the three-argument call usable with SLF4J versions lacking varargs overloads.
        logger.info("In memory dispatching command {}-{} with callback {}", new Object[] {command.getClass().getSimpleName(), command.getCommandId(), callback.getClass().getSimpleName()});
        commandBus.dispatch(command, commandCallback);
        logger.debug("dispatched");
    }

    /**
     * Attaches a fresh {@link Dispatch} to the command and sends it on the command channel.
     *
     * <p>If the channel reports that the message was not accepted, this is logged at error
     * level rather than reported as "enqueued". The method still returns normally in that case.
     *
     * @param command the command to send on the channel
     */
    @Override
    public void enqueue(Command command) {
        logger.info("Enqueuing command {}-{}", command.getClass().getSimpleName(), command.getCommandId());
        Dispatch dispatchInfo = factory.makeDispatch();
        command.attachDispatch(dispatchInfo);
        boolean accepted = channel.send(new GenericMessage<Command>(command));
        if (!accepted) {
            // send() returning false means the message was not accepted; do not claim success.
            logger.error("Command channel did not accept command {}-{}", command.getClass().getSimpleName(), command.getCommandId());
            return;
        }
        logger.debug("enqueued");
    }

    /**
     * Handles a message delivered by the channel: marks the carried command as received
     * by this host and dispatches it on the command bus.
     *
     * @param message the incoming message; its payload must be a {@link Command}
     * @throws MessageRejectedException if the payload is not a {@link Command}
     */
    @Override
    public void handleMessage(Message<?> message){
        Object rawPayload = message.getPayload();
        if (!(rawPayload instanceof Command)) {
            logger.warn("Message of wrong type received: {}", rawPayload.getClass().getName());
            throw new MessageRejectedException(message, String.format("The payload of incoming messages must be of type %s", Command.class.getName()));
        }
        Command payload = (Command) rawPayload;
        setDelivered(payload);
        logger.debug("Dequeued command {}-{}", payload.getClass().getSimpleName(), payload.getCommandId());
        dispatch(payload);
    }

    /**
     * Records the local hostname on the command's dispatch info.
     *
     * <p>A command whose dispatch info is absent or not an {@link EnvelopedDispatch}
     * (for instance one put on the channel without going through {@code enqueue}) is
     * logged and left untouched, so that this bookkeeping cannot prevent the dispatch.
     */
    private void setDelivered(Command payload) {
        Object dispatchInfos = payload.getDispatchInfos();
        if (!(dispatchInfos instanceof EnvelopedDispatch)) {
            logger.warn("Command {}-{} has no EnvelopedDispatch attached; receiving host not recorded", payload.getClass().getSimpleName(), payload.getCommandId());
            return;
        }
        ((EnvelopedDispatch) dispatchInfos).receivedBy(Utils.getHostname());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment