-
-
Save PaulLiang1/b145c7219773916bc701 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class FooBarCache(object): | |
DEFAULT_EXP_SEC = 86400 | |
def __init__(self, | |
host, | |
port, | |
max_retry=5, | |
retry_sleep=0.1, | |
*args, | |
**kwargs): | |
self.connection_setup_host = host | |
self.connection_setup_port = port | |
self.connection_setup_kwargs = kwargs | |
self.lookup_ids = lookup_ids | |
self.conn = StrictRedis(host=self.connection_setup_host, | |
port=self.connection_setup_port, | |
**kwargs) | |
self.max_retry = max_retry | |
self.retry_sleep = retry_sleep | |
def handle_redis_connection_failure(function): | |
def __handle_redis_connection_failure(self, *args, **kwargs): | |
retry_count = 0 | |
sleep_time = self.retry_sleep | |
while retry_count < self.max_retry: | |
try: | |
return function(self, *args, **kwargs) | |
except (ConnectionError, TimeoutError, BusyLoadingError) as ex: | |
logger.critical(u'Hitting Redis Error: {0}'.format(ex)) | |
logger.critical('Retrying for {0}/{1} time'.format( | |
retry_count, | |
self.max_retry | |
)) | |
retry_count += 1 | |
try: | |
resp = self.conn.ping() | |
logger.info('Trying to ping Redis Server [{0}]. Resp: {1}'.format( | |
repr(self), | |
resp | |
)) | |
except Exception as ex: | |
logger.critical('Resetting connection to Redis with host:[{0}] port:[{1}], kwargs:[{2}].'.format( | |
self.connection_setup_host, | |
self.connection_setup_port, | |
self.connection_setup_kwargs | |
)) | |
self.conn = StrictRedis(host=self.connection_setup_host, | |
port=self.connection_setup_port, | |
**(self.connection_setup_kwargs)) | |
_actual_sleep_time = float(sleep_time + random.random()) | |
logger.critical('Sleep for {0} second(s) before retry.'.format(_actual_sleep_time)) | |
time.sleep( _actual_sleep_time ) | |
sleep_time *= 2 | |
msg = 'Redis Server [{0}] hit irrecoverable error.'.format(repr(self)) | |
raise RuntimeError(msg) | |
return __handle_redis_connection_failure | |
@handle_redis_connection_failure | |
def set(self, *args, **kwargs): | |
return self.conn.set(*args, **kwargs) | |
@handle_redis_connection_failure | |
def get(self, *args, **kwargs): | |
return self.conn.get(*args, **kwargs) | |
@handle_redis_connection_failure | |
def setex(self, name, time, value): | |
with CloudWatchTimer(name='akacache.setex'): | |
return self.conn.setex(name, time, value) | |
@handle_redis_connection_failure | |
def flushdb(self): | |
return self.conn.flushdb() | |
@handle_redis_connection_failure | |
def foo(bar): | |
pass | |
import random | |
class MultiFooBarCache(object): | |
def __init__(self, | |
cache_config, | |
*args, | |
**kwargs): | |
self.writer_node_conf = cache_config['writer_node'] | |
self.reader_node_conf = cache_config['reader_node'] | |
self.writer_node = FooBarCache( | |
host=self.writer_node_conf['endpoint'][0], | |
port=self.writer_node_conf['endpoint'][1] | |
) | |
logger.info('Initalized FooBarCache Writer Node-> {0}:{1}'.format( | |
self.writer_node_conf['endpoint'][0], | |
self.writer_node_conf['endpoint'][1] | |
)) | |
self.reader_nodes = list() | |
for reader_endpoint in self.reader_node_conf['endpoint']: | |
reader = AKACache( | |
lookup_ids=self.lookup_ids, | |
host=reader_endpoint[0], | |
port=reader_endpoint[1] | |
) | |
self.reader_nodes.append(reader) | |
logger.info('Initalized FooBarCache Reader Node-> {0}:{1}'.format( | |
reader_endpoint[0], | |
reader_endpoint[1] | |
)) | |
@property | |
def _get_random_reader(self): | |
return random.choice(self.reader_nodes) | |
# Write Operations | |
# Only Apply to Primary Node | |
# In case of Primary Node failure will be handled | |
# by ElasticCache fail over | |
# http://docs.aws.amazon.com/AmazonElastiCache/latest/UserGuide/AutoFailover.html | |
def set(self, *args, **kwargs): | |
return self.writer_node.set(*args, **kwargs) | |
def setex(self, name, time, value): | |
return self.writer_node.setex(name, time, value) | |
def flushdb(self, *args, **kwargs): | |
return self.writer_node.flushdb() | |
def update(self, uuid, overwrite=False, expire_sec=None, **kwargs): | |
return self.writer_node.update(uuid=uuid, | |
overwrite=overwrite, | |
expire_sec=expire_sec, | |
**kwargs) | |
# Read Operations | |
# Can be done in any of the Slave Nodes (including Master) | |
# So randomly pick one node and do it | |
def get(self, *args, **kwargs): | |
return self._get_random_reader.get(*args, **kwargs) | |
def lookup(self, **kwargs): | |
return self._get_random_reader.lookup(**kwargs) | |
def show(self, **kwargs): | |
return self._get_random_reader.show(**kwargs) | |
cache_config = { | |
'cache': { | |
# Only Write to Master Node | |
# Endpoint is (url,port) tuple | |
'writer_node':{ | |
'endpoint': ('blah.blah.cache.amazonaws.com', 1234) | |
}, | |
# Can read from multiple Slave Node/Master Node | |
# Endpoint is list of (url,port) tuple | |
'reader_node':{ | |
'endpoint': [ | |
('blah.0001.somewhere.cache.amazonaws.com', 1234), | |
('blah.0002.somewhere.cache.amazonaws.com', 1234), | |
], | |
} | |
}, | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment