Skip to content

Instantly share code, notes, and snippets.

@garyrussell
Last active December 28, 2015 08:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save garyrussell/7472731 to your computer and use it in GitHub Desktop.
Save garyrussell/7472731 to your computer and use it in GitHub Desktop.
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.integration.aggregator;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import org.junit.Test;
import org.springframework.integration.Message;
import org.springframework.integration.MessageChannel;
import org.springframework.integration.MessageHeaders;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.RollingMessageStore;
import org.springframework.integration.support.MessageBuilder;
/**
 * Exercises an {@link AggregatingMessageHandler} that is backed by a {@link RollingMessageStore} and releases
 * whenever the group holds three messages. Five messages with payloads 3, 5, 7, 9 and 11 are sent under one
 * correlation id, and the test expects three replies carrying the products of each consecutive window of three
 * payloads.
 *
 * @author Mark Fisher
 * @author Marius Bogoevici
 * @author Iwein Fuld
 * @author Gary Russell
 */
public class AggregatorTests {

    /** Longest time, in milliseconds, to wait for each aggregated reply. */
    private static final long RECEIVE_TIMEOUT = 10000;

    /**
     * Sends five messages through the aggregator and checks that the replies are
     * 3*5*7 = 105, 5*7*9 = 315 and 7*9*11 = 693, in that order.
     */
    @Test
    public void testRolling() {
        AggregatingMessageHandler aggregator =
                new AggregatingMessageHandler(new MultiplyingProcessor(), new RollingMessageStore());
        aggregator.setExpireGroupsUponCompletion(true);
        aggregator.setReleaseStrategy(new ReleaseStrategy() {
            @Override
            public boolean canRelease(MessageGroup group) {
                return group.size() == 3;
            }
        });

        QueueChannel replyChannel = new QueueChannel();
        Message<?> message1 = createMessage(3, "ABC", 3, 1, replyChannel, null);
        Message<?> message2 = createMessage(5, "ABC", 3, 2, replyChannel, null);
        Message<?> message3 = createMessage(7, "ABC", 3, 3, replyChannel, null);
        Message<?> message4 = createMessage(9, "ABC", 3, 3, replyChannel, null);
        Message<?> message5 = createMessage(11, "ABC", 3, 3, replyChannel, null);

        aggregator.handleMessage(message1);
        aggregator.handleMessage(message2);
        aggregator.handleMessage(message3);
        aggregator.handleMessage(message4);
        aggregator.handleMessage(message5);

        assertNextReply(replyChannel, 105);
        assertNextReply(replyChannel, 315);
        assertNextReply(replyChannel, 693);
    }

    /**
     * Receives the next reply from the channel and asserts that it exists and carries the expected payload.
     *
     * @param replyChannel the channel the aggregator replies to
     * @param expectedPayload the payload the next reply must carry
     */
    private static void assertNextReply(QueueChannel replyChannel, Object expectedPayload) {
        Message<?> reply = replyChannel.receive(RECEIVE_TIMEOUT);
        assertNotNull(reply);
        assertEquals(expectedPayload, reply.getPayload());
    }

    /**
     * Builds a message with the given payload, correlation and sequence headers and reply channel.
     *
     * @param payload the message payload
     * @param correlationId value for the correlation id header
     * @param sequenceSize value for the sequence size header
     * @param sequenceNumber value for the sequence number header
     * @param replyChannel channel set as the reply channel header
     * @param predefinedId if not {@code null}, set as the {@link MessageHeaders#ID} header
     * @return the built message
     */
    private static Message<?> createMessage(Object payload, Object correlationId, int sequenceSize, int sequenceNumber,
            MessageChannel replyChannel, String predefinedId) {
        MessageBuilder<Object> builder = MessageBuilder.withPayload(payload)
                .setCorrelationId(correlationId)
                .setSequenceSize(sequenceSize)
                .setSequenceNumber(sequenceNumber)
                .setReplyChannel(replyChannel);
        if (predefinedId != null) {
            builder.setHeader(MessageHeaders.ID, predefinedId);
        }
        return builder.build();
    }

    /**
     * Group processor that multiplies the {@link Integer} payloads of all messages in the group.
     * Declared static because it needs no state from the enclosing test instance.
     */
    private static final class MultiplyingProcessor implements MessageGroupProcessor {

        @Override
        public Object processMessageGroup(MessageGroup group) {
            // Primitive accumulator avoids re-boxing on every iteration; the result is boxed once on return.
            int product = 1;
            for (Message<?> message : group.getMessages()) {
                product *= (Integer) message.getPayload();
            }
            return product;
        }
    }
}
/*
* Copyright 2002-2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package foo;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.Lock;
import org.springframework.integration.Message;
import org.springframework.integration.MessagingException;
import org.springframework.integration.util.DefaultLockRegistry;
import org.springframework.integration.util.LockRegistry;
import org.springframework.integration.util.UpperBound;
import org.springframework.jmx.export.annotation.ManagedAttribute;
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
/**
 * Map-based in-memory implementation of {@link MessageStore} and {@link MessageGroupStore} whose groups act as a
 * rolling window: {@link #removeMessageGroup(Object)} discards only a single message (the one returned by the
 * group's {@code getOne()}) instead of the whole group, and {@link #completeGroup(Object)} does nothing, so the
 * remaining messages stay in the group for the next release. Enforces a maximum capacity for the store.
 *
 * @author Iwein Fuld
 * @author Mark Fisher
 * @author Dave Syer
 * @author Oleg Zhurakousky
 * @author Gary Russell
 *
 * @since 3.0
 */
@ManagedResource
public class RollingMessageStore extends AbstractMessageGroupStore implements MessageStore, MessageGroupStore {

    /** Supplies the per-group locks that guard every group mutation. */
    private volatile LockRegistry lockRegistry;

    /** Messages stored individually via {@link #addMessage(Message)}, keyed by message id. */
    private final ConcurrentMap<UUID, Message<?>> idToMessage;

    /** Message groups keyed by group id. */
    private final ConcurrentMap<Object, SimpleMessageGroup> groupIdToMessageGroup;

    /** Capacity limit for individually stored messages; one permit per stored message. */
    private final UpperBound individualUpperBound;

    /** Capacity limit for grouped messages; one permit per message held in any group. */
    private final UpperBound groupUpperBound;

    /** Set once a message has been stored; afterwards the lock registry may no longer be replaced. */
    private volatile boolean isUsed;

    /**
     * Creates a RollingMessageStore with a maximum size limited by the given capacity, or unlimited size if the given
     * capacity is less than 1. The capacities are applied independently to messages stored via
     * {@link #addMessage(Message)} and to those stored via {@link #addMessageToGroup(Object, Message)}. In both cases
     * the capacity applies to the number of messages that can be stored, and once that limit is reached attempting to
     * store another will result in an exception.
     *
     * @param individualCapacity capacity for individually stored messages
     * @param groupCapacity capacity for messages stored in groups
     */
    public RollingMessageStore(int individualCapacity, int groupCapacity) {
        this(individualCapacity, groupCapacity, new DefaultLockRegistry());
    }

    /**
     * See {@link #RollingMessageStore(int, int)}.
     * Also allows the provision of a custom {@link LockRegistry}
     * rather than using the default.
     *
     * @param individualCapacity capacity for individually stored messages
     * @param groupCapacity capacity for messages stored in groups
     * @param lockRegistry the registry to obtain group locks from; must not be {@code null}
     */
    public RollingMessageStore(int individualCapacity, int groupCapacity, LockRegistry lockRegistry) {
        Assert.notNull(lockRegistry, "The LockRegistry cannot be null");
        this.idToMessage = new ConcurrentHashMap<UUID, Message<?>>();
        this.groupIdToMessageGroup = new ConcurrentHashMap<Object, SimpleMessageGroup>();
        this.individualUpperBound = new UpperBound(individualCapacity);
        this.groupUpperBound = new UpperBound(groupCapacity);
        this.lockRegistry = lockRegistry;
    }

    /**
     * Creates a RollingMessageStore with the same capacity for individual and grouped messages.
     *
     * @param capacity capacity applied to both individual and grouped messages
     */
    public RollingMessageStore(int capacity) {
        this(capacity, capacity);
    }

    /**
     * Creates a RollingMessageStore with unlimited capacity.
     */
    public RollingMessageStore() {
        this(0);
    }

    /**
     * Replaces the lock registry. Only permitted before any message has been stored.
     *
     * @param lockRegistry the new registry; must not be {@code null}
     */
    public void setLockRegistry(LockRegistry lockRegistry) {
        Assert.notNull(lockRegistry, "The LockRegistry cannot be null");
        Assert.isTrue(!(this.isUsed), "Cannot change the lock registry after the store has been used");
        this.lockRegistry = lockRegistry;
    }

    /**
     * @return the number of individually stored messages
     */
    @Override
    @ManagedAttribute
    public long getMessageCount() {
        return this.idToMessage.size();
    }

    /**
     * Stores a message under its id.
     *
     * @param message the message to store
     * @return the same message
     * @throws MessagingException if the individual capacity is exhausted
     */
    @Override
    public <T> Message<T> addMessage(Message<T> message) {
        this.isUsed = true;
        if (!this.individualUpperBound.tryAcquire(0)) {
            throw outOfCapacity();
        }
        Message<?> previous = this.idToMessage.put(message.getHeaders().getId(), message);
        if (previous != null) {
            // An entry with this id was replaced, so the map did not grow: hand the extra permit back.
            this.individualUpperBound.release();
        }
        return message;
    }

    /**
     * @param key the message id, may be {@code null}
     * @return the stored message, or {@code null} if the key is {@code null} or unknown
     */
    @Override
    public Message<?> getMessage(UUID key) {
        return (key != null) ? this.idToMessage.get(key) : null;
    }

    /**
     * Removes an individually stored message and frees its capacity permit.
     *
     * @param key the message id, may be {@code null}
     * @return the removed message, or {@code null} if the key is {@code null} or unknown
     */
    @Override
    public Message<?> removeMessage(UUID key) {
        if (key == null) {
            return null;
        }
        Message<?> removed = this.idToMessage.remove(key);
        if (removed != null) {
            // Release only for an actual removal; otherwise the bound would drift above its capacity.
            this.individualUpperBound.release();
        }
        return removed;
    }

    /**
     * Returns a copy of the group with the given id, or a new empty group if none is stored.
     *
     * @param groupId the group id; must not be {@code null}
     * @return a copy of the stored group, never {@code null}
     */
    @Override
    public MessageGroup getMessageGroup(Object groupId) {
        Assert.notNull(groupId, "'groupId' must not be null");
        SimpleMessageGroup group = this.groupIdToMessageGroup.get(groupId);
        if (group == null) {
            return new SimpleMessageGroup(groupId);
        }
        SimpleMessageGroup simpleMessageGroup = new SimpleMessageGroup(group);
        simpleMessageGroup.setLastModified(group.getLastModified());
        return simpleMessageGroup;
    }

    /**
     * Adds a message to the group with the given id, creating the group if necessary.
     *
     * @param groupId the group id
     * @param message the message to add
     * @return the stored group
     * @throws MessagingException if the group capacity is exhausted or the thread is interrupted while waiting for
     * the group lock
     */
    @Override
    public MessageGroup addMessageToGroup(Object groupId, Message<?> message) {
        // Group usage also counts as "used" for the setLockRegistry guard.
        this.isUsed = true;
        if (!this.groupUpperBound.tryAcquire(0)) {
            throw outOfCapacity();
        }
        boolean stored = false;
        try {
            Lock lock = lockGroup(groupId);
            try {
                SimpleMessageGroup group = this.groupIdToMessageGroup.get(groupId);
                if (group == null) {
                    SimpleMessageGroup created = new SimpleMessageGroup(groupId);
                    SimpleMessageGroup raced = this.groupIdToMessageGroup.putIfAbsent(groupId, created);
                    group = (raced != null) ? raced : created;
                }
                group.add(message);
                group.setLastModified(System.currentTimeMillis());
                stored = true;
                return group;
            }
            finally {
                lock.unlock();
            }
        }
        finally {
            if (!stored) {
                // The message never made it into the group (e.g. interrupted while locking): return the permit.
                this.groupUpperBound.release(1);
            }
        }
    }

    /**
     * Rolls the group forward by one message instead of removing it: the message returned by the group's
     * {@code getOne()} is removed and every other message is kept. Does nothing if the group is unknown or empty.
     *
     * @param groupId the group id
     * @throws MessagingException if the thread is interrupted while waiting for the group lock
     */
    @Override
    public void removeMessageGroup(Object groupId) {
        Lock lock = lockGroup(groupId);
        try {
            SimpleMessageGroup group = this.groupIdToMessageGroup.get(groupId);
            if (group == null) {
                return;
            }
            Message<?> message = group.getOne();
            if (message != null) {
                // Takes the group lock again and releases the capacity permit for the removed message.
                this.removeMessageFromGroup(groupId, message);
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Removes one message from a group and, if the group actually shrank, frees one group capacity permit.
     *
     * @param groupId the group id
     * @param messageToRemove the message to remove
     * @return the stored group after the removal
     * @throws IllegalArgumentException if no group is stored for the id
     * @throws MessagingException if the thread is interrupted while waiting for the group lock
     */
    @Override
    public MessageGroup removeMessageFromGroup(Object groupId, Message<?> messageToRemove) {
        Lock lock = lockGroup(groupId);
        try {
            SimpleMessageGroup group = this.groupIdToMessageGroup.get(groupId);
            Assert.notNull(group, "MessageGroup for groupId '" + groupId + "' " +
                    "can not be located while attempting to remove Message from the MessageGroup");
            int sizeBefore = group.size();
            group.remove(messageToRemove);
            if (group.size() < sizeBefore) {
                // Compare sizes rather than rely on a return value, so a no-op removal never frees a permit.
                this.groupUpperBound.release(1);
            }
            group.setLastModified(System.currentTimeMillis());
            return group;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * @return an iterator over a snapshot set of the stored groups
     */
    @Override
    public Iterator<MessageGroup> iterator() {
        return new HashSet<MessageGroup>(this.groupIdToMessageGroup.values()).iterator();
    }

    /**
     * Records the last released sequence number on the stored group.
     *
     * @param groupId the group id
     * @param sequenceNumber the sequence number to record
     * @throws IllegalArgumentException if no group is stored for the id
     * @throws MessagingException if the thread is interrupted while waiting for the group lock
     */
    @Override
    public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) {
        Lock lock = lockGroup(groupId);
        try {
            SimpleMessageGroup group = this.groupIdToMessageGroup.get(groupId);
            Assert.notNull(group, "MessageGroup for groupId '" + groupId + "' " +
                    "can not be located while attempting to set 'lastReleasedSequenceNumber'");
            group.setLastReleasedMessageSequenceNumber(sequenceNumber);
            group.setLastModified(System.currentTimeMillis());
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Intentionally does nothing: this store does not mark groups as complete.
     */
    @Override
    public void completeGroup(Object groupId) {
        // no-op
    }

    /**
     * Removes and returns the first message of the group. The read and the removal happen under the group lock so
     * that concurrent callers cannot receive the same message.
     *
     * @param groupId the group id; must not be {@code null}
     * @return the removed message, or {@code null} if the group is unknown or empty
     * @throws MessagingException if the thread is interrupted while waiting for the group lock
     */
    @Override
    public Message<?> pollMessageFromGroup(Object groupId) {
        Assert.notNull(groupId, "'groupId' must not be null");
        Lock lock = lockGroup(groupId);
        try {
            Collection<Message<?>> messageList = this.getMessageGroup(groupId).getMessages();
            Message<?> message = null;
            if (!CollectionUtils.isEmpty(messageList)) {
                message = messageList.iterator().next();
                if (message != null) {
                    this.removeMessageFromGroup(groupId, message);
                }
            }
            return message;
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * @param groupId the group id; must not be {@code null}
     * @return the number of messages currently in the group, 0 if the group is unknown
     */
    @Override
    public int messageGroupSize(Object groupId) {
        return this.getMessageGroup(groupId).size();
    }

    /**
     * Obtains the lock for the group and acquires it interruptibly. The caller must unlock the returned lock.
     * On interruption the interrupt flag is restored and a {@link MessagingException} is thrown.
     */
    private Lock lockGroup(Object groupId) {
        Lock lock = this.lockRegistry.obtain(groupId);
        try {
            lock.lockInterruptibly();
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new MessagingException("Interrupted while obtaining lock", e);
        }
        return lock;
    }

    /** Builds the exception thrown when a capacity permit cannot be acquired. */
    private MessagingException outOfCapacity() {
        return new MessagingException(this.getClass().getSimpleName()
                + " was out of capacity at, try constructing it with a larger capacity.");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment