Skip to content

Instantly share code, notes, and snippets.

@ewhauser
Created August 22, 2011 16:05
Show Gist options
  • Save ewhauser/1162766 to your computer and use it in GitHub Desktop.
Save ewhauser/1162766 to your computer and use it in GitHub Desktop.
//Maven repo at - maven.twttr.com
// Twitter Commons artifacts (group com.twitter.common), each pinned to an explicit version.
libraries.twitter = [
'com.twitter.common:application:0.0.5',
'com.twitter.common:application-module-stats:0.0.1',
'com.twitter.common:application-module-http:0.0.1',
'com.twitter.common:application-module-log:0.0.1',
'com.twitter.common:inject:0.0.10',
'com.twitter.common:net-pool:0.0.12',
]
package com.exacttarget.bloomin.jedis;
import com.google.common.base.Preconditions;
import com.twitter.common.net.pool.Connection;
import redis.clients.jedis.Jedis;
/**
 * Wraps a single {@link Jedis} client behind the {@link Connection} interface.
 *
 * <p>The wrapped client is returned both as the connection handle ({@link #get()}) and as the
 * endpoint ({@link #getEndpoint()}).
 */
public class JedisConnection implements Connection<Jedis, Jedis> {

  private final Jedis client;

  /**
   * @param jedis the client to wrap; must not be null
   */
  public JedisConnection(Jedis jedis) {
    this.client = Preconditions.checkNotNull(jedis);
  }

  /** Returns the wrapped client. */
  @Override
  public Jedis get() {
    return client;
  }

  /** Returns the wrapped client, which doubles as this connection's endpoint. */
  @Override
  public Jedis getEndpoint() {
    return client;
  }

  /** Always reports the connection as valid; no liveness check is performed. */
  @Override
  public boolean isValid() {
    return true;
  }

  /**
   * Best-effort shutdown: issues {@code quit()} and then {@code disconnect()} on the client.
   * An exception from either step is discarded, and a failed quit does not prevent the disconnect.
   */
  @Override
  public void close() {
    quitQuietly();
    disconnectQuietly();
  }

  /** Calls {@code quit()} on the client, discarding any exception. */
  private void quitQuietly() {
    try {
      client.quit();
    } catch (Exception ignored) {
      //ignored
    }
  }

  /** Calls {@code disconnect()} on the client, discarding any exception. */
  private void disconnectQuietly() {
    try {
      client.disconnect();
    } catch (Exception ignored) {
      //ignored
    }
  }
}
package com.exacttarget.bloomin.jedis;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.twitter.common.net.pool.ConnectionFactory;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import redis.clients.jedis.Jedis;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Creates {@link JedisConnection}s to the master of a single shard and tracks the ones it has
 * handed out, refusing to create more than {@code maxConnections} at a time.
 *
 * <p>All mutations of the active-connection set happen under {@code activeConnectionsWriteLock};
 * {@link #mightCreate()} reads a volatile snapshot of the set size instead of taking the lock.
 */
public class JedisConnectionFactory implements ConnectionFactory<JedisConnection> {

  private final int maxConnections;

  // Identity-based set: membership is decided by reference, not equals()/hashCode().
  private final Set<JedisConnection> activeConnections =
      Sets.newSetFromMap(
          Maps.<JedisConnection, Boolean>newIdentityHashMap());

  // Size of activeConnections as of the last add/remove; readable without holding the lock.
  private volatile int lastActiveConnectionsSize = 0;

  // Fair lock guarding activeConnections.
  private final Lock activeConnectionsWriteLock = new ReentrantLock(true);

  private final String poolName;
  private final ShardInfo shard;

  /**
   * @param shard the shard whose master address connections are opened against; must not be null
   * @param maxConnections maximum number of simultaneously tracked connections; must be positive
   * @throws IllegalStateException if {@code maxConnections} is not positive
   */
  public JedisConnectionFactory(ShardInfo shard, int maxConnections) {
    this.shard = Preconditions.checkNotNull(shard);
    Preconditions.checkState(maxConnections > 0);
    this.maxConnections = maxConnections;
    this.poolName = shard.getMaster().toString();
  }

  /**
   * Lock-free hint based on the last recorded connection count; the authoritative check is made
   * under the lock when a connection is actually created.
   */
  @Override
  public boolean mightCreate() {
    return lastActiveConnectionsSize < maxConnections;
  }

  /**
   * Creates a connection if the factory is below its limit.
   *
   * @param timeout how long to wait for the internal lock; a value of 0 waits indefinitely
   * @return a new connection, or {@code null} if the limit is reached, the lock could not be
   *     acquired within {@code timeout}, or the calling thread was interrupted while waiting
   *     (in which case the thread's interrupt flag is left set)
   */
  @Override
  public JedisConnection create(Amount<Long, Time> timeout) throws IOException {
    Preconditions.checkNotNull(timeout);
    if (timeout.getValue() == 0) {
      return create();
    }

    boolean acquired;
    try {
      acquired =
          activeConnectionsWriteLock.tryLock(timeout.as(Time.NANOSECONDS), TimeUnit.NANOSECONDS);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up the stack can observe the interruption.
      Thread.currentThread().interrupt();
      return null;
    }
    if (!acquired) {
      return null;
    }

    try {
      return willCreateSafe() ? createConnection() : null;
    } finally {
      activeConnectionsWriteLock.unlock();
    }
  }

  /** Blocking variant: waits for the lock without a timeout, then creates if below the limit. */
  private JedisConnection create() throws IOException {
    activeConnectionsWriteLock.lock();
    try {
      if (!willCreateSafe()) {
        return null;
      }
      return createConnection();
    } finally {
      activeConnectionsWriteLock.unlock();
    }
  }

  /**
   * Builds a connection to the shard master and records it as active.
   * Must be called with {@code activeConnectionsWriteLock} held.
   */
  private JedisConnection createConnection() throws IOException {
    InetSocketAddress master = shard.getMaster();
    Jedis jedis = new Jedis(master.getHostName(), master.getPort());
    JedisConnection connection = new JedisConnection(jedis);
    activeConnections.add(connection);
    lastActiveConnectionsSize = activeConnections.size();
    return connection;
  }

  /** Authoritative limit check; must be called with {@code activeConnectionsWriteLock} held. */
  private boolean willCreateSafe() {
    return activeConnections.size() < maxConnections;
  }

  /**
   * Stops tracking the connection and closes it.
   *
   * @throws IllegalArgumentException if the connection is not currently tracked by this factory;
   *     in that case the connection is not closed
   */
  @Override
  public void destroy(JedisConnection connection) {
    activeConnectionsWriteLock.lock();
    try {
      boolean wasActiveConnection = activeConnections.remove(connection);
      Preconditions.checkArgument(wasActiveConnection,
          "connection %s not created by this factory", connection);
      lastActiveConnectionsSize = activeConnections.size();
    } finally {
      activeConnectionsWriteLock.unlock();
    }

    // We close the connection outside the critical section which means we may have more connections
    // "active" (open) than maxConnections for a very short time
    connection.close();
  }

  @Override
  public String toString() {
    return String.format("%s[%s]", getClass().getSimpleName(), poolName);
  }
}
package com.exacttarget.bloomin.jedis;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import com.twitter.common.base.ExceptionalClosure;
import com.twitter.common.base.ExceptionalFunction;
import com.twitter.common.base.ExceptionalSupplier;
import com.twitter.common.net.pool.ConnectionPool;
import com.twitter.common.quantity.Amount;
import com.twitter.common.quantity.Time;
import com.twitter.common.util.BackoffHelper;
import redis.clients.jedis.Jedis;
import javax.inject.Named;
import java.io.IOException;
/**
 * Runs callbacks against a {@link Jedis} client borrowed from a {@link ConnectionPool}, driving
 * each attempt through a {@link BackoffHelper} and always returning the connection to the pool.
 */
public class JedisHelper {

  private final ConnectionPool<JedisConnection> connectionPool;
  private final BackoffHelper backoff;

  /**
   * @param pool Connection pool for Jedis
   * @param jedisBackOffms The amount of time in milliseconds to wait for Jedis connections to be
   *     killed before they are retried
   * @param jedisBackOffMax The amount of time in milliseconds that Jedis connections will be
   *     retried
   */
  @Inject
  public JedisHelper(ConnectionPool<JedisConnection> pool,
                     @Named("jedis.backoff.ms") long jedisBackOffms,
                     @Named("jedis.backoff.max") long jedisBackOffMax) {
    this.connectionPool = Preconditions.checkNotNull(pool);
    this.backoff = new BackoffHelper(Amount.of(jedisBackOffms, Time.MILLISECONDS),
        Amount.of(jedisBackOffMax, Time.MILLISECONDS));
  }

  /**
   * Executes {@code closure} with a pooled client, discarding any result.
   *
   * @param closure work to perform against the borrowed client
   * @throws RuntimeException if the work or the pool interaction fails
   */
  public void doInPool(final ExceptionalClosure<Jedis, IOException> closure) {
    doInPool(new ExceptionalFunction<Jedis, Object, IOException>() {
      @Override
      public Object apply(Jedis jedis) throws IOException {
        closure.execute(jedis);
        // Non-null placeholder result.
        // NOTE(review): presumably needed because doUntilResult treats null as "no result
        // yet" — confirm against BackoffHelper.
        return Void.TYPE;
      }
    });
  }

  /**
   * Executes {@code function} with a pooled client and returns its result.
   *
   * <p>The borrowed connection is released back to the pool whether or not the function
   * succeeds. Any exception thrown by the function is wrapped in a {@link RuntimeException};
   * that exception is propagated as-is rather than being wrapped a second time.
   *
   * @param function work to perform against the borrowed client
   * @return the value produced by {@code function}
   * @throws RuntimeException if the work or the pool interaction fails; if the failure was an
   *     {@link InterruptedException}, the calling thread's interrupt flag is restored first
   */
  public <T> T doInPool(final ExceptionalFunction<Jedis, T, IOException> function) {
    try {
      return backoff.doUntilResult(new ExceptionalSupplier<T, Exception>() {
        @Override
        public T get() throws Exception {
          JedisConnection jedisConnection = connectionPool.get();
          try {
            Jedis jedis = jedisConnection.get();
            return function.apply(jedis);
          } catch (Exception e) {
            throw new RuntimeException(e);
          } finally {
            connectionPool.release(jedisConnection);
          }
        }
      });
    } catch (RuntimeException e) {
      // Already unchecked (typically the wrapper created above); re-wrapping would only bury
      // the root cause one level deeper.
      throw e;
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // Preserve the interruption for callers that check the thread's interrupt status.
        Thread.currentThread().interrupt();
      }
      throw new RuntimeException(e);
    }
  }

  /** Closes the underlying connection pool. */
  public void close() {
    connectionPool.close();
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment