Created
November 22, 2016 18:13
-
-
Save EliAndrewC/2b23aa2bfb83b353abe5f64f0646c568 to your computer and use it in GitHub Desktop.
Script to test the new sideboard.lib.RWGuard class
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import unicode_literals, print_function | |
import re | |
import random | |
import traceback | |
from functools import wraps | |
from datetime import datetime | |
from threading import Event, Thread, current_thread | |
from six.moves.queue import Queue, Empty | |
from sideboard.lib import RWGuard | |
# Queue of message tuples produced by log() and drained by logger().
logs = Queue()
# Set to tell every thread to wind down (after an error or Ctrl-c).
stopped = Event()
# The lock under test, shared by every reader and writer thread.
guard = RWGuard()
# Lowercase words of 3-10 letters, used as random names for the threads.
# The dictionary is read inside a `with` block so the file handle is closed
# as soon as the list has been built instead of being left open.
with open('/usr/share/dict/words') as _words_file:
    words = [word.strip() for word in _words_file if re.match('^[a-z]{3,10}$', word)]
def log(*message):
    """Queue a message for later printing.

    All positional arguments are put onto the shared ``logs`` queue as one
    tuple; nothing is printed by this function itself.
    """
    logs.put(message)
def logger():
    """Print queued log messages, each preceded by the current timestamp.

    The loop keeps going while messages remain queued or the ``stopped``
    event has not been set.  Any message printed once ``stopped`` is set is
    additionally tagged with 'LOGGED WHILE SHUTTING DOWN:'.
    """
    while True:
        if not logs.qsize() and stopped.is_set():
            break
        try:
            # Give up after a second so the exit condition above is checked
            # again even when no thread is logging anything.
            entry = logs.get(timeout=1)
        except Empty:
            continue
        if stopped.is_set():
            tag = ['LOGGED WHILE SHUTTING DOWN:']
        else:
            tag = []
        print(datetime.now(), *tag + list(entry))
def stop_on_error(func):
    """Wrap a no-argument callable so that a failure in it stops the run.

    If ``func`` raises, the formatted traceback is queued through ``log`` and
    the shared ``stopped`` event is set; the exception is not re-raised.

    :param func: callable taking no arguments
    :returns: a no-argument wrapper around ``func`` that returns ``None``
    """
    @wraps(func)
    def with_error_checking():
        try:
            func()
        except Exception:
            # Queue the traceback BEFORE signalling shutdown: logger() exits
            # once it sees `stopped` set with an empty queue, so setting the
            # event first could let it quit before the traceback is queued
            # and the failure would never be printed.
            log(traceback.format_exc())
            stopped.set()
    return with_error_checking
@stop_on_error
def reader():
    """Run one simulated reader against the shared ``guard``.

    Picks a random name and a random duration of 1-9 seconds, takes the read
    lock, asserts that readers are registered and no writer is, waits for the
    duration (or until ``stopped`` is set), and then takes the read lock a
    second time while still holding it to exercise reentrancy.
    """
    label = random.choice(words).ljust(10)
    duration = random.randrange(1, 10)
    log(label, 'reader about to acquire lock')
    with guard.read_locked:
        assert guard.acquired_readers
        assert not guard.acquired_writer
        reader_count = len(guard.acquired_readers)
        log(label, 'reader acquired lock to work', duration, 'seconds alongside', reader_count, 'others')
        # Event.wait doubles as an interruptible sleep: it returns early once
        # the run is being stopped.
        stopped.wait(duration)
        with guard.read_locked:
            log(label, 'reader finished, about to release lock twice to ensure reentrancy works')
@stop_on_error
def writer():
    """Run one simulated writer against the shared ``guard``.

    Picks a random name and a random duration of 3-8 seconds, takes the write
    lock, asserts that no readers are registered and that the writer record
    holds exactly this thread with a count of 1, waits for the duration (or
    until ``stopped`` is set), and then takes the write lock a second time
    while still holding it.
    """
    label = random.choice(words).ljust(10)
    duration = random.randrange(3, 9)
    log(label, 'writer about to acquire lock, number of other writers waiting:', guard.waiting_writer_count)
    with guard.write_locked:
        assert not guard.acquired_readers
        expected_holder = {current_thread().ident: 1}
        assert dict(guard.acquired_writer) == expected_holder
        log(label, 'writer acquired lock, working for', duration, 'seconds')
        # Event.wait doubles as an interruptible sleep: it returns early once
        # the run is being stopped.
        stopped.wait(duration)
        with guard.write_locked:
            log(label, 'writer releasing lock, number of other writers waiting:', guard.waiting_writer_count)
if __name__ == '__main__':
    # Start the printer thread first so messages are shown as they arrive.
    Thread(target=logger).start()
    try:
        # Spawn a new worker roughly every 0.1 seconds until something sets
        # `stopped` (a failed worker, or the handler below).
        while not stopped.is_set():
            stopped.wait(0.1)
            # randrange(100) is 0 only one time in a hundred, so about 1% of
            # the spawned threads are writers and the rest are readers.
            if random.randrange(100):
                Thread(target=reader).start()
            else:
                Thread(target=writer).start()
    except KeyboardInterrupt:
        print('Ctrl-c: shutting down')
    finally:
        # Always signal shutdown, not only on Ctrl-c: if the loop dies from
        # any other exception (e.g. a thread failing to start), the logger
        # thread would otherwise never see `stopped` set and never exit.
        stopped.set()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment