Last active
January 17, 2020 16:53
-
-
Save gdamjan/a43117d56c71d6a63c334b6e79b4dc7b to your computer and use it in GitHub Desktop.
python multi-process semaphore
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
''' | |
a multi-process quasi-semaphore | |
multiple processes start at mostly the same time, one becomes the leader | |
does some initialization, then all of them start doing some work | |
at the end, the leader does the cleanup. | |
TODO: make it a proper python context manager | |
''' | |
import os, fcntl, time | |
LOCKFILE = '/tmp/demo-lock' | |
def create_sem():
    """Elect a leader among cooperating processes using LOCKFILE.

    Returns a ``(fd, is_leader)`` tuple:

    * the leader gets a descriptor on which it already holds an exclusive
      lock (``is_leader`` is True);
    * every other process gets a descriptor on which it holds a shared lock
      (``is_leader`` is False); acquiring it blocks while the leader still
      holds its exclusive lock.

    The lock file is prepared under a private temporary name, locked
    exclusively, and only then published as LOCKFILE with an atomic
    ``os.link``.  A process that opens LOCKFILE therefore always finds it
    already locked by the leader, which closes the window that existed when
    the file was created first and locked afterwards.

    Raises:
        OSError: if the staging file cannot be created or locked, or if
            LOCKFILE disappears before a worker manages to open it.
    """
    import tempfile  # local import so this block does not depend on the file header

    lock_dir, lock_name = os.path.split(LOCKFILE)
    # mkstemp creates the file with O_EXCL, O_RDWR and mode 0o600
    fd, staging = tempfile.mkstemp(prefix=lock_name + '.', dir=lock_dir or '.')
    try:
        fcntl.lockf(fd, fcntl.LOCK_EX)
        # link() fails with FileExistsError if LOCKFILE already exists,
        # i.e. another process has won the election.
        os.link(staging, LOCKFILE)
    except FileExistsError:
        os.close(fd)
    except BaseException:
        os.close(fd)
        raise
    else:
        # leader: the lock belongs to the inode, so it survives the unlink
        # of the staging name performed in the finally clause
        return fd, True
    finally:
        os.unlink(staging)

    # worker bees
    fd = os.open(LOCKFILE, os.O_RDWR)
    fcntl.lockf(fd, fcntl.LOCK_SH)
    return fd, False
def remove_sem(fd):
    """Tear down the semaphore: close *fd*, then delete LOCKFILE.

    Meant to be called by the leader once all the work is finished.
    """
    os.close(fd)
    os.remove(LOCKFILE)
def wait_all_finished(fd):
    """Block until an exclusive lock on *fd* can be taken.

    The exclusive lock cannot be obtained while any other process still
    holds a lock on the file, so returning from this function means all of
    them have released theirs.  The lock is attempted in non-blocking mode
    and retried every half second.

    Raises:
        OSError: for any locking failure other than "lock is currently held".
    """
    import errno  # local import so this block does not depend on the file header

    # A contended non-blocking lockf() reports EACCES or EAGAIN depending on
    # the operating system, so both must be treated as "try again".
    busy_errnos = (errno.EACCES, errno.EAGAIN)
    while True:
        try:
            fcntl.lockf(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
            return
        except OSError as exc:
            if exc.errno not in busy_errnos:
                raise
            time.sleep(0.5)
def unlock_sem(fd):
    """Release the lock this process holds on *fd*."""
    release = fcntl.LOCK_UN
    fcntl.lockf(fd, release)
### TEST simulation | |
def do_work(identity, rounds=20, delay=0.2):
    """Simulate a unit of work by printing a progress line repeatedly.

    Args:
        identity: label printed at the start of every progress line.
        rounds: how many progress lines to print (default 20).
        delay: seconds to sleep after each line (default 0.2).
    """
    for _ in range(rounds):
        print(f'{identity}: doing work')
        time.sleep(delay)
### process entry point, choose a leader, all do work, the leader cleans up | |
def test_process(identity):
    """Process entry point: elect a leader, do the work, let the leader clean up.

    The leader initializes while holding the exclusive lock, releases it so
    the workers can start, does its own share of the work, waits until every
    worker has released its lock, and finally removes the semaphore.
    Workers do their work and then release their lock.

    Cleanup runs in ``finally`` clauses so that a failure during
    initialization or work neither leaves a stale lock file behind (which
    would prevent any later run from electing a leader) nor keeps a worker's
    lock and descriptor alive.

    Args:
        identity: label used in the progress messages.
    """
    fd, leader = create_sem()
    if leader:
        # leader
        try:
            print(f"I'm the leader: {identity}… initialization here")
            time.sleep(1)
            unlock_sem(fd)  # initialization done: let the workers in
            do_work(identity)
            wait_all_finished(fd)
            print("leader: finalize() here")
        finally:
            remove_sem(fd)
        print("leader ends")
    else:
        # worker
        try:
            print(f"worker: {identity}")
            do_work(identity)
        finally:
            unlock_sem(fd)
            os.close(fd)
        print(f"worker<{identity}> ends.")
### fork additional 5 processes, | |
### note that they don't share the file descriptor, or any other resource | |
def main():
    """Fork five child processes and run test_process in each and in the parent.

    Children are numbered 1 to 5 and return right after their test_process
    call; the parent runs as number 0 once all children have been forked.
    """
    for child_id in range(1, 6):
        pid = os.fork()
        if pid != 0:
            # parent: keep forking the remaining children
            continue
        test_process(child_id)
        return
    test_process(0)


if __name__ == '__main__':
    main()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
import time | |
from collections import namedtuple | |
class ThreadLeader:
    """Elect one leader among threads that share a single instance.

    The first thread to call :meth:`choose` becomes the leader; every other
    caller becomes a worker and blocks inside :meth:`choose` until the leader
    calls ``releaseworkers()``.  Each worker must call ``done()`` when it has
    finished; the leader's ``wait()`` returns once no registered worker is
    outstanding.
    """

    Worker = namedtuple('Worker', ['done'])
    Leader = namedtuple('Leader', ['releaseworkers', 'wait'])

    def __init__(self):
        # number of workers that registered and have not yet called done()
        self._workers = 0
        self._workers_lock = threading.Lock()
        # shares _workers_lock, so done() can wake wait() without polling
        self._workers_idle = threading.Condition(self._workers_lock)
        # held forever by whichever thread won the election
        self._leader_lock = threading.Lock()
        # set by the leader once initialization is complete
        self._ev = threading.Event()

    def choose(self):
        """Return ``(Leader, None)`` to the first caller, ``(None, Worker)`` to the rest.

        Worker callers are counted and then block until the leader has
        released them.
        """
        if self._leader_lock.acquire(blocking=False):
            handle = self.Leader(
                releaseworkers=self.releaseworkers,
                wait=self.wait,
            )
            return handle, None

        with self._workers_lock:
            self._workers += 1
        self._ev.wait()
        return None, self.Worker(done=self.done)

    def releaseworkers(self):
        """Let every worker blocked in :meth:`choose` (now or later) proceed."""
        self._ev.set()

    def wait(self):
        """Block until no registered worker is outstanding."""
        with self._workers_idle:
            self._workers_idle.wait_for(lambda: self._workers <= 0)

    def done(self):
        """Mark one worker as finished and wake the leader when none remain."""
        with self._workers_idle:
            self._workers -= 1
            if self._workers <= 0:
                self._workers_idle.notify_all()
### Example ### | |
# | |
# the below code is an example of a module that could be run by several threads | |
# where we don't control how the threads were started, but they need to choose a | |
# "leader" to do some initialization and finalization work. threads will share the | |
# module scoped `threadleader` object | |
# | |
threadleader = ThreadLeader() | |
def parallel_work(identity):
    """Run the example workload as either the leader or a worker.

    The role is decided by the module-level ``threadleader`` object.  The
    leader initializes, releases the workers, does its own work, waits for
    the workers and then finalizes.  A worker only starts once the leader
    has released it, and always reports completion, even if its work raises,
    so the leader is never left waiting for it.

    Args:
        identity: label used in the worker progress messages.
    """
    leader, worker = threadleader.choose()
    if leader:
        print('Leader: initializing')
        time.sleep(2)
        print('Leader: initializing done!')
        leader.releaseworkers()
        for _ in range(10):
            print('Leader doing work')
            time.sleep(1)
        print('Leader: wait for workers:')
        leader.wait()
        print('Leader: Finalize!')
    else:
        # init stuff has already happened
        try:
            for _ in range(10):
                print(f'Worker<{identity}>: doing work')
                time.sleep(1)
        finally:
            worker.done()
if __name__ == '__main__':
    # five background threads plus the main thread all run parallel_work
    for thread_id in range(1, 6):
        background = threading.Thread(target=parallel_work, args=(thread_id,))
        background.start()
    parallel_work(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment