Skip to content

Instantly share code, notes, and snippets.

@Grokzen
Created November 29, 2015 02:22
Show Gist options
  • Save Grokzen/8109ef4c0b4fb149e8ee to your computer and use it in GitHub Desktop.
@@ -397,7 +400,8 @@ class StrictRedis(object):
charset=None, errors=None,
decode_responses=False, retry_on_timeout=False,
ssl=False, ssl_keyfile=None, ssl_certfile=None,
- ssl_cert_reqs=None, ssl_ca_certs=None):
+ ssl_cert_reqs=None, ssl_ca_certs=None,
+ max_connections=None):
if not connection_pool:
if charset is not None:
warnings.warn(DeprecationWarning(
@@ -415,7 +419,8 @@ class StrictRedis(object):
'encoding': encoding,
'encoding_errors': encoding_errors,
'decode_responses': decode_responses,
- 'retry_on_timeout': retry_on_timeout
+ 'retry_on_timeout': retry_on_timeout,
+ 'max_connections': max_connections
}
# based on input, setup appropriate connection args
if unix_socket_path is not None:
@@ -476,6 +481,7 @@ class StrictRedis(object):
"""
shard_hint = kwargs.pop('shard_hint', None)
value_from_callable = kwargs.pop('value_from_callable', False)
+ watch_delay = kwargs.pop('watch_delay', None)
with self.pipeline(True, shard_hint) as pipe:
while 1:
try:
@@ -485,6 +491,8 @@ class StrictRedis(object):
exec_value = pipe.execute()
return func_value if value_from_callable else exec_value
except WatchError:
+ if watch_delay is not None and watch_delay > 0:
+ time.sleep(watch_delay)
continue
def lock(self, name, timeout=None, sleep=0.1, blocking_timeout=None,
@@ -1799,12 +1832,12 @@ class StrictRedis(object):
"Adds the specified elements to the specified HyperLogLog."
return self.execute_command('PFADD', name, *values)
- def pfcount(self, name):
+ def pfcount(self, *sources):
"""
Return the approximated cardinality of
- the set observed by the HyperLogLog at key.
+ the set observed by the HyperLogLog at key(s).
"""
- return self.execute_command('PFCOUNT', name)
+ return self.execute_command('PFCOUNT', *sources)
def pfmerge(self, dest, *sources):
"Merge N different HyperLogLogs into a single one."
@@ -2142,10 +2175,10 @@ class PubSub(object):
# previously listening to
return command(*args)
- def parse_response(self, block=True):
+ def parse_response(self, block=True, timeout=0):
"Parse the response from a publish/subscribe command"
connection = self.connection
- if not block and not connection.can_read():
+ if not block and not connection.can_read(timeout=timeout):
return None
return self._execute(connection, connection.read_response)
--- a/redis/connection.py
+++ b/redis/connection.py
@@ -79,7 +80,9 @@ class Token(object):
class BaseParser(object):
EXCEPTION_CLASSES = {
- 'ERR': ResponseError,
+ 'ERR': {
+ 'max number of clients reached': ConnectionError
+ },
'EXECABORT': ExecAbortError,
'LOADING': BusyLoadingError,
'NOSCRIPT': NoScriptError,
@@ -91,7 +94,10 @@ class BaseParser(object):
error_code = response.split(' ')[0]
if error_code in self.EXCEPTION_CLASSES:
response = response[len(error_code) + 1:]
- return self.EXCEPTION_CLASSES[error_code](response)
+ exception_class = self.EXCEPTION_CLASSES[error_code]
+ if isinstance(exception_class, dict):
+ exception_class = exception_class.get(response, ResponseError)
+ return exception_class(response)
return ResponseError(response)
@@ -542,11 +548,12 @@ class Connection(object):
e = sys.exc_info()[1]
self.disconnect()
if len(e.args) == 1:
- _errno, errmsg = 'UNKNOWN', e.args[0]
+ errno, errmsg = 'UNKNOWN', e.args[0]
else:
- _errno, errmsg = e.args
+ errno = e.args[0]
+ errmsg = e.args[1]
raise ConnectionError("Error %s while writing to socket. %s." %
- (_errno, errmsg))
+ (errno, errmsg))
except:
self.disconnect()
raise
@@ -555,13 +562,14 @@ class Connection(object):
"Pack and send a command to the Redis server"
self.send_packed_command(self.pack_command(*args))
- def can_read(self):
+ def can_read(self, timeout=0):
"Poll the socket to see if there's data that can be read."
sock = self._sock
if not sock:
self.connect()
sock = self._sock
- return bool(select([sock], [], [], 0)[0]) or self._parser.can_read()
+ return self._parser.can_read() or \
+ bool(select([sock], [], [], timeout)[0])
def read_response(self):
"Read the response from a previously sent command"
diff --git a/redis/sentinel.py b/redis/sentinel.py
index 2f30062..3fb89ce 100644
--- a/redis/sentinel.py
+++ b/redis/sentinel.py
@@ -129,6 +129,8 @@ class SentinelConnectionPool(ConnectionPool):
self.disconnect()
self.reset()
self.__init__(self.service_name, self.sentinel_manager,
+ is_master=self.is_master,
+ check_connection=self.check_connection,
connection_class=self.connection_class,
max_connections=self.max_connections,
**self.connection_kwargs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment