@jtallieu
Created February 12, 2019 07:23
Weird variation of a Read-Write Lock using gevent. One function, when called, must lock out any other related function for a specific time.
"""
Imagine a situation where you have two functions
that operate on the same hashable resource (it has an ID),
and those two functions can be called from various
threads that you have no control over. Each function
can run multiple times and concurrently. Once one of the
functions has been called for a resource, neither function
can be allowed to run for that resource again.

In specific terms, I have post_save and post_delete Django
signal handlers that sync data to an external DB (a sketch of
that wiring is included as comments at the end of this file).
In our web application, an object may be saved multiple times
in concurrent background greenlets on certain endpoints, and one
of those greenlets may encounter an error and attempt to clean up.
So one greenlet is in the middle of building the record to save
while another greenlet calls and completes a delete. The save
greenlet then writes to the external database and BAM, we have
phantom items in the external database.

My solution is to implement a mechanism that mimics Read-Write
Locks with Strong Writer Preference, though it is really a strong
Deleter Preference with a slight nuance: once a delete occurs,
any subsequent signals on that instance should be prevented.
Because of the nature of the environment (we reuse IDs), I need
to hold this condition for a while and then reset it.
"""

import gevent
import gevent.event
import gevent.lock
from gevent import monkey
monkey.patch_all()

import logging
import random

FORMAT = '%(asctime)s.%(msecs).03d[%(levelname)0.3s] %(funcName)14.14s %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT, datefmt='%H:%M:%S')
log = logging.getLogger("main")


class OxLock(object):
    """Per-resource lock state for a single synced object id."""

    def __init__(self):
        self.saves = 0
        self.save_lock = gevent.lock.Semaphore(1)
        self.save_exit = gevent.event.Event()
        self.auto_release = None


class OxLocks(object):
    """
    Specialized lock to prevent any subsequent PG->LDAP syncs once
    a delete has been called in the same process. Once a delete event
    is processed, any other sync events are ignored until the
    auto_release fires after a specified time. Based on the read-write
    lock with strong writer preference.

    OxLocks is a collection of Oxpeck Sync locks indexed by some id.
    """

    def __init__(self, max_lease=300, stats=600):
        self.locks = {}
        # how long in seconds to keep save_lock
        self.max_lease = max_lease
        self.stats_period = stats
        self.stats_timer = gevent.spawn_later(self.stats_period, self.stats)

    def stats(self):
        """Report stats about locks"""
        try:
            lock = dict(total=len(self.locks), disabled=0, active=0)
            for key, slock in self.locks.items():
                if slock.save_lock.locked() and not slock.saves:
                    lock['disabled'] += 1
                if slock.saves:
                    lock['active'] += 1
            log.critical("locks:{total}, active:{active}, disabled:{disabled}".format(**lock))
        finally:
            # Reschedule the stats reporter if we are the active timer.
            if gevent.getcurrent() == self.stats_timer:
                self.stats_timer = gevent.spawn_later(self.stats_period, self.stats)

    def acquire_save(self, name):
        slock = self.locks.setdefault(name, OxLock())
        # Lock will be taken if a delete happened, the decision to not
        # block simplifies the releasing of the lock. We don't have to
        # deal with readers waiting for delete to complete. We can just
        # remove the lock
        if slock.save_lock.acquire(blocking=False):
            slock.saves += 1
            slock.save_lock.release()
            return True
        return False

    def release_save(self, name):
        # Cooperative routines make this too simple. No need to get a
        # lock to decrement the counter. I know I won't be pre-empted.
        if name in self.locks:
            slock = self.locks[name]
            slock.saves -= 1
            slock.save_exit.set()

    def acquire_delete(self, name):
        slock = self.locks.setdefault(name, OxLock())
        # Lock out other readers and deleters and never release
        if slock.save_lock.acquire(blocking=False):
            try:
                # Keep checking for no readers
                while slock.saves > 0:
                    # Every save release signals the exit so we can check
                    # if all the readers are done.
                    slock.save_exit.clear()
                    slock.save_exit.wait()
                slock.auto_release = gevent.spawn_later(self.max_lease, self.release_delete, name)
                return True
            except Exception:
                log.exception("Exception while holding lock for ({}) - locked forever".format(name))
        return False

    def release_delete(self, name):
        log.critical("clearing lock ({}) after {}s".format(name, self.max_lease))
        self.locks.pop(name, None)


SLOCKS = OxLocks(max_lease=10, stats=1)


# My functions that need synchronization
def random_nap(grn, name):
    gevent.sleep(random.randint(0, 6))


def post_save(name):
    tid = str(hex(id(gevent.getcurrent())))
    if SLOCKS.acquire_save(name):
        try:
            log.info("running {}:{}".format(tid, name))
            random_nap(tid, name)
        finally:
            # Always release so a pending delete cannot wait forever
            SLOCKS.release_save(name)


def post_delete(name):
    tid = str(hex(id(gevent.getcurrent())))
    if SLOCKS.acquire_delete(name):
        log.info("RUNNING {}:{}".format(tid, name))
        random_nap(tid, name)


def launch(count):
    for x in range(0, 3):
        glets = []
        for i in range(0, count):
            name = random.choice(["cafeface", "deadbeef", "fadedace"])
            func = random.choice([post_delete] + [post_save] * 5)
            glets.append(gevent.spawn(func, name))
            log.info("->>>>>>>>> started {}".format(func.__name__))
            gevent.sleep(random.uniform(0.1, 2))
        gevent.wait(glets)
        gevent.sleep(30)


if __name__ == "__main__":
    launch(30)
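
# Below is a minimal sketch of how these locks might be wired into the real
# Django post_save/post_delete signal receivers described in the module
# docstring. It assumes a model and external-sync helpers that are not part
# of this gist: "MyModel", "sync_to_external" and "delete_from_external" are
# hypothetical names used only for illustration.
#
# from django.db.models.signals import post_save, post_delete
# from django.dispatch import receiver
#
# @receiver(post_save, sender=MyModel)
# def on_post_save(sender, instance, **kwargs):
#     key = str(instance.pk)
#     if SLOCKS.acquire_save(key):          # refused once a delete has run
#         try:
#             sync_to_external(instance)    # build and write the external record
#         finally:
#             SLOCKS.release_save(key)      # let a waiting delete proceed
#
# @receiver(post_delete, sender=MyModel)
# def on_post_delete(sender, instance, **kwargs):
#     key = str(instance.pk)
#     if SLOCKS.acquire_delete(key):        # waits out in-flight saves, then
#         delete_from_external(instance)    # blocks further syncs for max_lease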