Skip to content

Instantly share code, notes, and snippets.

@mikelikespie
Created April 19, 2011 08:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mikelikespie/927008 to your computer and use it in GitHub Desktop.
Save mikelikespie/927008 to your computer and use it in GitHub Desktop.
Lazy Pipeline Wrapper for redis-py
import redis
from cStringIO import StringIO
import weakref
import contextlib
from collections import deque
def main():
c = PipesClient('localhost')
fooo = c.get('hi')
# pipelines will flush all the commands to write at the end of each context
# variables are lazily fetched and will block on fetchign
with c.pipeline():
a_result = c.set('foo', 'bar')
print fooo.ref
print a_result.ref
# transactions will flush the commands at the end of each block
# it wraps the first level transaction in a multi and exec
# variables cannot be read while in any nesting of a transaction
# doing so will cause an exception to be raised
with c.transaction():
c.set('bar', 'baz')
c.set('baz', 'baz')
baz = c.get('baz')
print baz.ref
class VariableNotLoadedError(Exception): pass
class _Unset(): pass
class DeferredResult(object):
def __init__(self, command_name, options):
self.command_name = command_name
self.options = options
self._target = _Unset
self.dequeing_multi = False
@property
def ref(self):
return self._target
class TransactionalDeferredResult(DeferredResult):
@property
def ref(self):
if self._target is _Unset:
raise VariableNotLoadedError("Cannot read while within a transaction")
return self._target
class BlockingDeferredResult(DeferredResult):
def __init__(self, command_name, options, pipes_client):
DeferredResult.__init__(self, command_name, options)
self.pipes_client = pipes_client
@property
def ref(self):
while self._target is _Unset:
self.pipes_client._pump_result()
return self._target
class PipesClient(redis.Redis):
def __init__(self, *args, **kwargs):
self.command_buffer = deque()
self.multi_queue = deque()
self.transaction_depth = 0
self.write_buffer = StringIO()
self.dequeing_multi = False
redis.Redis.__init__(self, *args, **kwargs)
@contextlib.contextmanager
def pipeline(self):
try:
yield
finally:
self.flush_commands()
@contextlib.contextmanager
def transaction(self, noread=False):
try:
self.begin(noread)
yield
finally:
self.commit(noread)
def _execute_command(self, command_name, command, **options):
self.write_buffer.write(command)
if self.transaction_depth:
deferred_result = TransactionalDeferredResult(command_name, options)
else:
deferred_result = BlockingDeferredResult(command_name, options, self)
self.command_buffer.append(deferred_result)
return deferred_result
def begin(self, noread=False):
if self.transaction_depth == 0:
self._execute_command('MULTI', 'MULTI\r\n')
elif not noread:
raise Exception("Nested transactions must have noread set to True")
self.transaction_depth += 1
def commit(self, noread=False):
self.transaction_depth -= 1
if self.transaction_depth == 0:
self._execute_command('EXEC', 'EXEC\r\n')
self.flush_commands()
if not noread:
while len(self.command_buffer) > 0:
self._pump_result()
def flush_commands(self):
if self.transaction_depth > 0:
raise Exception("Cannot flush_commands while within a transaction")
if self.write_buffer.tell() > 0:
self.connection.send(self.write_buffer.getvalue(), self)
self.write_buffer.reset()
self.write_buffer.truncate()
def _pump_result(self):
self.flush_commands()
if len(self.command_buffer) == 0:
raise Exception("Consistency error")
deferred_result = self.command_buffer.popleft()
command_name = deferred_result.command_name
options = deferred_result.options
if command_name == 'MULTI':
self.dequeing_multi = True
return
if command_name == 'EXEC':
# we're going to get back an array
results = self.parse_response(command_name, **options)
for r in results:
self.multi_queue.popleft()._target = r
assert(len(self.multi_queue) == 0)
self.dequeing_multi = False
# this case, just parse the response and requeue it up
elif self.dequeing_multi:
self.multi_queue.append(deferred_result)
self.parse_response(command_name, **options)
else:
options = deferred_result.options
result = self.parse_response(command_name, **options)
#if dequeing_multi
deferred_result._target = result
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment