Created
July 26, 2012 21:19
-
-
Save amalakar/3184582 to your computer and use it in GitHub Desktop.
thread safe cache client
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Licensed to the Apache Software Foundation (ASF) under one | |
* or more contributor license agreements. See the NOTICE file | |
* distributed with this work for additional information | |
* regarding copyright ownership. The ASF licenses this file | |
* to you under the Apache License, Version 2.0 (the | |
* "License"); you may not use this file except in compliance | |
* with the License. You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package org.apache.hcatalog.common.cache; | |
import com.google.common.cache.Cache; | |
import com.google.common.cache.CacheBuilder; | |
import com.google.common.cache.RemovalListener; | |
import com.google.common.cache.RemovalNotification; | |
import org.apache.commons.lang.builder.EqualsBuilder; | |
import org.apache.commons.lang.builder.HashCodeBuilder; | |
import org.apache.hadoop.hive.conf.HiveConf; | |
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; | |
import org.apache.hadoop.hive.metastore.api.MetaException; | |
import org.apache.hadoop.hive.shims.ShimLoader; | |
import org.apache.hadoop.security.UserGroupInformation; | |
import org.apache.hcatalog.common.HCatConf; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import javax.security.auth.login.LoginException; | |
import java.io.IOException; | |
import java.util.concurrent.Callable; | |
import java.util.concurrent.ConcurrentMap; | |
import java.util.concurrent.ExecutionException; | |
import java.util.concurrent.TimeUnit; | |
import java.util.concurrent.atomic.AtomicBoolean; | |
import java.util.concurrent.atomic.AtomicInteger; | |
/** | |
* A time expired cache for HiveMetaStoreClient | |
*/ | |
public class HiveClientCache { | |
final private Cache<HiveClientCacheKey, CacheableHiveMetaStoreClient> hiveCache; | |
private static final Logger LOG = LoggerFactory.getLogger(HiveClientCache.class); | |
private final int timetout; | |
// Lazy initialization for singleton | |
static class LazyHolder { | |
static final HiveConf hiveConf = new HiveConf(HiveClientCache.class); | |
static final HiveClientCache INSTANCE = | |
new HiveClientCache(hiveConf.getInt(HCatConf.HCATALOG_HIVE_CLIENT_EXPIRY_TIME, 2*60)); | |
} | |
public static HiveClientCache getInstance() { | |
return LazyHolder.INSTANCE; | |
} | |
/** | |
* The class is protected for the purpose of testing, could have been private otherwise | |
* @param timeout the length of time in seconds after a client is created that it should be automatically removed | |
*/ | |
protected HiveClientCache(final int timeout) { | |
this.timetout = timeout; | |
RemovalListener<HiveClientCacheKey, CacheableHiveMetaStoreClient> removalListener = | |
new RemovalListener<HiveClientCacheKey, CacheableHiveMetaStoreClient>() { | |
public void onRemoval(RemovalNotification<HiveClientCacheKey, CacheableHiveMetaStoreClient> notification) { | |
CacheableHiveMetaStoreClient hiveMetaStoreClient = notification.getValue(); | |
if (hiveMetaStoreClient != null) { | |
hiveMetaStoreClient.setExpiredFromCache(); | |
hiveMetaStoreClient.tearDownIfUnused(); | |
} | |
} | |
}; | |
hiveCache = CacheBuilder.newBuilder() | |
.expireAfterWrite(timeout, TimeUnit.SECONDS) | |
.removalListener(removalListener) | |
.build(); | |
// Add a shutdown hook for cleanup, if there are elements remaining in the cache which were not cleaned up. | |
// This is the best effort approach. Ignore any error while doing so. Notice that most of the clients | |
// would get cleaned up via either the removalListener or the close() call, only the clients created in the | |
// which expired after the last cache call wont | |
Thread cleanupHiveClientShutdownThread = new Thread() { | |
@Override | |
public void run() { | |
LOG.info("Cleaning up hive client cache in ShutDown hook"); | |
try { | |
ConcurrentMap<HiveClientCacheKey, CacheableHiveMetaStoreClient> elements = hiveCache.asMap(); | |
for (HiveClientCacheKey hiveClientCacheKey : elements.keySet()) { | |
elements.get(hiveClientCacheKey).tearDown(); | |
} | |
} catch (Throwable e) { | |
LOG.info("Clean up of hive client cache during shutdown failed. Ignored", e); | |
} | |
} | |
}; | |
Runtime.getRuntime().addShutdownHook(cleanupHiveClientShutdownThread); | |
} | |
public void cleanup() { | |
hiveCache.cleanUp(); | |
} | |
/** | |
* Returns a cached client if exists or else creates one, caches and returns it | |
* @param hiveConf | |
* @return the hive client | |
* @throws MetaException | |
* @throws IOException | |
* @throws LoginException | |
*/ | |
public HiveMetaStoreClient get(final HiveConf hiveConf) throws MetaException, IOException, LoginException { | |
try { | |
final HiveClientCacheKey cacheKey = HiveClientCacheKey.fromHiveConf(hiveConf); | |
CacheableHiveMetaStoreClient hiveMetaStoreClient = hiveCache.get(cacheKey, new Callable<CacheableHiveMetaStoreClient>() { | |
@Override | |
public CacheableHiveMetaStoreClient call() throws MetaException { | |
return new CacheableHiveMetaStoreClient(cacheKey.getHiveConf(), timetout); | |
} | |
}); | |
hiveMetaStoreClient.acquire(); | |
return hiveMetaStoreClient; | |
} catch (ExecutionException e) { | |
throw new IOException("Error creating hiveMetaStoreClient",e); | |
} | |
} | |
/** | |
* A class to wrap HiveConf and expose equality based only on UserGroupInformation and the metaStoreURIs. | |
* This becomes the key for the cache and this way the same HiveMetaStoreClient would be returned if | |
* UserGroupInformation and metaStoreURIs are same. This function can evolve to express | |
* the cases when HiveConf is different but the same hiveMetaStoreClient can be used | |
*/ | |
public static class HiveClientCacheKey { | |
final private String metaStoreURIs; | |
final private UserGroupInformation ugi; | |
final private HiveConf hiveConf; | |
private HiveClientCacheKey(HiveConf hiveConf) throws IOException, LoginException { | |
this.metaStoreURIs = hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS); | |
ugi = ShimLoader.getHadoopShims().getUGIForConf(hiveConf); | |
this.hiveConf = hiveConf; | |
} | |
public static HiveClientCacheKey fromHiveConf(HiveConf hiveConf) throws IOException, LoginException { | |
return new HiveClientCacheKey(hiveConf); | |
} | |
public HiveConf getHiveConf() { | |
return hiveConf; | |
} | |
@Override | |
public boolean equals(Object o) { | |
if (this == o) return true; | |
if (o == null || getClass() != o.getClass()) return false; | |
HiveClientCacheKey that = (HiveClientCacheKey) o; | |
return new EqualsBuilder(). | |
append(this.metaStoreURIs, | |
that.metaStoreURIs). | |
append(this.ugi, that.ugi).isEquals(); | |
} | |
@Override | |
public int hashCode() { | |
return new HashCodeBuilder(). | |
append(metaStoreURIs). | |
append(ugi).toHashCode(); | |
} | |
} | |
/** | |
* Add # of current users on HiveMetaStoreClient, so that the client can be cleaned when no one is using it. | |
*/ | |
public static class CacheableHiveMetaStoreClient extends HiveMetaStoreClient { | |
private final AtomicInteger users = new AtomicInteger(0); | |
private final AtomicBoolean expiredFromCache = new AtomicBoolean(false); | |
private final AtomicBoolean isClosed = new AtomicBoolean(false); | |
private final long expiryTime; | |
public CacheableHiveMetaStoreClient(final HiveConf conf, final int timeout) throws MetaException { | |
super(conf); | |
this.expiryTime = System.currentTimeMillis() + timeout*1000; | |
} | |
private void acquire() { | |
users.incrementAndGet(); | |
} | |
private void release() { | |
users.decrementAndGet(); | |
} | |
public void setExpiredFromCache() { | |
expiredFromCache.set(true); | |
} | |
public boolean isClosed() { | |
return isClosed.get(); | |
} | |
/** | |
* Decrement the user count and check for expiry time then teardown() if required. This *MUST* be called by | |
* anyone who uses this client. | |
*/ | |
@Override | |
public synchronized void close(){ | |
release(); | |
if(System.currentTimeMillis() >= expiryTime) | |
setExpiredFromCache(); | |
tearDownIfUnused(); | |
} | |
/** | |
* Tear down only if there are no user & it has expired from the cache | |
*/ | |
private synchronized void tearDownIfUnused() { | |
if(users.get() == 0 && expiredFromCache.get()) { | |
this.tearDown(); | |
} | |
} | |
/** | |
* Close if not closed already | |
*/ | |
protected synchronized void tearDown() { | |
try { | |
if(!isClosed.get()) { | |
super.close(); | |
} | |
isClosed.set(true); | |
} catch(Throwable e) { | |
LOG.info("Error closing hive metastore client. Ignored.", e); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment