Skip to content

Instantly share code, notes, and snippets.

@artembilan
Created October 12, 2015 22:09
Show Gist options
  • Save artembilan/a7adca1cdb9c12a5d3a8 to your computer and use it in GitHub Desktop.
Save artembilan/a7adca1cdb9c12a5d3a8 to your computer and use it in GitHub Desktop.
/*
* Copyright 2015 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package org.springframework.integration.store;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.springframework.integration.util.UpperBound;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;
/**
 * A combined message store / message group store that keeps every message in one
 * map keyed by message id and exposes that content through a single message group
 * identified by a pre-configured {@code groupId}.
 * <p>
 * Every group-level operation first verifies that the supplied {@code groupId}
 * equals the configured one and fails with an {@link IllegalStateException}
 * otherwise.
 * <p>
 * Occupancy is guarded by an {@link UpperBound}: one permit is acquired for every
 * newly stored message and one permit is released for every message that is
 * actually removed.
 *
 * @author Artem Bilan
 * @since 4.2
 */
public class SingleGroupMessageStore extends AbstractMessageGroupStore implements MessageStore {

	private final ConcurrentMap<UUID, Message<?>> idToMessage = new ConcurrentHashMap<UUID, Message<?>>();

	private final UpperBound upperBound;

	private final Object groupId;

	private final SimpleMessageGroup messageGroup;

	private final long upperBoundTimeout;

	/**
	 * Create a store for the given group with a capacity of {@code 0} and a
	 * permit timeout of {@code 0}.
	 * @param groupId the only group id this store accepts; must not be null
	 */
	public SingleGroupMessageStore(Object groupId) {
		this(groupId, 0);
	}

	/**
	 * Create a store for the given group with the given capacity and a permit
	 * timeout of {@code 0}.
	 * @param groupId the only group id this store accepts; must not be null
	 * @param capacity the value handed to {@link UpperBound}
	 * (presumably a non-positive value means "unbounded" - confirm against UpperBound)
	 */
	public SingleGroupMessageStore(Object groupId, int capacity) {
		this(groupId, capacity, 0);
	}

	/**
	 * Create a store for the given group.
	 * @param groupId the only group id this store accepts; must not be null
	 * @param capacity the value handed to {@link UpperBound}
	 * (presumably a non-positive value means "unbounded" - confirm against UpperBound)
	 * @param upperBoundTimeout the value passed to {@code UpperBound.tryAcquire(long)}
	 * whenever a message is added (presumably milliseconds - confirm against UpperBound)
	 * @throws IllegalArgumentException if {@code groupId} is null
	 */
	public SingleGroupMessageStore(Object groupId, int capacity, long upperBoundTimeout) {
		Assert.notNull(groupId, "'groupId' must not be null");
		this.groupId = groupId;
		this.upperBound = new UpperBound(capacity);
		// NOTE(review): whether the group is a live view over the map's values or a
		// snapshot taken here depends on the SimpleMessageGroup constructor - confirm.
		this.messageGroup = new SimpleMessageGroup(this.idToMessage.values(), this.groupId);
		this.upperBoundTimeout = upperBoundTimeout;
	}

	/**
	 * Remove a single message (looked up by its id) and, if it was present,
	 * release its permit and update the group's last-modified timestamp.
	 * @param groupId must equal the configured group id
	 * @param messageToRemove the message whose id is removed from the store
	 * @return the single message group of this store
	 */
	@Override
	public MessageGroup removeMessageFromGroup(Object groupId, Message<?> messageToRemove) {
		assertGroupId(groupId);
		if (evict(messageToRemove.getHeaders().getId()) != null) {
			touch();
		}
		return this.messageGroup;
	}

	/**
	 * Remove all the given messages (looked up by their ids). A permit is released
	 * for each message that was actually stored; the last-modified timestamp is
	 * updated once if at least one message was removed.
	 * @param groupId must equal the configured group id
	 * @param messages the messages to remove
	 */
	@Override
	public void removeMessagesFromGroup(Object groupId, Collection<Message<?>> messages) {
		assertGroupId(groupId);
		boolean modified = false;
		for (Message<?> message : messages) {
			if (evict(message.getHeaders().getId()) != null) {
				modified = true;
			}
		}
		if (modified) {
			touch();
		}
	}

	/**
	 * Record the last released sequence number on the single group.
	 * @param groupId must equal the configured group id
	 * @param sequenceNumber the sequence number to record
	 */
	@Override
	public void setLastReleasedSequenceNumberForGroup(Object groupId, int sequenceNumber) {
		assertGroupId(groupId);
		this.messageGroup.setLastReleasedMessageSequenceNumber(sequenceNumber);
	}

	/**
	 * @return an iterator over exactly one element: the single group of this store
	 */
	@Override
	public Iterator<MessageGroup> iterator() {
		return Collections.<MessageGroup>singletonList(this.messageGroup).iterator();
	}

	/**
	 * Mark the single group as complete.
	 * @param groupId must equal the configured group id
	 */
	@Override
	public void completeGroup(Object groupId) {
		assertGroupId(groupId);
		this.messageGroup.complete();
	}

	/**
	 * @param groupId must equal the configured group id
	 * @return the size reported by the single group
	 */
	@Override
	public int messageGroupSize(Object groupId) {
		assertGroupId(groupId);
		return this.messageGroup.size();
	}

	/**
	 * @param groupId must equal the configured group id
	 * @return the single message group of this store (always the same instance)
	 */
	@Override
	public MessageGroup getMessageGroup(Object groupId) {
		assertGroupId(groupId);
		return this.messageGroup;
	}

	/**
	 * Store the message, acquiring a permit for it, and update the group's
	 * last-modified timestamp.
	 * @param groupId must equal the configured group id
	 * @param message the message to store
	 * @return the single message group of this store
	 * @throws MessagingException if no permit could be acquired
	 */
	@Override
	public MessageGroup addMessageToGroup(Object groupId, Message<?> message) {
		assertGroupId(groupId);
		store(message);
		// Keep the timestamp handling symmetrical with the group-level remove operations.
		touch();
		return this.messageGroup;
	}

	/**
	 * Remove and return one stored message, or {@code null} if the store is empty.
	 * Which message is chosen follows the iteration order of the underlying map.
	 * @param groupId must equal the configured group id
	 * @return the removed message, or {@code null} if nothing could be removed
	 */
	@Override
	public Message<?> pollMessageFromGroup(Object groupId) {
		assertGroupId(groupId);
		Iterator<Message<?>> candidates = this.idToMessage.values().iterator();
		while (candidates.hasNext()) {
			// The map's remove() is atomic, so a message is handed out only by the
			// caller whose remove() actually took it out; otherwise try the next one.
			Message<?> claimed = evict(candidates.next().getHeaders().getId());
			if (claimed != null) {
				touch();
				return claimed;
			}
		}
		return null;
	}

	/**
	 * Remove every stored message, releasing one permit per removed message so
	 * that the capacity becomes available again.
	 * @param groupId must equal the configured group id
	 */
	@Override
	public void removeMessageGroup(Object groupId) {
		assertGroupId(groupId);
		boolean modified = false;
		// Remove entry by entry (instead of clear()) so each removal is paired
		// with exactly one permit release.
		for (UUID id : this.idToMessage.keySet()) {
			if (evict(id) != null) {
				modified = true;
			}
		}
		if (modified) {
			touch();
		}
	}

	/**
	 * @param id the message id
	 * @return the stored message with that id, or {@code null} if there is none
	 */
	@Override
	public Message<?> getMessage(UUID id) {
		return this.idToMessage.get(id);
	}

	/**
	 * Store the message, acquiring a permit for it.
	 * @param message the message to store
	 * @return the same message instance
	 * @throws MessagingException if no permit could be acquired
	 */
	@Override
	public <T> Message<T> addMessage(Message<T> message) {
		store(message);
		return message;
	}

	/**
	 * Remove the message with the given id, releasing its permit if it was stored.
	 * @param id the message id
	 * @return the removed message, or {@code null} if there was none
	 */
	@Override
	public Message<?> removeMessage(UUID id) {
		return evict(id);
	}

	/**
	 * @return the number of messages currently held in the map
	 */
	@Override
	public long getMessageCount() {
		return this.idToMessage.size();
	}

	@Override
	public String toString() {
		return "SingleGroupMessageStore{" +
				"messageGroup=" + messageGroup +
				'}';
	}

	/**
	 * Acquire a permit and put the message into the map under its id. If an entry
	 * with the same id was already present the occupancy did not grow, so the
	 * just-acquired permit is handed back.
	 */
	private void store(Message<?> message) {
		// Read the id before acquiring so a null message cannot leak a permit.
		UUID id = message.getHeaders().getId();
		if (!this.upperBound.tryAcquire(this.upperBoundTimeout)) {
			throw new MessagingException(getClass().getSimpleName()
					+ " was out of capacity, try constructing it with a larger capacity.");
		}
		if (this.idToMessage.put(id, message) != null) {
			this.upperBound.release();
		}
	}

	/**
	 * Remove the entry with the given id and release its permit if it existed.
	 * Returns the removed message, or {@code null} if nothing was stored under the id.
	 */
	private Message<?> evict(UUID id) {
		Message<?> removed = this.idToMessage.remove(id);
		if (removed != null) {
			this.upperBound.release();
		}
		return removed;
	}

	/**
	 * Stamp the group with the current time as its last-modified timestamp.
	 */
	private void touch() {
		this.messageGroup.setLastModified(System.currentTimeMillis());
	}

	/**
	 * Fail with an {@link IllegalStateException} unless the supplied id equals the
	 * configured group id. The message is built only on failure because it
	 * includes this store's {@link #toString()}.
	 */
	private void assertGroupId(Object groupId) {
		if (!this.groupId.equals(groupId)) {
			throw new IllegalStateException(
					"The 'groupId' [" + groupId + "] isn't equal to the pre-configured key [" + this.groupId + "] " +
							"to perform operation on this " + this);
		}
	}

}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment