Skip to content

Instantly share code, notes, and snippets.

@nvanderw
Last active May 7, 2022 08:58
Show Gist options
  • Save nvanderw/cee7e7a844c348a13d68e074d554a5ec to your computer and use it in GitHub Desktop.
Save nvanderw/cee7e7a844c348a13d68e074d554a5ec to your computer and use it in GitHub Desktop.
IPC using only Unix/POSIX signals
import os
import signal
import sys
import time
import random
# Number of random points each worker process samples.
NUM_ITERATIONS = 10_000_000
# Number of worker processes forked by the parent.
NUM_WORKERS = 20
def unpack_bits(bs):
    """Lazily yield the bits of every byte in *bs*, most significant bit first.

    *bs* is any iterable of integers (for example a ``bytes`` object); each
    element contributes exactly eight values, each either 0 or 1.
    """
    for byte in bs:
        # Shifts run 7, 6, ..., 0 so the high bit is produced first.
        yield from ((byte >> shift) & 1 for shift in reversed(range(8)))
def pack_bits(bits):
    """Lazily assemble an iterable of bits into integers, eight bits per value.

    Bits are consumed most significant first. A value is yielded only once
    eight bits have been collected, so a trailing group of fewer than eight
    bits produces no output.
    """
    byte = 0
    for position, bit in enumerate(bits):
        offset = position % 8
        byte |= bit << (7 - offset)
        if offset == 7:
            # Eighth bit of the group: emit the byte and start a fresh one.
            yield byte
            byte = 0
# PID of the process that runs this module top to bottom. It is captured
# before any fork, so children inherit the parent's PID through this name.
parent_pid = os.getpid()

# Child-side state: the bit sequence to transmit (assigned once the worker has
# its result) and the index of the next bit to send.
bits_to_send = None
bits_cursor = 0

# Parent-side state: PIDs of the forked workers, the index of the worker that
# is currently reporting, the bits received from it so far, and the running
# sum of all reported results.
child_pids = []
child_index = 0
bits_received = []
grand_total = 0
def handler(signum, frame):
    """Advance the signal-only bit protocol by one step.

    The same function serves both roles; a process whose ``child_pids`` list
    is empty acts as a child, any other process acts as the parent.

    Child role: each incoming signal is a request for the next bit. The bit at
    ``bits_cursor`` is sent to ``parent_pid`` as SIGUSR1 (bit 0) or SIGUSR2
    (bit 1); after the final bit has been sent the process exits with status 0.

    Parent role: SIGUSR1 is recorded as a 0 bit and any other signal as a
    1 bit. When the received bits end on a byte boundary with an all-zero
    byte, the message is complete: the bytes are decoded, the trailing NUL is
    dropped, and the integer is added to ``grand_total``. The parent then
    moves on to the next child, or prints the estimate and exits if that was
    the last one. In every non-exiting case SIGUSR1 is sent to the current
    child to request its next bit.

    Args:
        signum: Number of the signal being handled.
        frame: Interrupted stack frame (unused).
    """
    global child_pids, child_index, bits_to_send, bits_cursor, bits_received, grand_total

    if not child_pids:
        # Child process being called on: send the next bit to the parent.
        outgoing = signal.SIGUSR1 if bits_to_send[bits_cursor] == 0 else signal.SIGUSR2
        # Advance the cursor BEFORE signalling. Python-level handlers can be
        # re-entered, so if the parent's ack is delivered before this
        # invocation finishes, the nested call must already see the updated
        # cursor; otherwise the same bit is sent twice and a later one is
        # skipped.
        bits_cursor += 1
        os.kill(parent_pid, outgoing)
        if bits_cursor >= len(bits_to_send):
            sys.exit(0)
        return

    # Parent process: record the bit encoded by the signal number.
    bit = 0 if signum == signal.SIGUSR1 else 1
    bits_received.append(bit)

    # A zero bit that completes an all-zero byte is the NUL terminator.
    message_complete = (
        bit == 0
        and len(bits_received) % 8 == 0
        and all(b == 0 for b in bits_received[-8:])
    )
    if message_complete:
        # Decode the ASCII digits and strip the trailing NUL before parsing.
        child_result = int(bytes(pack_bits(bits_received)).decode()[:-1])
        grand_total += child_result
        child_index += 1
        if child_index >= len(child_pids):
            print(f"Num processes: {NUM_WORKERS}, total iterations: {NUM_ITERATIONS * NUM_WORKERS}, pi ~= {4 * grand_total / (NUM_ITERATIONS * NUM_WORKERS)}")
            sys.exit(0)
        bits_received.clear()

    # All state is updated by now, so a reply arriving immediately is safe.
    # This acks the current child, or calls on the next child if the previous
    # one has just finished.
    os.kill(child_pids[child_index], signal.SIGUSR1)
# A single handler serves both user signals; it tells them apart by signum.
for _signo in (signal.SIGUSR1, signal.SIGUSR2):
    signal.signal(_signo, handler)

# Hold both signals back for now. Each child unblocks them itself once its
# result is ready, so a request sent early stays pending instead of hitting a
# worker that has nothing to send yet.
signal.pthread_sigmask(signal.SIG_BLOCK, [signal.SIGUSR1, signal.SIGUSR2])

for _ in range(NUM_WORKERS):
    forked_pid = os.fork()
    if forked_pid != 0:
        # Parent: remember the worker and carry on forking.
        child_pids.append(forked_pid)
        continue

    # ---- Child from here on; this branch never returns to the fork loop ----
    # An empty child_pids list is what marks this process as a child in handler().
    child_pids.clear()

    # Monte Carlo sampling: count random points of the unit square that fall
    # inside the quarter circle of radius 1.
    hits = 0
    for _sample in range(NUM_ITERATIONS):
        x, y = random.random(), random.random()
        if x * x + y * y < 1:
            hits += 1

    # Encode the count as ASCII digits followed by a NUL terminator, expanded
    # into individual bits (most significant bit first).
    bits_to_send = list(unpack_bits(str(hits).encode() + b'\0'))

    # Ready to be called on: unblock the signals and idle until the handler
    # exits the process after the last bit.
    signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGUSR1, signal.SIGUSR2])
    while True:
        time.sleep(1)

# Parent: all workers are forked. Unblock USR1/USR2 and call on the first
# child; from here the handler drives the rest of the exchange.
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signal.SIGUSR1, signal.SIGUSR2])
os.kill(child_pids[0], signal.SIGUSR1)
while True:
    time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment