Skip to content

Instantly share code, notes, and snippets.

@PaulLiang1
Last active October 12, 2015 12:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save PaulLiang1/b145c7219773916bc701 to your computer and use it in GitHub Desktop.
class FooBarCache(object):
    """Thin wrapper around a redis-py StrictRedis connection that retries
    transient connection failures with exponential backoff plus jitter.

    Method signatures mirror StrictRedis so callers can treat an instance
    as a drop-in connection object.
    """

    # Default key expiry in seconds (24 hours); exposed for callers.
    DEFAULT_EXP_SEC = 86400

    def __init__(self,
                 host,
                 port,
                 max_retry=5,
                 retry_sleep=0.1,
                 lookup_ids=None,
                 *args,
                 **kwargs):
        """Create the cache and open the initial Redis connection.

        :param host: Redis server hostname.
        :param port: Redis server port.
        :param max_retry: attempts before an operation gives up.
        :param retry_sleep: initial backoff in seconds; doubled each retry.
        :param lookup_ids: optional identifiers associated with this cache
            instance (stored as-is; not read by this class itself).
        :param kwargs: extra keyword arguments forwarded to StrictRedis;
            also saved so the connection can be rebuilt after a failure.
        """
        self.connection_setup_host = host
        self.connection_setup_port = port
        self.connection_setup_kwargs = kwargs
        # BUG FIX: the original assigned an undefined name `lookup_ids`,
        # raising NameError on every construction. It is now an explicit
        # keyword parameter with a safe default.
        self.lookup_ids = lookup_ids
        self.conn = StrictRedis(host=self.connection_setup_host,
                                port=self.connection_setup_port,
                                **kwargs)
        self.max_retry = max_retry
        self.retry_sleep = retry_sleep

    def handle_redis_connection_failure(function):
        """Decorator: retry `function` on transient Redis errors.

        On each failure the server is pinged; if the ping itself fails,
        the connection object is rebuilt from the setup arguments saved in
        __init__. Sleeps with exponential backoff plus jitter between
        attempts and raises RuntimeError once `max_retry` attempts are
        exhausted.
        """
        def __handle_redis_connection_failure(self, *args, **kwargs):
            retry_count = 0
            sleep_time = self.retry_sleep
            while retry_count < self.max_retry:
                try:
                    return function(self, *args, **kwargs)
                except (ConnectionError, TimeoutError, BusyLoadingError) as ex:
                    logger.critical(u'Hitting Redis Error: {0}'.format(ex))
                    retry_count += 1
                    # BUG FIX: increment before logging so the first retry
                    # reads "1/N" instead of "0/N".
                    logger.critical('Retrying for {0}/{1} time'.format(
                        retry_count,
                        self.max_retry
                    ))
                    try:
                        resp = self.conn.ping()
                        logger.info('Trying to ping Redis Server [{0}]. Resp: {1}'.format(
                            repr(self),
                            resp
                        ))
                    except Exception:
                        # Ping failed too: rebuild the connection from the
                        # arguments captured at construction time.
                        logger.critical('Resetting connection to Redis with host:[{0}] port:[{1}], kwargs:[{2}].'.format(
                            self.connection_setup_host,
                            self.connection_setup_port,
                            self.connection_setup_kwargs
                        ))
                        self.conn = StrictRedis(host=self.connection_setup_host,
                                                port=self.connection_setup_port,
                                                **(self.connection_setup_kwargs))
                    # Exponential backoff with jitter to avoid hammering a
                    # recovering server in lockstep with other clients.
                    _actual_sleep_time = float(sleep_time + random.random())
                    logger.critical('Sleep for {0} second(s) before retry.'.format(_actual_sleep_time))
                    time.sleep(_actual_sleep_time)
                    sleep_time *= 2
            msg = 'Redis Server [{0}] hit irrecoverable error.'.format(repr(self))
            raise RuntimeError(msg)
        return __handle_redis_connection_failure

    @handle_redis_connection_failure
    def set(self, *args, **kwargs):
        """Proxy for StrictRedis.set with retry handling."""
        return self.conn.set(*args, **kwargs)

    @handle_redis_connection_failure
    def get(self, *args, **kwargs):
        """Proxy for StrictRedis.get with retry handling."""
        return self.conn.get(*args, **kwargs)

    @handle_redis_connection_failure
    def setex(self, name, time, value):
        """Proxy for StrictRedis.setex, timed via CloudWatchTimer.

        Note: parameter `time` intentionally shadows the time module inside
        this method to match the StrictRedis.setex signature.
        """
        with CloudWatchTimer(name='akacache.setex'):
            return self.conn.setex(name, time, value)

    @handle_redis_connection_failure
    def flushdb(self):
        """Proxy for StrictRedis.flushdb with retry handling."""
        return self.conn.flushdb()

    @handle_redis_connection_failure
    def foo(self, *args, **kwargs):
        """Placeholder method; intentionally does nothing.

        BUG FIX: the original signature `foo(bar)` omitted the conventional
        `self` name; `bar` was in fact the instance.
        """
        pass
import random
class MultiFooBarCache(object):
    """Routes cache operations across one writer node and a pool of reader
    nodes (ElastiCache-style primary/replica topology).

    All writes go to the single writer node; each read is dispatched to a
    randomly chosen reader node.
    """

    def __init__(self,
                 cache_config,
                 *args,
                 **kwargs):
        """Build the writer and reader FooBarCache connections.

        :param cache_config: dict with keys 'writer_node' (whose 'endpoint'
            is a (host, port) tuple) and 'reader_node' (whose 'endpoint' is
            a list of (host, port) tuples).
        """
        self.writer_node_conf = cache_config['writer_node']
        self.reader_node_conf = cache_config['reader_node']
        self.writer_node = FooBarCache(
            host=self.writer_node_conf['endpoint'][0],
            port=self.writer_node_conf['endpoint'][1]
        )
        logger.info('Initialized FooBarCache Writer Node-> {0}:{1}'.format(
            self.writer_node_conf['endpoint'][0],
            self.writer_node_conf['endpoint'][1]
        ))
        self.reader_nodes = list()
        for reader_endpoint in self.reader_node_conf['endpoint']:
            # BUG FIX: the original constructed `AKACache` (undefined in
            # this file) and passed `self.lookup_ids`, which this class
            # never sets. Readers are now built exactly like the writer.
            reader = FooBarCache(
                host=reader_endpoint[0],
                port=reader_endpoint[1]
            )
            self.reader_nodes.append(reader)
            logger.info('Initialized FooBarCache Reader Node-> {0}:{1}'.format(
                reader_endpoint[0],
                reader_endpoint[1]
            ))

    @property
    def _get_random_reader(self):
        """Pick one reader node uniformly at random."""
        return random.choice(self.reader_nodes)

    # Write Operations
    # Only Apply to Primary Node
    # In case of Primary Node failure will be handled
    # by ElastiCache fail over
    # http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/AutoFailover.html
    def set(self, *args, **kwargs):
        """Write-through to the writer node's set."""
        return self.writer_node.set(*args, **kwargs)

    def setex(self, name, time, value):
        """Write-through to the writer node's setex."""
        return self.writer_node.setex(name, time, value)

    def flushdb(self, *args, **kwargs):
        """Flush the writer node's database (args are accepted but ignored)."""
        return self.writer_node.flushdb()

    def update(self, uuid, overwrite=False, expire_sec=None, **kwargs):
        """Write-through to the writer node's update.

        NOTE(review): FooBarCache as visible in this file defines no
        `update` method — confirm it exists on the deployed class.
        """
        return self.writer_node.update(uuid=uuid,
                                       overwrite=overwrite,
                                       expire_sec=expire_sec,
                                       **kwargs)

    # Read Operations
    # Can be done in any of the Slave Nodes (including Master)
    # So randomly pick one node and do it
    def get(self, *args, **kwargs):
        """Read from a randomly chosen reader node."""
        return self._get_random_reader.get(*args, **kwargs)

    def lookup(self, **kwargs):
        """Dispatch lookup to a randomly chosen reader node."""
        return self._get_random_reader.lookup(**kwargs)

    def show(self, **kwargs):
        """Dispatch show to a randomly chosen reader node."""
        return self._get_random_reader.show(**kwargs)
# Example cache topology configuration consumed by MultiFooBarCache.
cache_config = {
    'cache': {
        # Writes go only to the primary node.
        # Its endpoint is a single (url, port) tuple.
        'writer_node': {
            'endpoint': ('blah.blah.cache.amazonaws.com', 1234),
        },
        # Reads may hit any replica node (the primary included).
        # Endpoint is a list of (url, port) tuples.
        'reader_node': {
            'endpoint': [
                ('blah.0001.somewhere.cache.amazonaws.com', 1234),
                ('blah.0002.somewhere.cache.amazonaws.com', 1234),
            ],
        },
    },
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment