Skip to content

Instantly share code, notes, and snippets.

@penafieljlm
Created July 26, 2019 16:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save penafieljlm/84222814ac0e50e112d579a85f756813 to your computer and use it in GitHub Desktop.
Save penafieljlm/84222814ac0e50e112d579a85f756813 to your computer and use it in GitHub Desktop.
Redis-py Streaming Transaction
"""This module contains the Transaction object."""
from redis.client import Redis
class TransactionException(Exception):
"""Thrown when a transaction error is encountered."""
pass
class Transaction(Redis):
"""
Wraps commands in a MULTI-EXEC trasnaction.
Unlike the redis-py pipeline, this implementation doesn't queue the
commands in the client, hence, avoiding an OOM error when performing
large transactions.
"""
def __init__(self, pool):
"""Initialize this Transaction object."""
self.pool = pool
self.conn = None
self.fate = {True: self.__discard__,
False: self.__execute__}
def __enter__(self):
"""Enter the transaction block."""
self.conn = self.pool.get_connection('MULTI', None)
self.conn.send_command('MULTI')
response = self.conn.read_response()
if response != b'OK':
raise TransactionException(response)
return self
def __exit__(self, exc_type, exc_value, traceback):
"""Exit the transaction block."""
try:
self.fate[bool(exc_value)]()
finally:
self.pool.release(self.conn)
self.conn = None
def execute_command(self, *args, **options):
"""Execute a command and return a parsed response."""
self.conn.send_command(*args, **options)
response = self.conn.read_response()
if response != b'QUEUED':
raise TransactionException(response)
return True
def __discard__(self):
"""Discard the queued commands."""
self.conn.send_command('DISCARD')
response = self.conn.read_response()
if response != b'OK':
raise TransactionException(response)
def __execute__(self):
"""Execute the queued commands."""
self.conn.send_command('EXEC')
response = self.conn.read_response()
if not (response is None or isinstance(response, list)):
raise TransactionException(response)
def transaction(client):
"""Create a transaction from the provided redis client."""
return Transaction(client.connection_pool)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment