Created February 12, 2019 07:23
Weird variation of a Read-Write Lock using gevent. One function, on being called, must lock out any other related function for a specific time.
""" | |
Imagine a situation where you have two functions | |
that operate on the same hashable resource (it has an ID). | |
And that those two functions can be called from various | |
threads that you have no control over. Each of these functions | |
can run multiple times and concurrently. Once one of the | |
functions for a resource is called, neither of the functions | |
can be allowed to run. | |
In specific terms, I have post_save and post_delete Django | |
signals handlers that sync data to an external DB. In our web | |
application, an object may be saved multiple times in | |
concurrent background greenlets on certain endpoints. One | |
of those greenlets may encounter an error that attempts to | |
cleanup. One greenlet is in the middle of building the record | |
to save, while another greenlet calls and completes a delete. | |
The save greenlet writes to external database and BAM, we have | |
phantom items in the external database. | |
My solution is to implement a mechanism that mimics Read-Write | |
Locks with Strong Writer Preference, but it's actually a strong | |
Deleter Preference with a slight nuance. Once a delete occurs, | |
any subsequent signals on that instance should be prevented. | |
Because of the nature of the environment (we reuse id's), I need | |
to hold this condition for a while and then reset. | |
""" | |
from gevent import monkey
monkey.patch_all()

import logging
import random

import gevent
import gevent.event
import gevent.lock

FORMAT = '%(asctime)s.%(msecs).03d[%(levelname)0.3s] %(funcName)14.14s %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT, datefmt='%H:%M:%S')
log = logging.getLogger("main")

class OxLock(object):
    """Per-resource lock state: a saver count, the delete lock, and an
    event used to signal when an in-flight save finishes."""

    def __init__(self):
        self.saves = 0                             # number of in-flight saves
        self.save_lock = gevent.lock.Semaphore(1)  # held permanently once a delete runs
        self.save_exit = gevent.event.Event()      # set each time a save releases
        self.auto_release = None                   # timer greenlet that clears the lock

class OxLocks(object):
    """
    Specialized lock to prevent any subsequent PG->LDAP syncs once a
    delete has been called in the same process. Once a delete event is
    processed, any other sync events are ignored until the auto_release
    timer fires after the specified lease time. Based on the read-write
    lock with strong writer preference.

    OxLocks is a collection of Oxpeck Sync locks indexed by some id.
    """

    def __init__(self, max_lease=300, stats=600):
        self.locks = {}
        # how long, in seconds, a delete keeps save_lock before auto_release clears it
        self.max_lease = max_lease
        self.stats_period = stats
        self.stats_timer = gevent.spawn_later(self.stats_period, self.stats)

    def stats(self):
        """Periodically report how many locks exist, are active, or are disabled."""
        try:
            counts = dict(total=len(self.locks), disabled=0, active=0)
            for slock in self.locks.values():
                if slock.save_lock.locked() and not slock.saves:
                    counts['disabled'] += 1
                if slock.saves:
                    counts['active'] += 1
            log.critical("locks:{total}, active:{active}, disabled:{disabled}".format(**counts))
        finally:
            # reschedule only if we are still the current stats timer
            if gevent.getcurrent() == self.stats_timer:
                self.stats_timer = gevent.spawn_later(self.stats_period, self.stats)

    def acquire_save(self, name):
        slock = self.locks.setdefault(name, OxLock())
        # save_lock will already be held if a delete happened. Choosing not
        # to block simplifies releasing the lock: we don't have to deal
        # with readers waiting for the delete to complete, we can just
        # remove the lock when the lease expires.
        if slock.save_lock.acquire(blocking=False):
            slock.saves += 1
            slock.save_lock.release()
            return True
        return False

    def release_save(self, name):
        # Cooperative routines make this too simple. No need to take a
        # lock to decrement the counter; I know I won't be pre-empted.
        if name in self.locks:
            slock = self.locks[name]
            slock.saves -= 1
            slock.save_exit.set()

    def acquire_delete(self, name):
        slock = self.locks.setdefault(name, OxLock())
        # Lock out other readers and deleters. The lock is never released
        # here; the auto_release timer removes it after max_lease seconds.
        if slock.save_lock.acquire(blocking=False):
            try:
                # Keep checking until there are no readers left.
                while slock.saves > 0:
                    # Every save release sets save_exit, so we wake up and
                    # re-check whether all the readers are done.
                    slock.save_exit.clear()
                    slock.save_exit.wait()
                slock.auto_release = gevent.spawn_later(self.max_lease, self.release_delete, name)
                return True
            except Exception:
                log.exception("Exception while holding lock for ({}) - locked forever".format(name))
        return False

    def release_delete(self, name):
        log.critical("clearing lock ({}) after {}s".format(name, self.max_lease))
        self.locks.pop(name, None)

SLOCKS = OxLocks(max_lease=10, stats=1)


# My functions that need synchronization
def random_nap(grn, name):
    gevent.sleep(random.randint(0, 6))


def post_save(name):
    tid = str(hex(id(gevent.getcurrent())))
    if SLOCKS.acquire_save(name):
        log.info("running {}:{}".format(tid, name))
        random_nap(tid, name)
        SLOCKS.release_save(name)


def post_delete(name):
    tid = str(hex(id(gevent.getcurrent())))
    if SLOCKS.acquire_delete(name):
        log.info("RUNNING {}:{}".format(tid, name))
        random_nap(tid, name)


def launch(count):
    for x in range(0, 3):
        glets = []
        for i in range(0, count):
            name = random.choice(["cafeface", "deadbeef", "fadedace"])
            func = random.choice([post_delete] + [post_save] * 5)
            glets.append(gevent.spawn(func, name))
            log.info("->>>>>>>>> started {}".format(func.__name__))
            gevent.sleep(random.uniform(0.1, 2))
        gevent.wait(glets)
        gevent.sleep(30)


if __name__ == "__main__":
    launch(30)
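
For context, here is a minimal sketch of how OxLocks might be wired into the Django signal handlers the module docstring describes. It is not part of the gist: the model (Account), the sync helpers (sync_to_ldap, remove_from_ldap), and the receiver registration are hypothetical placeholders, and the signals module is imported under its own name to avoid clashing with the demo post_save/post_delete functions above.

# Hypothetical wiring of OxLocks into Django signal handlers (a sketch).
# "Account", "sync_to_ldap" and "remove_from_ldap" are made-up names for
# whatever model and external-DB sync code the application really uses.
from django.db.models import signals
from django.dispatch import receiver

SYNC_LOCKS = OxLocks(max_lease=300)


@receiver(signals.post_save, sender=Account)
def sync_on_save(sender, instance, **kwargs):
    key = str(instance.pk)
    if SYNC_LOCKS.acquire_save(key):
        try:
            sync_to_ldap(instance)        # push the record to the external DB
        finally:
            SYNC_LOCKS.release_save(key)  # always decrement the saver count
    # else: a delete already ran for this id, so the sync is silently dropped


@receiver(signals.post_delete, sender=Account)
def sync_on_delete(sender, instance, **kwargs):
    key = str(instance.pk)
    if SYNC_LOCKS.acquire_delete(key):
        # waits for in-flight saves to drain, then keeps the lock held
        # until the auto_release timer clears it after max_lease seconds
        remove_from_ldap(instance)

Note the asymmetry: the save side always pairs acquire_save with release_save, while the delete side never releases; the auto_release timer does that after max_lease seconds, which is what keeps later signals for a reused ID locked out for a while.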