Last active
July 18, 2019 04:32
-
-
Save wecsam/2a30313a7fd668279aca865174c86137 to your computer and use it in GitHub Desktop.
Basic IPC demo with non-parent-child relationship in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
''' | |
This script demonstrates some basic inter-process communication. Typical usage: | |
1. Run this script with the "make" argument. On Windows, a new console window | |
will appear; that is the worker. Copy the last line of the output from this | |
window to the clipboard. | |
2. Run this script with the "query" argument and paste the last line of the | |
output from the last step as the second argument. Repeat this step as many | |
times as you want. After the worker finishes, repeating this step will cause | |
the worker to quit. That is the end of the demonstration. | |
Python 3.6 or later is required. | |
''' | |
import base64, multiprocessing.connection, pickle, subprocess, secrets, sys | |
import threading, time # only needed by Worker | |
class Worker: | |
''' | |
This class continually recomputes the time since it was initialized. | |
''' | |
def __init__(self): | |
self._init_time = time.time() | |
self._stop = False | |
self._output = "" | |
threading.Thread(target=self._work).start() | |
def _work(self): | |
while not self.done: | |
self._output = "It has been {} seconds.".format( | |
self.timedelta_since_init | |
) | |
time.sleep(0.2) | |
self._output = "This worker is finished." | |
@property | |
def output(self): | |
return self._output | |
@property | |
def timedelta_since_init(self): | |
return time.time() - self._init_time | |
@property | |
def done(self): | |
# We consider this worker finished after an arbitrary amount of time. | |
return self.timedelta_since_init >= 60.0 | |
def stop(self): | |
self._stop = True | |
class _CommandHandlers: | |
@staticmethod | |
def _print_no_more_args(): | |
print("No additional arguments were expected.", file=sys.stderr) | |
@classmethod | |
def help(cls, argv): | |
''' | |
This command shows this message and exits. | |
''' | |
return True | |
@classmethod | |
def make(cls, argv): | |
''' | |
This command starts a new worker in a new process. | |
''' | |
if argv: | |
cls._print_no_more_args() | |
return True | |
# Start a worker in a new process. | |
spawn = subprocess.Popen( | |
[sys.executable, "-u", sys.argv[0], "work"], | |
shell=False, | |
stdout=subprocess.PIPE, | |
# stderr=subprocess.PIPE, | |
bufsize=1, | |
encoding=sys.stdout.encoding, | |
creationflags=getattr(subprocess, "CREATE_NEW_CONSOLE", 0) | |
) | |
print("Started PID:", spawn.pid) | |
# Receive the encoded arguments for the | |
# multiprocessing.connection.Client constructor. | |
stdout_data = spawn.stdout.readline() | |
if stdout_data: | |
print("Pass the following to the query command:", file=sys.stderr) | |
sys.stdout.write(stdout_data) | |
else: | |
print( | |
"The worker died before we could get its communication " | |
"info.", | |
file=sys.stderr | |
) | |
# sys.stderr.write(spawn.stderr.read()) | |
return True | |
return False | |
@classmethod | |
def query(cls, argv): | |
''' | |
This command queries the status of a worker. | |
''' | |
if len(argv) != 1: | |
print( | |
"One argument, the last line of the output from the make " | |
"command, was expected.", | |
file=sys.stderr | |
) | |
return True | |
try: | |
conn = multiprocessing.connection.Client( | |
**pickle.loads(base64.b64decode(argv[0])) | |
) | |
except multiprocessing.AuthenticationError: | |
print("The authentication key is incorrect.", file=sys.stderr) | |
except (FileNotFoundError, ConnectionRefusedError): | |
print("The worker has died.", file=sys.stderr) | |
else: | |
with conn: | |
print(conn.recv()) | |
return False | |
@classmethod | |
def work(cls, argv): | |
''' | |
This command makes this process a worker. | |
''' | |
if argv: | |
cls._print_no_more_args() | |
return True | |
# Generate a secure token for the authentication key. | |
authkey = secrets.token_bytes() | |
# Start the listener. | |
with multiprocessing.connection.Listener(authkey=authkey) as listener: | |
# Pass the listener address back to the make() function. | |
print( | |
base64.b64encode( | |
pickle.dumps( | |
{ | |
"address": listener.address, | |
"authkey": authkey | |
} | |
) | |
).decode("UTF-8") | |
) | |
# Start the worker. | |
worker = Worker() | |
# Accept connections. | |
while True: | |
try: | |
conn = listener.accept() | |
except multiprocessing.AuthenticationError as e: | |
print(e, file=sys.stderr) | |
else: | |
with conn: | |
conn.send(worker.output) | |
if worker.done: | |
break | |
return False | |
@classmethod | |
def _available_commands(cls): | |
for attr in dir(cls): | |
if not attr.startswith("_"): | |
yield attr | |
def main(): | |
show_help = True | |
try: | |
command = sys.argv[1] | |
except IndexError: | |
print("Enter a command.", file=sys.stderr) | |
else: | |
try: | |
command_handler = getattr(_CommandHandlers, command) | |
except AttributeError: | |
print("The command was invalid.", file=sys.stderr) | |
else: | |
show_help = command_handler(sys.argv[2:]) | |
if show_help: | |
print( | |
"\n" | |
"This script demonstrates some basic inter-process communication." | |
) | |
commands = sorted(_CommandHandlers._available_commands()) | |
print( | |
"Usage:", | |
sys.argv[0], | |
"[{}]".format( | |
"|".join( | |
attr for attr in commands | |
) | |
), | |
end="\n\n" | |
) | |
max_command_length = max(len(command) for command in commands) | |
print("Commands:") | |
for command in commands: | |
doc = getattr(_CommandHandlers, command).__doc__ | |
if isinstance(doc, str): | |
doc = doc.strip() | |
if doc: | |
print(" -", command.ljust(max_command_length), "-", doc) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment