Skip to content

Instantly share code, notes, and snippets.

@dstuebe
Created March 12, 2013 13:52
Show Gist options
Save dstuebe/5143032 to your computer and use it in GitHub Desktop.
Use a Python with-statement and Redis to coordinate distributed worker processes.
#!/usr/bin/env python
'''
@author David Stuebe <dstuebe@asascience.com>
@author Luke Campbell
@author Swarbhanu Chatterjee
'''
import gevent
import redis
from uuid import uuid4
class RedisCoordination(list):
    """
    Coordinate blocks of work among an arbitrary number of distributed workers.

    Every worker pushes the packets it receives onto one shared redis list.
    This class hands out those packets in blocks of ``block_size`` and
    recovers from failures that happen while a block is being processed.

    Use it as a context manager.

    On entry:

    * The packet is LPUSHed onto the shared list ``<name>.list``.
    * If that push made the list length a multiple of ``block_size``, this
      worker atomically moves ``block_size`` items into a private backup list.
      Those items become the contents of this object, which is a ``list``.
    * Otherwise the worker tries to adopt a backup list that a failed worker
      left behind in the redis set ``<name>.set``.

    On exit:

    * If the with-body succeeded, the backup list is deleted.
    * If it raised, the backup list name is added back to ``<name>.set`` so
      another worker can retry it.

    A ``gevent.Timeout`` of ``timeout`` seconds bounds the with-body.
    """

    def __init__(self, rserver=None, name='', block_size=50, timeout=10, packet=None):
        self._item = packet            # the packet this worker contributes on entry
        self._rserver = rserver        # redis connection shared by all operations
        self._block_size = block_size  # number of items handed out per block
        self._base_name = name         # prefix for every redis key used here
        self._timeout = timeout        # seconds allowed for the with-body
        # Derived redis key names.
        self._list_name = '%s.list' % self._base_name
        self._backups_set_name = '%s.set' % self._base_name
        # Name of the backup list claimed on entry, or None if nothing was claimed.
        self._my_backup_list_name = None

    def __enter__(self):
        """Contribute the packet, claim any available work, then start the timeout."""
        # Discard items from any previous use of this instance, so the list
        # only ever holds what this entry claimed.
        del self[:]
        self._safe_lpush_item_rpop_range()
        # The timeout bounds only the with-body; the redis coordination above
        # has already finished when it starts.
        self.timeout = gevent.Timeout(self._timeout)
        self.timeout.start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """
        Cancel the timeout and finalise the claimed backup list, if any.

        On a clean exit the backup list is deleted. When an exception is
        propagating, its name is added to the backups set so another worker
        can reprocess the block. Returns None, so exceptions are never
        suppressed.
        """
        self.timeout.cancel()
        if self._my_backup_list_name is None:
            return
        # Test exc_type, not exc_value: Python 2 can pass a None value while an
        # exception is in flight (contextlib special-cases this). Treating that
        # as success would delete the only copy of a failed block.
        if exc_type is None:
            self._rserver.delete(self._my_backup_list_name)
        else:
            self._rserver.sadd(self._backups_set_name, self._my_backup_list_name)

    def _safe_lpush_item_rpop_range(self):
        """
        Push this packet onto the shared list and claim work if any is available.

        If the push made the shared list length a multiple of ``block_size``,
        move ``block_size`` items from the shared list into a newly named
        backup list. This uses RPOPLPUSH inside a MULTI/EXEC transaction,
        taking items from the tail, i.e. the oldest pushes. The moved items
        are appended to ``self``.

        Otherwise, pop one backup-list name (if any) from the backups set and
        append that list's contents to ``self``.

        Leaves ``self._my_backup_list_name`` set to the claimed backup list
        name, or to the falsy result of SPOP when nothing was claimed.
        """
        # WARNING: an error or network interruption between the LPUSH and the
        # end of the transaction below can leave the coordination state
        # inconsistent.
        current_length = self._rserver.lpush(self._list_name, self._item)
        if current_length % self._block_size == 0:
            # This push completed a block: take it into a private backup list.
            queue_name = str(uuid4())[:13]
            self._my_backup_list_name = '%s.%s.list' % (self._base_name, queue_name)
            with self._rserver.pipeline() as pipe:
                pipe.multi()
                for _ in range(self._block_size):
                    pipe.rpoplpush(self._list_name, self._my_backup_list_name)
                self.extend(pipe.execute())
        else:
            # No block is ready; adopt work abandoned by a failed worker, if any.
            self._my_backup_list_name = self._rserver.spop(self._backups_set_name)
            if self._my_backup_list_name:
                self.extend(self._rserver.lrange(self._my_backup_list_name, 0, -1))
#!/usr/bin/env python
"""
@author David Stuebe <dstuebe@asascience.com>
@author Luke Campbell
@author Swarbhanu Chatterjee
Runs a single instance of a redis coordination test using a fixed name and known coordination set. Run in a distributed
test using subprocess_redis.py
To Run:
bin/python subprocess_redis.py
"""
# Patch the standard library for gevent before anything else (notably redis)
# is imported; presumably so the redis client's socket I/O cooperates with
# greenlets instead of blocking the whole process.
from gevent.monkey import patch_all
patch_all()
import gevent
import redis
import uuid
from redis_coordination import RedisCoordination
import random
# Redis key of the set holding every datum generated by the test. Workers
# remove each datum as they process it, so an empty set after all workers
# finish means every datum was processed.
COMPARE_SET = 'compareset' # Key name for the set which holds the truth about all data in the test
def _add_datum(conn):
    """Create a fresh random datum, record it in COMPARE_SET and return it."""
    datum = str(uuid.uuid4())
    # COMPARE_SET is the authoritative record of all data in the test.
    conn.sadd(COMPARE_SET, datum)
    return datum


def _consume_block(conn, coordinator):
    """
    Remove every item handed out by *coordinator* from COMPARE_SET.

    Raises KeyError if an item is not a member of COMPARE_SET, which most
    likely means it was already processed once.
    """
    for item in coordinator:
        if not conn.srem(COMPARE_SET, item):
            raise KeyError('ERROR IN COMPARISON!!!! Tried to remove data that was not there!')


def proc(points, block_size, timeout, flag):
    """
    Run one worker of the distributed RedisCoordination test.

    Generates *points* random data. Each datum is recorded in COMPARE_SET and
    then fed through a RedisCoordination context. Whatever block the
    coordinator hands out is removed from COMPARE_SET. Roughly 2% of
    iterations (standard normal draw below -2) raise a RuntimeError inside
    the context to exercise failure recovery; those errors are swallowed
    here. Other exceptions, including a gevent timeout from the coordinator,
    are not RuntimeErrors and propagate out of this function.

    If *flag* is truthy, a further ``2 * block_size`` data are then processed
    without injected errors, so that blocks abandoned by earlier failures get
    picked up and cleaned up.
    """
    conn = redis.StrictRedis('localhost', db=0)

    for _ in xrange(points):
        datum = _add_datum(conn)
        try:
            # name must be a string shared by all cooperating instances, such
            # as a stream_id or dataset_id.
            with RedisCoordination(rserver=conn, block_size=block_size, name='foo',
                                   timeout=timeout, packet=datum) as coordinator:
                # The coordinator is a list holding the block (possibly empty)
                # that this worker must process.
                salt = random.normalvariate(mu=0, sigma=1)
                if salt < -2.0:
                    raise RuntimeError('Error raised at random as part of the test harness')
                _consume_block(conn, coordinator)
        except RuntimeError:
            # Injected failure: the coordinator has republished the block for
            # another worker, so keep going.
            pass

    if flag:
        # Failure-free pass so that blocks abandoned by earlier failures get
        # picked up and processed.
        for _ in xrange(block_size * 2):
            datum = _add_datum(conn)
            with RedisCoordination(rserver=conn, block_size=block_size, name='foo',
                                   timeout=timeout, packet=datum) as coordinator:
                _consume_block(conn, coordinator)
if __name__ == '__main__':
    import sys

    block_size = 15
    data_size = block_size * 800

    # Any command-line argument enables proc()'s final failure-free pass.
    flag = sys.argv[1] if len(sys.argv) > 1 else None
    print('Got Flag: "%s"' % flag)

    g = gevent.spawn(proc, points=data_size, block_size=block_size, timeout=0.4, flag=flag)
    # raise_error=True re-raises any exception from proc (e.g. the KeyError
    # signalling a comparison failure), so this process exits non-zero instead
    # of reporting success.
    gevent.joinall([g], raise_error=True)
#!/usr/bin/env python
"""
@author David Stuebe <dstuebe@asascience.com>
@author Luke Campbell
@author Swarbhanu Chatterjee
Runs pool_size # of test cases of the redis_coordinator_test at one time demonstrating distributed state between them.
To Run:
bin/python subprocess_redis.py
"""
import subprocess
if __name__ == '__main__':
    pool_size = 20
    subprocess_pool = []
    open_files = []
    try:
        for i in range(pool_size):
            print('starting subprocess %d' % i)
            cmd = ['bin/python', 'redis_coordinator_test.py']
            # Exactly one worker (index 1) receives a flag argument.
            if i == 1:
                cmd.append('1')
            # Each worker's stdout and stderr go to its own log file.
            log = open('/tmp/g%s' % i, 'w')
            open_files.append(log)
            subprocess_pool.append(subprocess.Popen(cmd, shell=False, stdout=log, stderr=log))

        for i, process in enumerate(subprocess_pool):
            returncode = process.wait()
            if returncode != 0:
                print('subprocess %d exited with status %d (see /tmp/g%d)' % (i, returncode, i))
    finally:
        # Close the log files even if launching or waiting fails part-way.
        for log in open_files:
            log.close()

    print("TADA!")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment