Skip to content

Instantly share code, notes, and snippets.

@wecsam
Last active July 18, 2019 04:32
Show Gist options
  • Save wecsam/2a30313a7fd668279aca865174c86137 to your computer and use it in GitHub Desktop.
Save wecsam/2a30313a7fd668279aca865174c86137 to your computer and use it in GitHub Desktop.
Basic IPC demo with non-parent-child relationship in Python
#!/usr/bin/env python3
'''
This script demonstrates some basic inter-process communication. Typical usage:
1. Run this script with the "make" argument. On Windows, a new console window
will appear; that is the worker. Copy the last line of the output from this
window to the clipboard.
2. Run this script with the "query" argument and paste the last line of the
output from the last step as the second argument. Repeat this step as many
times as you want. After the worker finishes, repeating this step will cause
the worker to quit. That is the end of the demonstration.
Python 3.6 or later is required.
'''
import base64, multiprocessing.connection, pickle, subprocess, secrets, sys
import threading, time # only needed by Worker
class Worker:
    '''
    This class continually recomputes the time since it was initialized.

    A background thread refreshes a human-readable status string (exposed as
    the "output" property) until the worker is done. The worker is done once
    "duration" seconds have elapsed or stop() has been called, whichever
    happens first.

    Parameters:
        duration: number of seconds after which the worker considers itself
            finished. The default of 60.0 is an arbitrary demo value.
        poll_interval: number of seconds the background thread waits between
            refreshes of the status string.
    '''
    def __init__(self, duration=60.0, poll_interval=0.2):
        # A monotonic clock is used so that adjustments to the system clock
        # cannot shorten or lengthen the lifetime of the worker.
        self._init_time = time.monotonic()
        self._duration = duration
        self._poll_interval = poll_interval
        # An Event (rather than a plain flag) lets stop() wake the background
        # thread immediately instead of after the current poll interval.
        self._stop_event = threading.Event()
        self._output = ""
        # Keep a handle on the thread so that callers can join it if needed.
        self._thread = threading.Thread(target=self._work)
        self._thread.start()

    def _work(self):
        # Body of the background thread: refresh the status until done.
        while not self.done:
            self._output = "It has been {} seconds.".format(
                self.timedelta_since_init
            )
            # Sleeps for the poll interval but returns early if stop() is
            # called in the meantime.
            self._stop_event.wait(self._poll_interval)
        self._output = "This worker is finished."

    @property
    def output(self):
        '''
        The most recent status string written by the background thread.
        '''
        return self._output

    @property
    def timedelta_since_init(self):
        '''
        The number of seconds (float) since this worker was created.
        '''
        return time.monotonic() - self._init_time

    @property
    def done(self):
        '''
        True once stop() was called or the configured duration has elapsed.
        '''
        return (
            self._stop_event.is_set()
            or self.timedelta_since_init >= self._duration
        )

    def stop(self):
        '''
        Ask the worker to finish now instead of waiting for the duration.
        '''
        self._stop_event.set()
class _CommandHandlers:
    # Namespace for the command-line sub-commands of this script.
    #
    # Every attribute whose name does not start with an underscore is a
    # command. A command handler receives the list of arguments that followed
    # the command name and returns True if the usage message should be shown
    # afterwards, or False otherwise.
    #
    # NOTE: main() prints each handler's docstring as that command's help
    # text, so the handler docstrings are user-facing output. Extra
    # documentation therefore lives in comments, not in the docstrings.

    @staticmethod
    def _print_no_more_args():
        # Shared error message for commands that take no arguments.
        print("No additional arguments were expected.", file=sys.stderr)

    @staticmethod
    def _decode_connection_info(token):
        # Decode the text printed by work() back into the keyword arguments
        # for multiprocessing.connection.Client. Returns the dict, or None if
        # the token is malformed.
        #
        # SECURITY: the token is unpickled, and unpickling can execute
        # arbitrary code. Only pass tokens that were produced by this script's
        # own "make"/"work" commands; never a token from an untrusted source.
        try:
            info = pickle.loads(base64.b64decode(token))
        except (
            # binascii.Error (bad base64) is a subclass of ValueError. The
            # remaining types are the ones the pickle documentation lists as
            # possible when the data is not a valid pickle.
            ValueError, TypeError, EOFError, AttributeError, ImportError,
            IndexError, KeyError, pickle.UnpicklingError
        ):
            return None
        # Only accept the shape that Client(**info) can actually consume.
        if not isinstance(info, dict) or "address" not in info:
            return None
        if not set(info) <= {"address", "family", "authkey"}:
            return None
        return info

    @classmethod
    def help(cls, argv):
        '''
        This command shows this message and exits.
        '''
        # Returning True asks the caller to print the usage message.
        return True

    @classmethod
    def make(cls, argv):
        '''
        This command starts a new worker in a new process.
        '''
        if argv:
            cls._print_no_more_args()
            return True
        # Start a worker in a new process by re-running this script with the
        # "work" command. "-u" makes the child's output unbuffered so that
        # its first line arrives as soon as it is printed. CREATE_NEW_CONSOLE
        # only exists on Windows; elsewhere the flags fall back to 0.
        spawn = subprocess.Popen(
            [sys.executable, "-u", sys.argv[0], "work"],
            shell=False,
            stdout=subprocess.PIPE,
            bufsize=1,
            encoding=sys.stdout.encoding,
            creationflags=getattr(subprocess, "CREATE_NEW_CONSOLE", 0)
        )
        print("Started PID:", spawn.pid)
        # Receive the encoded arguments for the
        # multiprocessing.connection.Client constructor. Only this first line
        # is needed, so the pipe is closed right after reading it.
        with spawn.stdout:
            stdout_data = spawn.stdout.readline()
        if not stdout_data:
            # An empty read means the child closed its stdout without
            # printing anything.
            print(
                "The worker died before we could get its communication "
                "info.",
                file=sys.stderr
            )
            return True
        print("Pass the following to the query command:", file=sys.stderr)
        sys.stdout.write(stdout_data)
        return False

    @classmethod
    def query(cls, argv):
        '''
        This command queries the status of a worker.
        '''
        if len(argv) != 1:
            print(
                "One argument, the last line of the output from the make "
                "command, was expected.",
                file=sys.stderr
            )
            return True
        client_kwargs = cls._decode_connection_info(argv[0])
        if client_kwargs is None:
            print(
                "The argument is not valid output from the make command.",
                file=sys.stderr
            )
            return True
        try:
            conn = multiprocessing.connection.Client(**client_kwargs)
        except multiprocessing.AuthenticationError:
            print("The authentication key is incorrect.", file=sys.stderr)
        except (FileNotFoundError, ConnectionRefusedError):
            print("The worker has died.", file=sys.stderr)
        else:
            with conn:
                try:
                    print(conn.recv())
                except (EOFError, ConnectionError):
                    # The worker went away between accepting the connection
                    # and sending its status.
                    print(
                        "The worker closed the connection before replying.",
                        file=sys.stderr
                    )
        return False

    @classmethod
    def work(cls, argv):
        '''
        This command makes this process a worker.
        '''
        if argv:
            cls._print_no_more_args()
            return True
        # Generate a secure token for the authentication key.
        authkey = secrets.token_bytes()
        # Start the listener. No address is given, so the listener chooses
        # one itself; it is read back from listener.address below.
        with multiprocessing.connection.Listener(authkey=authkey) as listener:
            # Pass the listener address back to the make() function, which
            # reads the first line of this process's standard output.
            print(
                base64.b64encode(
                    pickle.dumps(
                        {
                            "address": listener.address,
                            "authkey": authkey
                        }
                    )
                ).decode("UTF-8")
            )
            # Start the worker.
            worker = Worker()
            # Accept connections. The loop only re-checks worker.done after
            # a client has been accepted, so one more query is needed after
            # the worker finishes before this process quits.
            while True:
                try:
                    conn = listener.accept()
                except multiprocessing.AuthenticationError as e:
                    print(e, file=sys.stderr)
                    continue
                except (EOFError, ConnectionError) as e:
                    # A client that disconnects during the handshake must not
                    # take the worker down with it.
                    print(e, file=sys.stderr)
                    continue
                with conn:
                    try:
                        conn.send(worker.output)
                    except OSError as e:
                        # The client hung up before reading the reply.
                        print(e, file=sys.stderr)
                if worker.done:
                    break
        return False

    @classmethod
    def _available_commands(cls):
        # Yield the name of every public attribute, i.e. every command.
        for attr in dir(cls):
            if not attr.startswith("_"):
                yield attr
def main():
    '''
    Run the command named by the first command-line argument.

    The command must be one of the public handlers of _CommandHandlers; the
    remaining command-line arguments are passed to it as a list. The usage
    message is printed when no command was given, when the command is
    unknown, or when the handler returns a true value.
    '''
    commands = sorted(_CommandHandlers._available_commands())
    show_help = True
    if len(sys.argv) < 2:
        print("Enter a command.", file=sys.stderr)
    elif sys.argv[1] not in commands:
        # Validate against the public command list instead of relying on
        # getattr() alone: a bare getattr() would also resolve private
        # helpers and inherited attributes (e.g. "_available_commands",
        # "__doc__" or "mro") and then fail with a TypeError when called.
        print("The command was invalid.", file=sys.stderr)
    else:
        command_handler = getattr(_CommandHandlers, sys.argv[1])
        show_help = command_handler(sys.argv[2:])
    if not show_help:
        return
    print(
        "\n"
        "This script demonstrates some basic inter-process communication."
    )
    print(
        "Usage:",
        sys.argv[0],
        "[{}]".format("|".join(commands)),
        end="\n\n"
    )
    # Pad the command names so that the descriptions line up in one column.
    max_command_length = max(len(command) for command in commands)
    print("Commands:")
    for command in commands:
        # The docstring of each handler doubles as its help text.
        doc = getattr(_CommandHandlers, command).__doc__
        if isinstance(doc, str):
            doc = doc.strip()
        if doc:
            print(" -", command.ljust(max_command_length), "-", doc)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment