Skip to content

Instantly share code, notes, and snippets.

@EvgeniGordeev
Last active January 13, 2021 01:11
Show Gist options
  • Save EvgeniGordeev/5e2c0827aa73614f26fcac598f8237c6 to your computer and use it in GitHub Desktop.
Save EvgeniGordeev/5e2c0827aa73614f26fcac598f8237c6 to your computer and use it in GitHub Desktop.
ActiveMQ - copy pending messages from one server to another
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.IdGenerator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.IntStream;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
/**
* Migrate from old activemq to amazon mq.
*
* @author evgeni.gordeev
*/
public class Migrate {
private static final Logger LOG = LoggerFactory.getLogger(Migrate.class);
public static void main(String[] args) throws JMSException {
ActiveMQConnection source = (ActiveMQConnection) getSourceConnFactory().createConnection();
source.start();
Session sourceSession = source.createSession(false, Session.AUTO_ACKNOWLEDGE);
ActiveMQConnectionFactory targetConnFactory = getTargetConnFactory();
ActiveMQConnection target = (ActiveMQConnection) targetConnFactory.createConnection();
target.start();
Session targetSes = target.createSession(false, Session.AUTO_ACKNOWLEDGE);
Set<ActiveMQQueue> queues = source.getDestinationSource().getQueues();
int qCounter = 1;
for (ActiveMQQueue queue : queues) {
if ("ActiveMQ.DLQ".equals(queue.getQueueName())) {
continue;
}
LOG.info("Processing queue #{}. '{}'", qCounter++, queue.getQueueName());
QueueBrowser sourceBrowser = sourceSession.createBrowser(queue);
List<ActiveMQMessage> msgs = Collections.list(sourceBrowser.getEnumeration());
targetSes.createQueue(queue.getQueueName());
if (msgs.isEmpty()) {
LOG.info("No messages in queue '{}'", queue);
} else {
LOG.info("{} messages in queue '{}'", msgs.size(), queue);
int mCounter = 1;
for (ActiveMQMessage message : msgs) {
LOG.info("Copying message #{}. '{}'", mCounter++, message.getJMSMessageID());
ActiveMQMessage destMsg = (ActiveMQMessage) message.copy();
copy(targetConnFactory, destMsg);
}
}
}
source.stop();
target.stop();
System.exit(0);
}
private static ActiveMQConnectionFactory getSourceConnFactory() {
return getConnFactory("tcp://127.0.0.1:61616", null, null);
}
private static ActiveMQConnectionFactory getTargetConnFactory() {
return getConnFactory("ssl://hash.mq.us-east-1.amazonaws.com:61617", "user", "pass");
}
private static ActiveMQConnectionFactory getConnFactory(String brokerUrl, String username, String password) {
return new ActiveMQConnectionFactory(username, password, brokerUrl);
}
private static void copy(ActiveMQConnectionFactory targetConnFactory, ActiveMQMessage message) {
try {
ActiveMQConnectionFactory current = new CopyConnectionFactory(targetConnFactory, message);
final Connection connection = current.createConnection();
connection.start();
// PRESERVE MessageId - START
ActiveMQSession targetSes = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue targetQueue = targetSes.createQueue(message.getDestination().getPhysicalName());
Method getNextProducerId = targetSes.getClass().getDeclaredMethod("getNextProducerId");
getNextProducerId.setAccessible(true);
IntStream.range(1, (int) message.getProducerId().getValue()).forEachOrdered(n -> {
try {
getNextProducerId.invoke(targetSes);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
});
ActiveMQMessageProducer producer = (ActiveMQMessageProducer) targetSes.createProducer(targetQueue);
Method getMessageSequence = producer.getClass().getDeclaredMethod("getMessageSequence");
getMessageSequence.setAccessible(true);
IntStream.range(1, (int) message.getMessageId().getProducerSequenceId()).forEachOrdered(n -> {
try {
getMessageSequence.invoke(producer);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
});
// PRESERVE MessageId - END
producer.send(targetQueue,
message,
message.getJMSDeliveryMode(),
message.getPriority(),
message.getJMSExpiration() - message.getJMSTimestamp());
connection.close();
} catch (NoSuchMethodException | JMSException e) {
e.printStackTrace();
}
}
/**
* To preserve the original MessageId.
*/
static class CopyConnectionFactory extends ActiveMQConnectionFactory {
private final String clientId;
public CopyConnectionFactory(ActiveMQConnectionFactory targetConnFactory, ActiveMQMessage message) {
super(targetConnFactory.getUserName(), targetConnFactory.getPassword(), targetConnFactory.getBrokerURL());
this.clientId = message.getMessageId().getProducerId().getConnectionId();
}
protected synchronized IdGenerator getClientIdGenerator() {
return new IdGenerator() {
public synchronized String generateId() {
return clientId;
}
};
}
protected synchronized IdGenerator getConnectionIdGenerator() {
return new IdGenerator() {
public synchronized String generateId() {
return clientId;
}
};
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment