Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save artembilan/8783730 to your computer and use it in GitHub Desktop.
Save artembilan/8783730 to your computer and use it in GitHub Desktop.
HSQL Priority Message Store
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.jdbc.store.channel;
/**
 * Query provider that supplies HSQLDB statements for a priority-aware channel
 * message table.
 *
 * <p>Both poll statements select a single row ({@code LIMIT 1}) for the given
 * group key and region, ordered by {@code MESSAGE_PRIORITY} descending and then
 * by {@code MESSAGE_SEQ} ascending. The insert statement writes the
 * {@code MESSAGE_SEQ} and {@code MESSAGE_PRIORITY} columns alongside the other
 * columns it lists.
 *
 * <p>The {@code %PREFIX%} token is returned verbatim inside every statement;
 * resolving it is left to the caller.
 *
 * @author Artem Bilan
 * @since 4.0
 */
public class HsqlChannelPriorityMessageStoreQueryProvider extends HsqlChannelMessageStoreQueryProvider {

    /** Column list plus the group/region filter shared by both poll statements (ends with a space). */
    private static final String SELECT_FOR_GROUP_AND_REGION =
            "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE "
                    + "where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region ";

    /** Highest priority first, then lowest sequence number; only one row is returned. */
    private static final String PRIORITY_ORDER_SINGLE_ROW =
            "order by MESSAGE_PRIORITY DESC, MESSAGE_SEQ ASC LIMIT 1";

    /** Seven-column insert, including the sequence and priority columns. */
    private static final String INSERT_MESSAGE =
            "INSERT into %PREFIX%CHANNEL_MESSAGE(MESSAGE_ID, GROUP_KEY, REGION, CREATED_DATE, MESSAGE_SEQ, MESSAGE_PRIORITY, MESSAGE_BYTES)"
                    + " values (?, ?, ?, ?, ?, ?, ?)";

    /**
     * @return the poll statement that additionally skips every message id bound
     * to the {@code :message_ids} parameter
     */
    @Override
    public String getPollFromGroupExcludeIdsQuery() {
        return SELECT_FOR_GROUP_AND_REGION
                + "and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) "
                + PRIORITY_ORDER_SINGLE_ROW;
    }

    /**
     * @return the poll statement for the next message of a group and region
     */
    @Override
    public String getPollFromGroupQuery() {
        return SELECT_FOR_GROUP_AND_REGION + PRIORITY_ORDER_SINGLE_ROW;
    }

    /**
     * @return the insert statement with seven positional parameters, in the
     * column order given in the statement
     */
    @Override
    public String getCreateMessageQuery() {
        return INSERT_MESSAGE;
    }

}
/*
* Copyright 2014 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.integration.jdbc.store;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.sql.DataSource;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.core.serializer.Serializer;
import org.springframework.core.serializer.support.SerializingConverter;
import org.springframework.integration.jdbc.store.channel.ChannelMessageStoreQueryProvider;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.util.UUIDConverter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.PreparedStatementSetter;
import org.springframework.jdbc.support.lob.DefaultLobHandler;
import org.springframework.jdbc.support.lob.LobHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
/**
 * A {@link JdbcChannelMessageStore} variant whose insert also records a
 * sequence number and the message priority, so that a priority-aware
 * {@link ChannelMessageStoreQueryProvider} can order polled messages by them.
 *
 * <p>This class keeps its own copies of the serializer, query provider, region
 * and LOB handler (presumably because the superclass does not expose them).
 * Every setter therefore delegates to the superclass <em>and</em> updates the
 * local copy, so inserts performed here and the operations inherited from the
 * superclass always agree on the same configuration.
 *
 * <p>NOTE(review): the sequence number comes from an in-memory counter that
 * starts at zero for every instance. It is not persisted, so after a restart,
 * or with several store instances writing to the same table, sequence values
 * are not guaranteed to reflect global insertion order.
 *
 * @author Artem Bilan
 * @since 4.0
 */
public class JdbcChannelPriorityMessageStore extends JdbcChannelMessageStore {

    /** Name of the message header the priority is read from. */
    private static final String PRIORITY_HEADER = "priority";

    /** Source of the MESSAGE_SEQ value; incremented once per inserted message. */
    private final AtomicLong messageSequence = new AtomicLong();

    private final JdbcTemplate jdbcTemplate;

    private volatile SerializingConverter serializer = new SerializingConverter();

    private ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider;

    private volatile String region = DEFAULT_REGION;

    private volatile LobHandler lobHandler = new DefaultLobHandler();

    /**
     * Creates the store and a private {@link JdbcTemplate} on the given data
     * source, configured with a fetch size and max rows of one.
     *
     * @param dataSource the data source handed to the superclass and to the
     * local template
     */
    public JdbcChannelPriorityMessageStore(DataSource dataSource) {
        super(dataSource);
        this.jdbcTemplate = new JdbcTemplate(dataSource);
        this.jdbcTemplate.setFetchSize(1);
        this.jdbcTemplate.setMaxRows(1);
        this.jdbcTemplate.afterPropertiesSet();
    }

    /**
     * Sets the serializer on the superclass and wraps it in the
     * {@link SerializingConverter} used by {@link #addMessageToGroup}.
     *
     * @param serializer the serializer used to turn messages into bytes
     */
    @Override
    @SuppressWarnings("unchecked")
    public void setSerializer(Serializer<? super Message<?>> serializer) {
        super.setSerializer(serializer);
        this.serializer = new SerializingConverter((Serializer<Object>) serializer);
    }

    /**
     * Sets the query provider on the superclass and remembers it for the
     * insert performed by {@link #addMessageToGroup}.
     *
     * @param channelMessageStoreQueryProvider the provider of the SQL statements
     */
    @Override
    public void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider) {
        super.setChannelMessageStoreQueryProvider(channelMessageStoreQueryProvider);
        this.channelMessageStoreQueryProvider = channelMessageStoreQueryProvider;
    }

    /**
     * Sets the region on the superclass and on this class. Without the local
     * update, inserts would keep using {@code DEFAULT_REGION} while inherited
     * operations use the configured region.
     *
     * @param region the region value written to the REGION column
     */
    @Override
    public void setRegion(String region) {
        // Delegate first so that any validation done by the superclass runs
        // before the local copy is changed.
        super.setRegion(region);
        this.region = region;
    }

    /**
     * Sets the LOB handler on the superclass and on this class, so that the
     * insert performed here writes the message bytes with the same handler.
     *
     * @param lobHandler the handler used to write the MESSAGE_BYTES column
     */
    @Override
    public void setLobHandler(LobHandler lobHandler) {
        super.setLobHandler(lobHandler);
        this.lobHandler = lobHandler;
    }

    /**
     * Serializes the message and inserts it with the next sequence number and
     * its priority, then returns the group as reported by
     * {@code getMessageGroup(groupId)}.
     *
     * <p>The stored message is a copy carrying the {@code SAVED_KEY} and
     * {@code CREATED_DATE_KEY} headers and the original message id. The
     * priority is taken from the {@value #PRIORITY_HEADER} header; any
     * {@link Number} is accepted and a missing header is stored as {@code 0}.
     *
     * @param groupId the group the message is added to
     * @param message the message to store
     * @return the message group for {@code groupId}
     * @throws ClassCastException if the priority header is present but is not
     * a {@link Number}
     */
    @Override
    @SuppressWarnings({ "rawtypes", "unchecked" })
    public MessageGroup addMessageToGroup(Object groupId, Message<?> message) {
        final String groupKey = getKey(groupId);
        final long createdDate = System.currentTimeMillis();

        final Message<?> result = MessageBuilder.fromMessage(message)
                .setHeader(SAVED_KEY, Boolean.TRUE)
                .setHeader(CREATED_DATE_KEY, createdDate)
                .build();

        // using reflection to set ID since it is immutable through MessageHeaders
        final Map innerMap = (Map) new DirectFieldAccessor(result.getHeaders()).getPropertyValue("headers");
        innerMap.put(MessageHeaders.ID, message.getHeaders().get(MessageHeaders.ID));

        final String messageId = getKey(result.getHeaders().getId());
        final byte[] messageBytes = this.serializer.convert(result);

        // Accept any numeric header type (Integer, Long, ...): the column is
        // written with setLong, so there is no reason to insist on Integer.
        final Number priority = (Number) result.getHeaders().get(PRIORITY_HEADER);
        final long priorityValue = priority != null ? priority.longValue() : 0L;

        // Read the volatile configuration once so the statement is built from
        // a consistent snapshot.
        final String currentRegion = this.region;
        final LobHandler currentLobHandler = this.lobHandler;

        this.jdbcTemplate.update(getQuery(this.channelMessageStoreQueryProvider.getCreateMessageQuery()),
                new PreparedStatementSetter() {

                    @Override
                    public void setValues(PreparedStatement ps) throws SQLException {
                        ps.setString(1, messageId);
                        ps.setString(2, groupKey);
                        ps.setString(3, currentRegion);
                        ps.setLong(4, createdDate);
                        ps.setLong(5, messageSequence.incrementAndGet());
                        ps.setLong(6, priorityValue);
                        // NOTE(review): the LobCreator is not closed after the
                        // statement executes; confirm this is acceptable for
                        // the LobHandler implementations that may be configured.
                        currentLobHandler.getLobCreator().setBlobAsBytes(ps, 7, messageBytes);
                    }

                });

        return getMessageGroup(groupId);
    }

    /**
     * Converts an arbitrary key object into its UUID string form.
     *
     * @param input the object to convert, may be {@code null}
     * @return the UUID string, or {@code null} when {@code input} is {@code null}
     */
    private String getKey(Object input) {
        return input == null ? null : UUIDConverter.getUUID(input).toString();
    }

}
-- Channel message table including the two columns used for priority polling:
-- MESSAGE_SEQ and MESSAGE_PRIORITY. Both are nullable; all other columns
-- except MESSAGE_BYTES are mandatory.
CREATE TABLE INT_CHANNEL_MESSAGE (
    MESSAGE_ID       CHAR(36)      NOT NULL,
    GROUP_KEY        CHAR(36)      NOT NULL,
    CREATED_DATE     BIGINT        NOT NULL,
    MESSAGE_BYTES    LONGVARBINARY,
    MESSAGE_SEQ      BIGINT,
    MESSAGE_PRIORITY BIGINT,
    REGION           VARCHAR(100)  NOT NULL,
    -- A message is unique per group and region.
    CONSTRAINT INT_CHANNEL_MESSAGE_PK PRIMARY KEY (GROUP_KEY, MESSAGE_ID, REGION)
);

-- Secondary index on the creation timestamp.
CREATE INDEX INT_CHANNEL_MSG_DATE_IDX ON INT_CHANNEL_MESSAGE (CREATED_DATE);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment