Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
The SynchronizedRedisBlock can be used to create a self-isolating block of Redis transactions with a locking mechanism based on the key name provided. To use it, subclass this class (an anonymous subclass works), place all relevant Redis transactions in your implementation of #perform(), and then call #start() to run the block.
/*
* SynchronizedRedisBlock v1.0
* v1.0
* June 18, 2014
*
* Copyright 2014, Jeriel Jan del Prado
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.jerieljan.app.redis;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import java.util.concurrent.TimeUnit;
/**
 * A self-isolating block of Redis operations guarded by a Redis-backed lock derived from a key
 * name.
 * <p>
 * {@link #start()} creates the Redis key {@code LOCK_PREFIX + keyName} with a set-if-absent
 * operation, runs {@link #perform()}, and deletes the lock key once it is done. While the lock key
 * exists, any other SynchronizedRedisBlock started for the same key name polls every
 * {@link #THREAD_SLEEP_INTERVAL} milliseconds until the key is gone.
 * <p>
 * The lock key has a time-to-live (TTL) of {@link #TIMEOUT_INTERVAL} {@link #TIMEOUT_UNIT} to
 * avoid deadlocks. Every call to {@link #getRedisTemplate()} resets that TTL.
 * <p>
 * Limitations:
 * <ul>
 * <li>The lock value is the constant {@code "-1"}, so a block cannot verify that it still owns the
 * lock. If the TTL expires while {@link #perform()} is still running, another block may acquire
 * the lock and this block will delete it on release. Keep {@link #perform()} shorter than the TTL
 * or call {@link #getRedisTemplate()} regularly.</li>
 * <li>Acquiring the lock and setting its TTL are two separate Redis commands.</li>
 * <li>All lock operations of all blocks in this JVM are serialized on one class-wide monitor.</li>
 * </ul>
 *
 * @param <T> the expected output type of the Redis transaction.
 *
 * @author jerieljan
 */
public abstract class SynchronizedRedisBlock<T> {

    /** Prefix prepended to the key name to form the name of the Redis lock key. */
    public static final String LOCK_PREFIX = "LOCK:";

    /** Time-to-live of the lock key, expressed in {@link #TIMEOUT_UNIT}. */
    public static final int TIMEOUT_INTERVAL = 2;

    /** Unit of {@link #TIMEOUT_INTERVAL}. */
    public static final TimeUnit TIMEOUT_UNIT = TimeUnit.MINUTES;

    /** Milliseconds to sleep between two attempts to acquire the lock. */
    public static final int THREAD_SLEEP_INTERVAL = 3500;

    private static final Logger LOGGER = LoggerFactory.getLogger(SynchronizedRedisBlock.class);

    /** Class-wide monitor guarding every lock/refresh/release call made from this JVM. */
    private static final Object LOCK = new Object();

    // Kept as a raw type: narrowing it would change the public constructor and getter signatures.
    private final RedisTemplate redisTemplate;
    private final String keyName;

    /**
     * Creates a new SynchronizedRedisBlock. Implement its {@link #perform()} function, which runs
     * while the lock is held, then call {@link #start()} to run the block.
     *
     * When implementing {@link #perform()}, use calls to {@link #getRedisTemplate()} to perform
     * Redis commands.
     *
     * @param redisTemplate the Redis template that will be in use to perform Redis commands.
     * @param keyName the name of the key to be locked.
     */
    protected SynchronizedRedisBlock(RedisTemplate redisTemplate, String keyName) {
        this.redisTemplate = redisTemplate;
        this.keyName = keyName;
    }

    /**
     * Starts the synchronized Redis block: waits for the lock, runs {@link #perform()} and
     * releases the lock afterwards.
     *
     * @return the result of {@link #perform()}, or {@code null} if the calling thread was
     * interrupted while waiting for the lock. In that case the thread's interrupt flag is
     * restored and {@link #perform()} is not run.
     */
    public T start() {
        boolean acquired = false;
        try {
            acquired = await();
            return perform();
        } catch (InterruptedException ex) {
            // Restore the flag so callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.warn("Redis operation aborted!", ex);
            return null;
        } finally {
            // Only delete the lock if this call acquired it; an interrupted wait never owned the
            // lock, and deleting it would release a lock held by a different block.
            if (acquired) {
                release();
            }
        }
    }

    /**
     * Retrieves the Redis template used in this context, and refreshes the TTL of the lock.
     *
     * @return the Redis template passed to the constructor.
     */
    public RedisTemplate getRedisTemplate() {
        refreshLock();
        return redisTemplate;
    }

    /**
     * Performs the provided transactions within the scope of the synchronized Redis block.
     *
     * Make sure to use {@link #getRedisTemplate()} when performing the transactions themselves,
     * so that the TTL will be refreshed. Using a final-based RedisTemplate or external
     * RedisTemplate will not refresh this TTL, and will break the isolated nature of this class.
     *
     * Do NOT call perform() directly. Call {@link #start()} instead!
     *
     * NOTE(review): this method is package-private, so only subclasses declared in this package
     * can implement it. Widening it to protected may break existing overrides that are themselves
     * package-private - confirm against callers before changing.
     *
     * @return the expected output of the transaction.
     */
    abstract T perform();

    /**
     * Builds the name of the Redis key used as the lock for this block.
     *
     * @return {@link #LOCK_PREFIX} followed by the key name.
     */
    private String lockKey() {
        return LOCK_PREFIX + keyName;
    }

    /**
     * Blocks the calling thread until the lock has been acquired, retrying every
     * {@link #THREAD_SLEEP_INTERVAL} milliseconds.
     *
     * @return true once the lock has been acquired.
     *
     * @throws InterruptedException if the thread is interrupted while sleeping between attempts.
     */
    private boolean await() throws InterruptedException {
        while (!lock()) {
            LOGGER.debug("Lock was currently in use for {}", keyName);
            Thread.sleep(THREAD_SLEEP_INTERVAL);
        }
        return true;
    }

    /**
     * Releases the lock for the given key by deleting the lock key.
     *
     * @return true if the lock key was observed to be absent after the delete. The check is a
     * separate Redis command, so another block acquiring the lock in between makes this return
     * false even though the delete itself went through.
     */
    private boolean release() {
        synchronized (LOCK) {
            String lockName = lockKey();
            redisTemplate.delete(lockName);
            LOGGER.debug("Lock released for {}", keyName);
            // Released means the key is gone, i.e. hasKey answered false.
            return Boolean.FALSE.equals(redisTemplate.hasKey(lockName));
        }
    }

    /**
     * Attempts to acquire the lock for the specified key without waiting, and sets its TTL when
     * the attempt succeeds.
     *
     * @return true if the lock was acquired, false if it is held elsewhere or Redis gave no
     * answer.
     */
    private boolean lock() {
        synchronized (LOCK) {
            // Boolean.TRUE.equals guards against a null reply instead of failing on unboxing.
            boolean acquired =
                    Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey(), "-1"));
            if (acquired) {
                LOGGER.debug("Lock acquired for {}", keyName);
                refreshLock();
            }
            return acquired;
        }
    }

    /**
     * Resets the TTL of the lock key to {@link #TIMEOUT_INTERVAL} {@link #TIMEOUT_UNIT}.
     *
     * @return true if Redis confirmed that the TTL was set; false if the key did not exist, no
     * answer was available, or an exception occurred (the exception is logged, not rethrown).
     */
    private boolean refreshLock() {
        synchronized (LOCK) {
            try {
                return Boolean.TRUE.equals(
                        redisTemplate.expire(lockKey(), TIMEOUT_INTERVAL, TIMEOUT_UNIT));
            } catch (Exception e) {
                LOGGER.warn("TTL for " + keyName + " was not refreshed due to an exception.", e);
                return false;
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment