Last active
August 9, 2019 08:57
-
-
Save hengyunabc/cc57478bfcb4cd0553c2 to your computer and use it in GitHub Desktop.
Memcached自动刷新快要失效数据的封装,http://blog.csdn.net/hengyunabc/article/details/20735701
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Callback that produces the fresh value for a cache key.
 *
 * <p>{@code RefreshCacheManager} invokes {@link #load()} when a key is missing from
 * memcached or when the cached entry is about to expire.
 */
public interface DataLoader {

    /**
     * Loads the current value from the underlying data source.
     *
     * <p>The type parameter is chosen by the caller, so implementations normally need an
     * unchecked cast on the returned object.
     *
     * @param <T> type the caller expects the loaded value to have
     * @return the freshly loaded value
     */
    <T> T load();
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.Date; | |
import java.util.concurrent.TimeUnit; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* 这个类用于实现memcached key快要失效时,主动重新加载新数据的功能。 | |
* | |
* <pre> | |
* 原理是: | |
* 在set 到Memcached的value中,加上一个时间,(time, value), | |
* 其中这个time是now + expire,即记录的是数据的失效时间,并不是当前时间。 | |
* | |
* 当get到数据时,检查时间是否快要超时: (now - time) < (expire - expireOffsetSeconds) * 1000, | |
* 如果是,则后台启动一个新的线程: | |
* 尝试 add __load_{key}, | |
* 如果成功,则去加载新的数据,并set到memcached中。 | |
* | |
* 默认expireOffsetSeconds是5秒,即memcached的数据还有5秒就要失效时,更新数据到memcached中。 | |
* 使用例子: | |
* | |
* DataLoader dataLoader = new DataLoader() { | |
* public <T> T load() { | |
* try { | |
* TimeUnit.SECONDS.sleep(1); | |
* } catch (InterruptedException e) { | |
* } | |
* return (T) "xxxx"; | |
* } | |
* }; | |
* | |
* public String tryGet() throws InterruptedException { | |
* final String testKey = "testKey"; | |
* String value = RefreshCacheManager.tryGet(client, testKey, 20, dataLoader); | |
* //当返回null时,实际上已经有一个后台线程在加载数据,可以直接返回null给调用者,或者sleep 1秒钟之后再尝试。 | |
* if(value == null) { | |
* TimeUnit.SECONDS.sleep(1); | |
* return RefreshCacheManager.tryGet(client, testKey, 20, dataLoader); | |
* } | |
* return value; | |
* } | |
* | |
* public String autoTetryGet() { | |
* final String testKey = "testKey"; | |
* //如果返回null,会自动重试4次,每次休眠500ms,如果loader能正常工作,重试的概率极小 | |
* String value = RefreshCacheManager.autoRetryGet(client, testKey, 20, dataLoader); | |
* return value; | |
* } | |
* | |
* 如果要使用不同的参数设置,请用RefreshConfigBuilder构造一个RefreshConfig,在调用时传递。 | |
* RefreshConfig refreshConfig = RefreshConfigBuilder.newBuilder() | |
* .prefix("__lock_") | |
* .retryTimes(20) | |
* .retryIntervalsMiliSeconds(100).build(); | |
* RefreshCacheManager.tryGet(client, refreshConfig, testKey, 20, dataLoader); | |
* </pre> | |
* | |
* @author hengyunabc | |
* | |
*/ | |
public class RefreshCacheManager { | |
static Logger logger = LoggerFactory.getLogger(RefreshCacheManager.class); | |
static RefreshConfig staticRefreshConfig = new RefreshConfig(); | |
static public <T> T tryGet(MemcachedClient memcachedClient, final String key, final int expire, final DataLoader dataLoader) { | |
return tryGet(memcachedClient, staticRefreshConfig, key, expire, dataLoader); | |
} | |
static public <T> T tryGet(final MemcachedClient memcachedClient, final RefreshConfig refreshConfig, final String key, final int expire, final DataLoader dataLoader) { | |
TimeValue<T> timeValue = memcachedClient.get(key); | |
final String __load_key = refreshConfig.getPrefix() + key; | |
/** | |
* 如果value != null,这时已经拿到数据了,但是还要检查数据是否将要超时。 | |
* 如果是将要超时,则要后台启动一个线程去重新加载数据,并刷新到memcached里。 | |
* 如果不是将要超时,则直接返回数据。 | |
*/ | |
if (timeValue != null) { | |
Date now = new Date(); | |
//判断是否将要超时 | |
if((timeValue.getDate().getTime() - now.getTime()) < refreshConfig.getExpireOffsetSeconds() * 1000) { | |
refreshConfig.getExecutor().execute(new Runnable() { | |
@Override | |
public void run() { | |
boolean add = memcachedClient.add(__load_key, refreshConfig.getLoadKeyExprieSeconds(), 0); | |
/** | |
* 如果add __load_{key}成功,说明这个后台线程要去加载数据,并刷新到memcached中。 | |
* 如果add __load_{key}不成功,则说明有其它的线程已经去加载数据里,这个线程直接返回即可。 | |
*/ | |
if (add) { | |
logger.info("value will expire, try to load new data, success add, key:" + __load_key); | |
T data = dataLoader.load(); | |
memcachedClient.set(key, expire, TimeValue.build(data, expire)); | |
logger.info("DataLoader load data success. key:" + key); | |
//是否删掉__load_这个key比较好,防止某些极端情况? | |
memcachedClient.delete(__load_key); | |
} | |
} | |
}); | |
} | |
return timeValue.getValue(); | |
}else { | |
/** | |
* 当 value == null时,说明有可能是第一次加载,则先尝试add __load_{key},如果成功, | |
* 则调用dataLoader.load()去加载数据,并set到memcached里,否则返回null。 | |
*/ | |
boolean add = memcachedClient.add(__load_key, refreshConfig.getLoadKeyExprieSeconds(), 0); | |
if(add) { | |
T data = null; | |
try { | |
data = dataLoader.load(); | |
memcachedClient.set(key, expire, TimeValue.build(data, expire)); | |
//TODO,是否要删掉 __load_{key}? | |
} catch (Throwable t) { | |
logger.error("try to load data error!", t); | |
}finally { | |
if (data != null) { | |
return data; | |
} | |
} | |
} | |
} | |
return null; | |
} | |
static public <T> T autoRetryGet(MemcachedClient memcachedClient, final String key, final int expire, final DataLoader dataLoader) { | |
return autoRetryGet(memcachedClient, staticRefreshConfig, key, expire, dataLoader); | |
} | |
static public <T> T autoRetryGet(MemcachedClient memcachedClient, RefreshConfig refreshConfig, final String key, final int expire, final DataLoader dataLoader) { | |
int retryTimes = refreshConfig.getRetryTimes(); | |
while(retryTimes > 0) { | |
T t = tryGet(memcachedClient, refreshConfig, key, expire, dataLoader); | |
if (t != null) { | |
return t; | |
} | |
retryTimes--; | |
try { | |
TimeUnit.MILLISECONDS.sleep(refreshConfig.getRetryIntervalsMiliSeconds()); | |
} catch (InterruptedException e) { | |
logger.error("sleep error", e); | |
} | |
} | |
return null; | |
} | |
/** | |
* 用于手动删除 __load_{key},通常不必调用 | |
*/ | |
static public void deleteLoadKey(MemcachedClient memcachedClient, String key) { | |
deleteLoadKey(memcachedClient, staticRefreshConfig, key); | |
} | |
/** | |
* 用于手动删除 __load_{key},通常不必调用 | |
*/ | |
static public void deleteLoadKey(MemcachedClient memcachedClient, RefreshConfig refreshConfig, String key) { | |
memcachedClient.delete(refreshConfig.getPrefix() + key); | |
} | |
/** | |
* 手动set进{time, value}的数据到memcached里。 | |
* 在手动调用DataLoader.load()之后,可以调用这个函数把数据set到memcached中。 | |
*/ | |
static public void set(MemcachedClient memcachedClient, String key, int expire, Object value){ | |
memcachedClient.set(key, expire, TimeValue.build(value, expire)); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.Executor; | |
import java.util.concurrent.Executors; | |
/**
 * Settings used by {@code RefreshCacheManager}: load-key prefix and lifetime, how early a
 * value is refreshed, retry behaviour and the executor that runs background refreshes.
 */
public class RefreshConfig {
    /** Executor shared by every configuration that does not set its own. */
    static Executor staticExecutor = Executors.newCachedThreadPool();

    /** Executor that runs the background refresh tasks. */
    Executor executor = staticExecutor;

    /**
     * Prefix of the key used to prevent concurrent loads of the same data.
     */
    String prefix = "__load_";

    /**
     * Expiration, in seconds, of the load key (prefix + key).
     */
    int loadKeyExprieSeconds = 2;

    /**
     * How long before expiry, in seconds, a value counts as "about to expire".
     * Example: the expiry time stored with the value is 2014-6-29 12:50:34 and the current
     * system time is 2014-6-29 12:50:30; 34 - 30 = 4 &lt; 5, so the value is about to expire
     * and DataLoader.load() is called in the background to reload it.
     */
    int expireOffsetSeconds = 5;

    /**
     * Default number of attempts made by autoRetryGet.
     */
    int retryTimes = 4;

    /**
     * Default pause between autoRetryGet attempts, in milliseconds.
     */
    int retryIntervalsMiliSeconds = 500;

    public Executor getExecutor() {
        return executor;
    }

    /**
     * Sets the executor used for background refreshes.
     *
     * @param executor executor to use, must not be {@code null}
     * @throws NullPointerException if {@code executor} is {@code null}
     */
    public void setExecutor(Executor executor) {
        // Fail here rather than with an NPE at the moment a refresh is first needed.
        if (executor == null) {
            throw new NullPointerException("executor");
        }
        this.executor = executor;
    }

    public String getPrefix() {
        return prefix;
    }

    public void setPrefix(String prefix) {
        this.prefix = prefix;
    }

    public int getLoadKeyExprieSeconds() {
        return loadKeyExprieSeconds;
    }

    public void setLoadKeyExprieSeconds(int loadKeyExprieSeconds) {
        this.loadKeyExprieSeconds = loadKeyExprieSeconds;
    }

    public int getExpireOffsetSeconds() {
        return expireOffsetSeconds;
    }

    public void setExpireOffsetSeconds(int expireOffsetSeconds) {
        this.expireOffsetSeconds = expireOffsetSeconds;
    }

    public int getRetryTimes() {
        return retryTimes;
    }

    public void setRetryTimes(int retryTimes) {
        this.retryTimes = retryTimes;
    }

    public int getRetryIntervalsMiliSeconds() {
        return retryIntervalsMiliSeconds;
    }

    public void setRetryIntervalsMiliSeconds(int retryIntervalsMiliSeconds) {
        this.retryIntervalsMiliSeconds = retryIntervalsMiliSeconds;
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* 参考RefreshConfig类的注释 | |
* @author hengyunabc | |
* | |
*/ | |
public class RefreshConfigBuilder { | |
RefreshConfig refreshConfig = new RefreshConfig(); | |
public static RefreshConfigBuilder newBuilder() { | |
return new RefreshConfigBuilder(); | |
} | |
public RefreshConfig build() { | |
return refreshConfig; | |
} | |
public RefreshConfigBuilder executor(Executor executor) { | |
refreshConfig.setExecutor(executor); | |
return this; | |
} | |
public RefreshConfigBuilder prefix(String prefix) { | |
refreshConfig.setPrefix(prefix); | |
return this; | |
} | |
public RefreshConfigBuilder loadKeyExprieSeconds(int loadKeyExprieSeconds) { | |
refreshConfig.setLoadKeyExprieSeconds(loadKeyExprieSeconds); | |
return this; | |
} | |
public RefreshConfigBuilder expireOffsetSeconds(int expireOffsetSeconds) { | |
refreshConfig.setExpireOffsetSeconds(expireOffsetSeconds); | |
return this; | |
} | |
public RefreshConfigBuilder retryTimes(int retryTimes) { | |
refreshConfig.setRetryTimes(retryTimes); | |
return this; | |
} | |
public RefreshConfigBuilder retryIntervalsMiliSeconds(int retryIntervalsMiliSeconds) { | |
refreshConfig.setRetryIntervalsMiliSeconds(retryIntervalsMiliSeconds); | |
return this; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Wrapper stored in memcached: the cached value together with the instant at which the
 * entry expires.
 *
 * @param <T> type of the cached value
 */
public class TimeValue<T> implements Serializable {
    private static final long serialVersionUID = 8149019235411612667L;

    /**
     * The time at which the data expires in memcached, not the time it was written.
     */
    Date date;

    T value;

    /**
     * Wraps {@code value} with an expiry instant of now + {@code expireSeconds}.
     *
     * @param value         value to cache
     * @param expireSeconds lifetime of the entry, in seconds
     * @return a new wrapper whose date is the current time plus {@code expireSeconds}
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    static TimeValue build(Object value, int expireSeconds) {
        // 1000L forces long arithmetic: as an int product this overflows for lifetimes
        // above ~24.8 days and would yield an expiry date in the past.
        return new TimeValue(new Date(System.currentTimeMillis() + expireSeconds * 1000L), value);
    }

    /**
     * @param date  instant at which the entry expires
     * @param value the cached value
     */
    public TimeValue(Date date, T value) {
        this.date = date;
        this.value = value;
    }

    public Date getDate() {
        return date;
    }

    public void setDate(Date date) {
        this.date = date;
    }

    public T getValue() {
        return value;
    }

    public void setValue(T value) {
        this.value = value;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment