Skip to content

Instantly share code, notes, and snippets.

@gdamjan
Last active January 17, 2020 16:53
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gdamjan/a43117d56c71d6a63c334b6e79b4dc7b to your computer and use it in GitHub Desktop.
Save gdamjan/a43117d56c71d6a63c334b6e79b4dc7b to your computer and use it in GitHub Desktop.
python multi-process semaphore
'''
a multi-process quasi-semaphore
multiple processes start at mostly the same time, one becomes the leader
does some initialization, then all of them start doing some work
at the end, the leader does the cleanup.
TODO: make it a proper python context manager
'''
import os, fcntl, time
LOCKFILE = '/tmp/demo-lock'


def create_sem():
    """Open the lock file and report whether this process is the leader.

    The first process to create LOCKFILE (O_CREAT|O_EXCL succeeds) becomes
    the leader and takes an exclusive lock; every later process hits
    FileExistsError, opens the existing file, and takes a shared lock.

    Returns a ``(fd, is_leader)`` tuple.
    """
    creation_flags = os.O_CREAT | os.O_EXCL | os.O_RDWR
    try:
        fd = os.open(LOCKFILE, creation_flags, mode=0o600)
    except FileExistsError:
        # worker bees: the file already exists, join with a shared lock
        fd = os.open(LOCKFILE, os.O_RDWR)
        fcntl.lockf(fd, fcntl.LOCK_SH)
        return fd, False
    # leader
    # NOTE: there's a possible race between creating the file and locking it —
    # a worker may grab its shared lock before we get the exclusive one.
    fcntl.lockf(fd, fcntl.LOCK_EX)
    return fd, True
def remove_sem(fd):
    """Leader-only teardown: release the descriptor, then delete the lock file."""
    os.close(fd)
    os.unlink(LOCKFILE)
def wait_all_finished(fd):
    """Block until every shared (worker) lock on ``fd`` has been released.

    An exclusive lock can only be granted once all shared locks are gone,
    so a plain blocking LOCK_EX request waits for exactly that condition.
    This replaces the original LOCK_NB + sleep(0.5) polling loop: same
    outcome, but the leader wakes immediately instead of on a half-second
    poll tick, and no CPU is burned retrying.
    """
    fcntl.lockf(fd, fcntl.LOCK_EX)
def unlock_sem(fd):
    """Drop whatever lock (shared or exclusive) this process holds on ``fd``."""
    fcntl.lockf(fd, fcntl.LOCK_UN)
### TEST simulation
def do_work(identity, *, iterations=20, delay=0.2):
    """Simulate a work loop: print a progress line ``iterations`` times.

    The defaults reproduce the original behaviour (20 rounds, 0.2 s apart);
    both knobs are keyword-only and overridable so callers/tests can run
    the simulation faster or longer without changing this function.
    """
    for _ in range(iterations):
        print(f'{identity}: doing work')
        time.sleep(delay)
### process entry point, choose a leader, all do work, the leader cleans up
def test_process(identity):
    """Run one participant: acquire the semaphore, do the work, clean up.

    Exactly one process becomes the leader — it initializes before anyone
    works and finalizes after everyone is done; the rest just work and
    release their shared lock.
    """
    fd, is_leader = create_sem()
    if not is_leader:
        # worker: initialization has already happened (or is racing — see create_sem)
        print(f"worker: {identity}")
        do_work(identity)
        unlock_sem(fd)
        print(f"worker<{identity}> ends.")
        return
    # leader path
    print(f"I'm the leader: {identity}… initialization here")
    time.sleep(1)          # simulated initialization work
    unlock_sem(fd)         # releasing the exclusive lock lets workers proceed
    do_work(identity)
    wait_all_finished(fd)  # block until every worker released its shared lock
    print("leader: finalize() here")
    remove_sem(fd)
    print(f"leader ends")
### fork additional 5 processes,
### note that they don't share the file descriptor, or any other resource
def main():
    """Fork five child processes, then participate as process 0 ourselves."""
    for child_id in range(1, 6):
        if os.fork() == 0:
            # child: run its part, then leave the fork loop immediately
            test_process(child_id)
            return
    test_process(0)


if __name__ == '__main__':
    main()
import threading
import time
from collections import namedtuple
class ThreadLeader:
    """Elect one leader among concurrently started threads.

    The first thread to call :meth:`choose` becomes the leader; all others
    are workers that block until the leader calls ``releaseworkers()``.
    The leader's ``wait()`` blocks until every registered worker has
    called ``done()``.

    Improvement over the original: ``wait()`` used to poll the unguarded
    ``_workers`` counter once per second; it now waits on a
    ``threading.Condition`` that ``done()`` notifies, so the leader wakes
    as soon as the last worker finishes and the counter is always read
    under the lock.
    """

    # lightweight capability handles returned by choose()
    Worker = namedtuple('Worker', ['done'])
    Leader = namedtuple('Leader', ['releaseworkers', 'wait'])

    def __init__(self):
        self._workers = 0                           # registered, unfinished workers
        self._workers_cond = threading.Condition()  # guards _workers; signalled by done()
        self._leader_lock = threading.Lock()        # won exactly once, by the leader
        self._ev = threading.Event()                # set when leader init is finished

    def choose(self):
        """Return ``(Leader, None)`` for the first caller, ``(None, Worker)`` otherwise.

        Workers register themselves and then block until the leader calls
        ``releaseworkers()``.

        NOTE(review): as in the original, a worker that calls choose() only
        after the leader has already reached wait() can be missed — the
        total worker count is not known up front.
        """
        if self._leader_lock.acquire(blocking=False):
            return self.Leader(wait=self.wait, releaseworkers=self.releaseworkers), None
        with self._workers_cond:
            self._workers += 1
        self._ev.wait()
        return None, self.Worker(done=self.done)

    def releaseworkers(self):
        """Leader: initialization is finished — unblock all waiting workers."""
        self._ev.set()

    def wait(self):
        """Leader: block until every registered worker has called ``done()``."""
        with self._workers_cond:
            self._workers_cond.wait_for(lambda: self._workers == 0)

    def done(self):
        """Worker: report completion; wakes the leader when the count hits zero."""
        with self._workers_cond:
            self._workers -= 1
            if self._workers == 0:
                self._workers_cond.notify_all()
### Example ###
#
# The code below is an example of a module that could be run by several
# threads, where we don't control how the threads were started but they
# still need to choose a "leader" to do some initialization and
# finalization work.  All threads share this module-scoped `threadleader`
# object.
#
threadleader = ThreadLeader()
def parallel_work(identity):
    """Entry point each thread runs; exactly one caller becomes the leader."""
    leader, worker = threadleader.choose()
    if leader is None:
        # worker: init stuff has already happened by the time choose() returns
        for _ in range(10):
            print(f'Worker<{identity}>: doing work')
            time.sleep(1)
        worker.done()
        return
    # leader: initialize, release the workers, work, then finalize last
    print('Leader: initializing')
    time.sleep(2)
    print('Leader: initializing done!')
    leader.releaseworkers()
    for _ in range(10):
        print('Leader doing work')
        time.sleep(1)
    print('Leader: wait for workers:')
    leader.wait()
    print('Leader: Finalize!')
if __name__ == '__main__':
    # five extra threads plus the calling thread itself take part
    for worker_id in range(1, 6):
        threading.Thread(target=parallel_work, args=(worker_id,)).start()
    parallel_work(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment