Skip to content

Instantly share code, notes, and snippets.

@amalakar
Created July 26, 2012 21:19
Show Gist options
  • Save amalakar/3184582 to your computer and use it in GitHub Desktop.
Save amalakar/3184582 to your computer and use it in GitHub Desktop.
thread safe cache client
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hcatalog.common.cache;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hcatalog.common.HCatConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.security.auth.login.LoginException;
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* A cache of HiveMetaStoreClient instances whose entries expire a fixed time after they are created.
*/
public class HiveClientCache {
// Cache of metastore clients, keyed by HiveClientCacheKey; built in the constructor.
final private Cache<HiveClientCacheKey, CacheableHiveMetaStoreClient> hiveCache;
private static final Logger LOG = LoggerFactory.getLogger(HiveClientCache.class);
// Expiry time in seconds as passed to the constructor. The name is a misspelling of "timeout";
// it is left as-is because other members of this class reference it.
private final int timetout;
// Lazy initialization for singleton: the static fields below are only initialized when
// this holder class itself is initialized, i.e. on first access to one of its members.
static class LazyHolder {
    static final HiveConf hiveConf = new HiveConf(HiveClientCache.class);
    static final HiveClientCache INSTANCE = buildInstance();

    // Reads the client expiry time from the configuration (falling back to 2*60) and builds the cache.
    private static HiveClientCache buildInstance() {
        final int expiryTime = hiveConf.getInt(HCatConf.HCATALOG_HIVE_CLIENT_EXPIRY_TIME, 2*60);
        return new HiveClientCache(expiryTime);
    }
}
/**
 * Returns the singleton cache held by {@link LazyHolder}.
 *
 * @return the shared HiveClientCache instance
 */
public static HiveClientCache getInstance() {
    final HiveClientCache shared = LazyHolder.INSTANCE;
    return shared;
}
/**
 * Builds the underlying cache with a write-based expiry, installs a removal listener that marks
 * removed clients as expired and tears them down if unused, and registers a JVM shutdown hook that
 * tears down any clients still present in the cache.
 * The constructor is protected for the purpose of testing, it could have been private otherwise.
 *
 * @param timeout the length of time in seconds after a client is created that it should be automatically removed
 */
protected HiveClientCache(final int timeout) {
    this.timetout = timeout;

    RemovalListener<HiveClientCacheKey, CacheableHiveMetaStoreClient> removalListener =
        new RemovalListener<HiveClientCacheKey, CacheableHiveMetaStoreClient>() {
            @Override
            public void onRemoval(RemovalNotification<HiveClientCacheKey, CacheableHiveMetaStoreClient> notification) {
                CacheableHiveMetaStoreClient hiveMetaStoreClient = notification.getValue();
                if (hiveMetaStoreClient != null) {
                    // Mark first, so that a later close() by the last user also tears the client down.
                    hiveMetaStoreClient.setExpiredFromCache();
                    hiveMetaStoreClient.tearDownIfUnused();
                }
            }
        };

    hiveCache = CacheBuilder.newBuilder()
        .expireAfterWrite(timeout, TimeUnit.SECONDS)
        .removalListener(removalListener)
        .build();

    // Add a shutdown hook for cleanup, in case there are clients remaining in the cache which were
    // not cleaned up. This is a best-effort approach: any error while doing so is logged and ignored.
    // Most clients get cleaned up earlier, via either the removalListener or the close() call.
    Thread cleanupHiveClientShutdownThread = new Thread() {
        @Override
        public void run() {
            LOG.info("Cleaning up hive client cache in ShutDown hook");
            try {
                // Iterate over the values directly. Looking each key up again with get() can return
                // null when the entry disappears in between, and the resulting NullPointerException
                // would abort the cleanup of every remaining client.
                for (CacheableHiveMetaStoreClient client : hiveCache.asMap().values()) {
                    client.tearDown();
                }
            } catch (Throwable e) {
                LOG.info("Clean up of hive client cache during shutdown failed. Ignored", e);
            }
        }
    };
    Runtime.getRuntime().addShutdownHook(cleanupHiveClientShutdownThread);
}
/**
 * Delegates to the underlying cache's cleanUp(), asking it to perform its pending maintenance now.
 */
public void cleanup() {
    this.hiveCache.cleanUp();
}
/**
 * Returns a cached client if one exists for the key derived from the given configuration, or else
 * creates one, caches it and returns it. The user count of the returned client is incremented, so
 * the caller must call close() on it when done.
 *
 * @param hiveConf the configuration used to build the cache key and, if needed, the new client
 * @return the hive client
 * @throws MetaException if creating a new client fails with a MetaException
 * @throws IOException if the cache key cannot be built, or if creating the client fails for another reason
 * @throws LoginException if the cache key cannot be built
 */
public HiveMetaStoreClient get(final HiveConf hiveConf) throws MetaException, IOException, LoginException {
    final HiveClientCacheKey cacheKey = HiveClientCacheKey.fromHiveConf(hiveConf);
    final CacheableHiveMetaStoreClient hiveMetaStoreClient;
    try {
        hiveMetaStoreClient = hiveCache.get(cacheKey, new Callable<CacheableHiveMetaStoreClient>() {
            @Override
            public CacheableHiveMetaStoreClient call() throws MetaException {
                return new CacheableHiveMetaStoreClient(cacheKey.getHiveConf(), timetout);
            }
        });
    } catch (ExecutionException e) {
        // The loader's checked exception arrives wrapped; surface a MetaException as declared
        // instead of hiding it inside an IOException.
        final Throwable cause = e.getCause();
        if (cause instanceof MetaException) {
            throw (MetaException) cause;
        }
        throw new IOException("Error creating hiveMetaStoreClient",e);
    }
    // NOTE(review): nothing here prevents the removal listener from tearing the client down between
    // the cache lookup above and this acquire() - confirm whether callers need a guard for that.
    hiveMetaStoreClient.acquire();
    return hiveMetaStoreClient;
}
/**
 * Cache key wrapping a HiveConf. Equality and hashing consider only the metastore URIs and the
 * UserGroupInformation obtained for the configuration, so two configurations that agree on both
 * map to the same cached HiveMetaStoreClient. The wrapped HiveConf is carried along for creating
 * the client but takes no part in equality. This can evolve to express further cases where the
 * HiveConf differs but the same client can be used.
 */
public static class HiveClientCacheKey {
    final private String metaStoreURIs;
    final private UserGroupInformation ugi;
    final private HiveConf hiveConf;

    private HiveClientCacheKey(HiveConf hiveConf) throws IOException, LoginException {
        this.hiveConf = hiveConf;
        this.metaStoreURIs = hiveConf.getVar(HiveConf.ConfVars.METASTOREURIS);
        this.ugi = ShimLoader.getHadoopShims().getUGIForConf(hiveConf);
    }

    /**
     * Creates the key for the given configuration.
     *
     * @param hiveConf the configuration to wrap
     * @return a new key
     * @throws IOException declared by the constructor, which looks up the UGI for the configuration
     * @throws LoginException declared by the constructor, which looks up the UGI for the configuration
     */
    public static HiveClientCacheKey fromHiveConf(HiveConf hiveConf) throws IOException, LoginException {
        final HiveClientCacheKey key = new HiveClientCacheKey(hiveConf);
        return key;
    }

    /**
     * @return the configuration this key was built from
     */
    public HiveConf getHiveConf() {
        return this.hiveConf;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null) {
            return false;
        }
        if (getClass() != o.getClass()) {
            return false;
        }
        final HiveClientCacheKey other = (HiveClientCacheKey) o;
        final EqualsBuilder comparison = new EqualsBuilder();
        comparison.append(this.metaStoreURIs, other.metaStoreURIs);
        comparison.append(this.ugi, other.ugi);
        return comparison.isEquals();
    }

    @Override
    public int hashCode() {
        // Must cover exactly the fields compared in equals().
        final HashCodeBuilder hash = new HashCodeBuilder();
        hash.append(this.metaStoreURIs);
        hash.append(this.ugi);
        return hash.toHashCode();
    }
}
/**
 * A HiveMetaStoreClient that keeps a count of its current users, so that the underlying client can
 * be closed once it has expired from the cache and no one is using it.
 */
public static class CacheableHiveMetaStoreClient extends HiveMetaStoreClient {
    private final AtomicInteger users = new AtomicInteger(0);
    private final AtomicBoolean expiredFromCache = new AtomicBoolean(false);
    private final AtomicBoolean isClosed = new AtomicBoolean(false);
    // Wall-clock time (milliseconds) after which close() treats this client as expired.
    private final long expiryTime;

    /**
     * @param conf the configuration passed to the HiveMetaStoreClient constructor
     * @param timeout the lifetime of this client in seconds, counted from construction
     * @throws MetaException if the superclass constructor fails
     */
    public CacheableHiveMetaStoreClient(final HiveConf conf, final int timeout) throws MetaException {
        super(conf);
        // Multiply in long arithmetic: timeout*1000 as an int overflows for large timeouts and
        // would yield a wrong (possibly already elapsed) expiry time.
        this.expiryTime = System.currentTimeMillis() + timeout * 1000L;
    }

    // Registers one more user of this client.
    private void acquire() {
        users.incrementAndGet();
    }

    // Unregisters one user of this client.
    private void release() {
        users.decrementAndGet();
    }

    /**
     * Marks this client as no longer held by the cache, making it eligible for tear down once unused.
     */
    public void setExpiredFromCache() {
        expiredFromCache.set(true);
    }

    /**
     * @return true once the underlying client has been closed by tearDown()
     */
    public boolean isClosed() {
        return isClosed.get();
    }

    /**
     * Decrement the user count and check for expiry time then teardown() if required. This *MUST* be called by
     * anyone who uses this client.
     */
    @Override
    public synchronized void close(){
        release();
        if (System.currentTimeMillis() >= expiryTime) {
            setExpiredFromCache();
        }
        tearDownIfUnused();
    }

    /**
     * Tear down only if there are no user & it has expired from the cache
     */
    private synchronized void tearDownIfUnused() {
        if (users.get() == 0 && expiredFromCache.get()) {
            this.tearDown();
        }
    }

    /**
     * Close if not closed already. Any error while closing is logged and ignored; in that case the
     * client is not marked as closed.
     */
    protected synchronized void tearDown() {
        try {
            if (!isClosed.get()) {
                super.close();
            }
            isClosed.set(true);
        } catch(Throwable e) {
            LOG.info("Error closing hive metastore client. Ignored.", e);
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment