Created
March 12, 2013 13:52
-
-
Save dstuebe/5143032 to your computer and use it in GitHub Desktop.
Use a python with-statement and redis to coordinate distributed worker processes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
@author David Stuebe <dstuebe@asascience.com> | |
@author Luke Campbell | |
@author Swarbhanu Chatterjee | |
''' | |
import gevent | |
import redis | |
from uuid import uuid4 | |
class RedisCoordination(list):
    """
    Coordinate distributed state between an arbitrary number of workers about a fixed number of packets
    received between them, with fault tolerant recovery from errors raised while processing that data.

    Intended for use as a context manager. On entry the worker's packet is pushed onto the shared redis
    list '<name>.list'. If that push makes the list length a multiple of ``block_size``, ``block_size``
    items are moved to a private backup list and become the contents of this object (it is a ``list``).
    Otherwise the worker tries to adopt a backup list that a previous worker registered after failing.

    On exit the backup list is deleted if the with-body finished cleanly; if an exception escaped, the
    backup list name is added to the redis set '<name>.set' so another worker can pick it up.
    """

    def __init__(self, rserver=None, name='', block_size=50, timeout=10, packet=None):
        """
        @param rserver    redis connection used for every command
        @param name       base name shared by all cooperating workers; used to derive the redis keys
        @param block_size number of items that make up one block of work
        @param timeout    seconds handed to gevent.Timeout, started when the with-body is entered
        @param packet     the item this worker contributes to the shared list
        """
        super(RedisCoordination, self).__init__()

        self._item = packet            # A packet - the set of which we are trying to coordinate
        self._rserver = rserver        # The redis connection
        self._block_size = block_size  # The block size of the list to coordinate on
        self._base_name = name         # The base name of the keys to use in redis
        self._timeout = timeout        # The timeout for the body of the with-statement

        # Redis keys derived from the base name
        self._list_name = '%s.list' % self._base_name
        self._backups_set_name = '%s.set' % self._base_name
        # Name of the backup list this worker currently owns (None when it owns nothing)
        self._my_backup_list_name = None

    def __enter__(self):
        """
        Push the packet, collect a block of work if one is available, then start the timeout.

        The timer is started only after the redis calls have returned, so it covers the caller's
        processing inside the with-body rather than the coordination round trips.
        """
        self._safe_lpush_item_rpop_range()

        self.timeout = gevent.Timeout(self._timeout)
        self.timeout.start()
        return self

    def __exit__(self, type, value, traceback):
        """
        Stop the timeout and either discard or publish the backup list owned by this worker.

        Returns None, so any exception raised inside the with-body keeps propagating to the caller.
        """
        self.timeout.cancel()

        if self._my_backup_list_name is None:
            # This worker never owned a block - nothing to clean up
            return

        if value is None:
            # No exception raised - remove the backup list of values we just processed
            self._rserver.delete(self._my_backup_list_name)
        else:
            # Exception raised - tell others about the backup list so they can retry it
            self._rserver.sadd(self._backups_set_name, self._my_backup_list_name)

    def _safe_lpush_item_rpop_range(self):
        """
        Push this worker's packet and load whatever work is available into this list.

        If the push fills a block, ``block_size`` items are moved from the shared list to a freshly named
        backup list and extend ``self``. Otherwise one backup list name is popped from the backups set (if
        there is any) and the contents of that list extend ``self``.
        """
        #########
        # An error or network interruption occurring within the following block of code can cause problems!
        # =====================================
        current_length = self._rserver.lpush(self._list_name, self._item)

        if current_length % self._block_size == 0:
            # We added the item that fills a block - take those values and process them
            queue_name = str(uuid4())[:13]
            self._my_backup_list_name = '%s.%s.list' % (self._base_name, queue_name)

            with self._rserver.pipeline() as pipe:
                pipe.multi()
                # range (not xrange) keeps this runnable on both Python 2 and Python 3
                for _ in range(self._block_size):
                    pipe.rpoplpush(self._list_name, self._my_backup_list_name)
                self.extend(pipe.execute())
        # =====================================
        else:
            # The main list is not full - check for backup lists left behind by a previous failure
            self._my_backup_list_name = self._rserver.spop(self._backups_set_name)
            if self._my_backup_list_name:
                self.extend(self._rserver.lrange(self._my_backup_list_name, 0, -1))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
@author David Stuebe <dstuebe@asascience.com> | |
@author Luke Campbell | |
@author Swarbhanu Chatterjee | |
Runs a single instance of a redis coordination test using a fixed name and known coordination set. Run in a distributed | |
test using subprocess_redis.py | |
To Run: | |
bin/python subprocess_redis.py | |
""" | |
from gevent.monkey import patch_all | |
patch_all() | |
import gevent | |
import redis | |
import uuid | |
from redis_coordination import RedisCoordination | |
import random | |
COMPARE_SET = 'compareset'  # Key name for the set which holds the truth about all data in the test


def _remove_processed(conn, coordinator):
    """
    Remove every item held by the coordinator from the set of truth in redis.

    If the whole test succeeds the set will be empty when the test is complete.

    @raises KeyError if an item was not present in the set
    """
    for item in coordinator:
        if not conn.srem(COMPARE_SET, item):
            raise KeyError('ERROR IN COMPARISON!!!! Tried to remove data that was not there!')


def proc(points, block_size, timeout, flag):
    """
    Push ``points`` random data values through RedisCoordination, randomly failing some blocks.

    @param points     number of data values to generate and push
    @param block_size block size handed to RedisCoordination
    @param timeout    timeout (seconds) handed to RedisCoordination
    @param flag       when truthy, push two more blocks' worth of data without injected failures so
                      that backup lists left behind by earlier failures can be cleaned up
    """
    # Create a redis connection
    conn = redis.StrictRedis('localhost', db=0)

    for _ in range(points):
        # Create some data for the test and add it to redis as the authoritative set of data
        datum = str(uuid.uuid4())
        conn.sadd(COMPARE_SET, datum)

        # Use the Redis Coordination as a WITH statement inside a try except block so that we can
        # intentionally raise exceptions as part of the test harness
        try:
            # Create the with statement passing in the redis connection and other parameters.
            # name should be a known string shared between instances such as a stream_id or dataset_id
            with RedisCoordination(rserver=conn, block_size=block_size, name='foo', timeout=timeout,
                                   packet=datum) as coordinator:
                # The RedisCoordination class inherits from list. If a block size list of values is
                # available for processing it is held by the coordinator, which can be treated as a list.

                ###
                # Do processing of the data here!
                ###

                # Generate a random value to trip an exception for test purposes
                salt = random.normalvariate(mu=0, sigma=1)
                if salt < -2.0:
                    raise RuntimeError('Error raised at random as part of the test harness')
                # A sleep that trips the timeout here would just be another kind of exception,
                # but it makes the test really slow.

                _remove_processed(conn, coordinator)
        except RuntimeError:
            # Keep running the test......
            pass

    if flag:
        # Run a few more block sizes without raising errors to make sure that any previous test
        # failures can be cleaned up.
        for _ in range(block_size * 2):
            datum = str(uuid.uuid4())
            conn.sadd(COMPARE_SET, datum)
            with RedisCoordination(rserver=conn, block_size=block_size, name='foo', timeout=timeout,
                                   packet=datum) as coordinator:
                _remove_processed(conn, coordinator)
if __name__ == '__main__':
    # Script entry point: run one proc() worker inside a greenlet and wait for it to finish.
    import sys

    block_size = 15
    data_size = block_size * 800

    # An optional first command-line argument is passed to proc() as its flag
    flag = None
    if len(sys.argv) > 1:
        flag = sys.argv[1]
        # Parenthesised single-argument form prints the same text on Python 2 and Python 3
        print('Got Flag: "%s"' % flag)

    g = gevent.Greenlet(proc, points=data_size, block_size=block_size, timeout=0.4, flag=flag)
    g.start()
    gevent.joinall([g])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
@author David Stuebe <dstuebe@asascience.com> | |
@author Luke Campbell | |
@author Swarbhanu Chatterjee | |
Runs pool_size # of test cases of the redis_coordinator_test at one time demonstrating distributed state between them. | |
To Run: | |
bin/python subprocess_redis.py | |
""" | |
import subprocess | |
if __name__ == '__main__':
    # Script entry point: launch pool_size copies of the coordinator test and wait for all of them.
    pool_size = 20
    subprocess_pool = []
    open_files = []

    try:
        for i in range(pool_size):
            print('starting subprocess %d' % i)

            command = ['bin/python', 'redis_coordinator_test.py']
            if i == 1:
                # Only subprocess 1 gets the extra argument, which the test script reads as its flag
                command.append('1')

            # Each subprocess writes both stdout and stderr to its own file under /tmp
            log_file = open('/tmp/g%s' % i, 'w')
            open_files.append(log_file)

            subprocess_pool.append(
                subprocess.Popen(command, shell=False, stdout=log_file, stderr=log_file))

        for process in subprocess_pool:
            process.wait()
    finally:
        # Close the log files even if spawning or waiting failed part-way through
        for log_file in open_files:
            log_file.close()

    print("TADA!")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment