Created
February 3, 2014 13:23
-
-
Save artembilan/8783730 to your computer and use it in GitHub Desktop.
HSQL Priority Message Store
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2014 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.springframework.integration.jdbc.store.channel; | |
/** | |
* @author Artem Bilan | |
* @since 4.0 | |
*/ | |
public class HsqlChannelPriorityMessageStoreQueryProvider extends HsqlChannelMessageStoreQueryProvider { | |
@Override | |
public String getPollFromGroupExcludeIdsQuery() { | |
return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + | |
"where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + | |
"and %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID not in (:message_ids) order by MESSAGE_PRIORITY DESC, MESSAGE_SEQ ASC LIMIT 1"; | |
} | |
@Override | |
public String getPollFromGroupQuery() { | |
return "SELECT %PREFIX%CHANNEL_MESSAGE.MESSAGE_ID, %PREFIX%CHANNEL_MESSAGE.MESSAGE_BYTES from %PREFIX%CHANNEL_MESSAGE " + | |
"where %PREFIX%CHANNEL_MESSAGE.GROUP_KEY = :group_key and %PREFIX%CHANNEL_MESSAGE.REGION = :region " + | |
"order by MESSAGE_PRIORITY DESC, MESSAGE_SEQ ASC LIMIT 1"; | |
} | |
@Override | |
public String getCreateMessageQuery() { | |
return "INSERT into %PREFIX%CHANNEL_MESSAGE(MESSAGE_ID, GROUP_KEY, REGION, CREATED_DATE, MESSAGE_SEQ, MESSAGE_PRIORITY, MESSAGE_BYTES)" | |
+ " values (?, ?, ?, ?, ?, ?, ?)"; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2014 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.springframework.integration.jdbc.store; | |
import java.sql.PreparedStatement; | |
import java.sql.SQLException; | |
import java.util.Map; | |
import java.util.concurrent.atomic.AtomicLong; | |
import javax.sql.DataSource; | |
import org.springframework.beans.DirectFieldAccessor; | |
import org.springframework.core.serializer.Serializer; | |
import org.springframework.core.serializer.support.SerializingConverter; | |
import org.springframework.integration.jdbc.store.channel.ChannelMessageStoreQueryProvider; | |
import org.springframework.integration.store.MessageGroup; | |
import org.springframework.integration.support.MessageBuilder; | |
import org.springframework.integration.util.UUIDConverter; | |
import org.springframework.jdbc.core.JdbcTemplate; | |
import org.springframework.jdbc.core.PreparedStatementSetter; | |
import org.springframework.jdbc.support.lob.DefaultLobHandler; | |
import org.springframework.jdbc.support.lob.LobHandler; | |
import org.springframework.messaging.Message; | |
import org.springframework.messaging.MessageHeaders; | |
/** | |
* @author Artem Bilan | |
* @since 4.0 | |
*/ | |
public class JdbcChannelPriorityMessageStore extends JdbcChannelMessageStore { | |
private final AtomicLong messageSequence = new AtomicLong(); | |
private volatile JdbcTemplate jdbcTemplate; | |
private volatile SerializingConverter serializer = new SerializingConverter(); | |
private ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider; | |
private volatile String region = DEFAULT_REGION; | |
private volatile LobHandler lobHandler = new DefaultLobHandler(); | |
public JdbcChannelPriorityMessageStore(DataSource dataSource) { | |
super(dataSource); | |
jdbcTemplate = new JdbcTemplate(dataSource); | |
this.jdbcTemplate.setFetchSize(1); | |
this.jdbcTemplate.setMaxRows(1); | |
this.jdbcTemplate.afterPropertiesSet(); | |
} | |
@SuppressWarnings("unchecked") | |
public void setSerializer(Serializer<? super Message<?>> serializer) { | |
super.setSerializer(serializer); | |
this.serializer = new SerializingConverter((Serializer<Object>) serializer); | |
} | |
public void setChannelMessageStoreQueryProvider(ChannelMessageStoreQueryProvider channelMessageStoreQueryProvider) { | |
super.setChannelMessageStoreQueryProvider(channelMessageStoreQueryProvider); | |
this.channelMessageStoreQueryProvider = channelMessageStoreQueryProvider; | |
} | |
@Override | |
@SuppressWarnings({ "rawtypes", "unchecked" }) | |
public MessageGroup addMessageToGroup(Object groupId, Message<?> message) { | |
final String groupKey = getKey(groupId); | |
final long createdDate = System.currentTimeMillis(); | |
final Message<?> result = MessageBuilder.fromMessage(message).setHeader(SAVED_KEY, Boolean.TRUE) | |
.setHeader(CREATED_DATE_KEY, createdDate).build(); | |
final Map innerMap = (Map) new DirectFieldAccessor(result.getHeaders()).getPropertyValue("headers"); | |
// using reflection to set ID since it is immutable through MessageHeaders | |
innerMap.put(MessageHeaders.ID, message.getHeaders().get(MessageHeaders.ID)); | |
final String messageId = getKey(result.getHeaders().getId()); | |
final byte[] messageBytes = serializer.convert(result); | |
final Integer priority = (Integer) result.getHeaders().get("priority"); | |
jdbcTemplate.update(getQuery(channelMessageStoreQueryProvider.getCreateMessageQuery()), new PreparedStatementSetter() { | |
@Override | |
public void setValues(PreparedStatement ps) throws SQLException { | |
ps.setString(1, messageId); | |
ps.setString(2, groupKey); | |
ps.setString(3, region); | |
ps.setLong(4, createdDate); | |
ps.setLong(5, messageSequence.incrementAndGet()); | |
ps.setLong(6, priority != null ? priority : 0); | |
lobHandler.getLobCreator().setBlobAsBytes(ps, 7, messageBytes); | |
} | |
}); | |
return getMessageGroup(groupId); | |
} | |
private String getKey(Object input) { | |
return input == null ? null : UUIDConverter.getUUID(input).toString(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CREATE TABLE INT_CHANNEL_MESSAGE ( | |
MESSAGE_ID CHAR(36) NOT NULL, | |
GROUP_KEY CHAR(36) NOT NULL, | |
CREATED_DATE BIGINT NOT NULL, | |
MESSAGE_BYTES LONGVARBINARY, | |
MESSAGE_SEQ BIGINT, | |
MESSAGE_PRIORITY BIGINT, | |
REGION VARCHAR(100) NOT NULL, | |
constraint INT_CHANNEL_MESSAGE_PK primary key (GROUP_KEY, MESSAGE_ID, REGION) | |
); | |
CREATE INDEX INT_CHANNEL_MSG_DATE_IDX ON INT_CHANNEL_MESSAGE (CREATED_DATE); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment