Simultaneously read from multiple subprocesses in Python 3 using {select, threads} (select, epoll, thread, subprocess, PIPE)

Reading from multiple subprocesses in Python: epoll vs threads

The following performance results were generated using a 2,300-line input file, with output piped directly to a file instead of the console (all awk delays were set to 0 for the benchmark). Processor: Intel(R) Core(TM) i7-8550U CPU @ 1.80GHz, 1992 MHz, 4 cores, 8 logical processors.

The epoll solution is the clear winner, but epoll itself is only available on Linux.
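On macOS and the BSDs, the same single-threaded pattern is available through the stdlib selectors module, which transparently falls back to kqueue or poll; on Windows, select() only supports sockets, not pipes, so the thread approach remains the portable fallback there. A minimal sketch (assuming the same "foo.txt" fixture used by the full scripts below):

import selectors
import subprocess

sel = selectors.DefaultSelector()
procs = [subprocess.Popen(["cat", "foo.txt"], stdout=subprocess.PIPE)
         for _ in range(2)]
for p in procs:
    sel.register(p.stdout, selectors.EVENT_READ, data=p)

open_streams = len(procs)
while open_streams:
    for key, _ in sel.select(timeout=0.05):
        line = key.fileobj.readline()
        if line:
            print((key.data.pid, line.decode()))
        else:
            # EOF: the subprocess closed its end of the pipe.
            sel.unregister(key.fileobj)
            open_streams -= 1
sel.close()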

Approach 1: epoll

  • real 0m24.954s
  • user 0m4.453s
  • sys 0m38.938s

CPU usage:

  • python: <1%
  • gawk: 10-16% (each of the two awk processes)

Approach 2: threads (with collections.deque)

  • real 0m50.271s
  • user 0m46.219s
  • sys 1m33.406s

CPU usage:

  • python: 35%
  • gawk: 6% (each of the two awk processes)
#!/usr/bin/python3
"""
Example demonstrating how to simultaneously read from multiple subprocesses

Author: Nicholas Folse
Date: 2020.03.21

Problem:
- How do I simultaneously process output from multiple subprocesses using Python?
- Can I read from multiple subprocesses in parallel using Python?

Possible Solutions:
- Create a thread for each subprocess, then gather results. This solution
  would require complex synchronization of any shared data structures.
- Use asyncio. This solution would require Python 3.6+ and introduces the
  complexity of using asyncio, which seems like overkill for this problem.
- Use epoll. epoll (Linux-only) provides a mechanism to listen for events
  on multiple file descriptors with a timeout.

Best Solution (Linux): epoll

This solution is based on select.epoll. The basic idea is to wait for
EPOLL events on multiple input pipes, read the input as it is generated,
then handle the lines appropriately.

To do this efficiently, create implementation(s) of HandledProcess, then call
handle_processes(processes, [timeout]), where processes is an iterable of
HandledProcess. The handle_processes function will set up epoll and handle
incoming data by calling the .handle_line() method of your implementation(s)
of HandledProcess.

The example below depends on a file "foo.txt" that contains:

line 1
line 2
line 3
line 4

Two implementations of HandledProcess are defined: SlowAwkProcess and
CatProcess. SlowAwkProcess calls awk in a way that outputs lines with a
delay. CatProcess simply outputs all lines as fast as possible.

Three HandledProcess objects are created and passed to handle_processes.
In this example, each line is simply printed to the screen as follows:

('SlowAWK', 'a', 'line 1\n')
('SlowAWK', 'b', 'line 1\n')
('cat', 'c', 'line 1\n')
('cat', 'c', 'line 2\n')
('cat', 'c', 'line 3\n')
('cat', 'c', 'line 4')
('SlowAWK', 'b', 'line 2\n')
('SlowAWK', 'b', 'line 3\n')
('SlowAWK', 'a', 'line 2\n')
('SlowAWK', 'b', 'line 4\n')
('SlowAWK', 'a', 'line 3\n')
('SlowAWK', 'a', 'line 4\n')
[0, 0, 0]

This shows that output of multiple subprocesses can be read in an
interleaved, non-blocking manner using only a single thread.
"""
import select
import subprocess
from abc import ABC, abstractmethod


class HandledProcess(ABC):
    @abstractmethod
    def handle_line(self, line):
        return NotImplemented

    @property
    @abstractmethod
    def process(self):
        return NotImplemented

    @property
    def returncode(self):
        return self.process.returncode

    @property
    def stdout(self):
        return self.process.stdout

    @property
    def stdin(self):
        return self.process.stdin


class SlowAwkProcess(HandledProcess):
    def __init__(self, key, filename, delay):
        self._key = key
        # awk prints each line, then shells out to sleep for `delay` seconds.
        args = ["awk", "{{print $0; system(\"sleep {}\");}}".format(delay), filename]
        self._p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=None)

    @property
    def process(self):
        return self._p

    def handle_line(self, line):
        print(("SlowAWK", self._key, line.decode()))


class CatProcess(HandledProcess):
    def __init__(self, key, filename):
        self._key = key
        args = ["cat", filename]
        self._p = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=None)

    @property
    def process(self):
        return self._p

    def handle_line(self, line):
        print(("cat", self._key, line.decode()))


def handle_processes(processes, timeout=1):
    """
    Accepts input from multiple processes and handles the output, line-by-line.

    inputs:
        processes: iterable of HandledProcess
        timeout: how long to wait for events on the relevant file descriptors
            before checking for process completion
    returns:
        None
    """
    # Map each stdout file descriptor to its HandledProcess so that epoll
    # events can be routed back to the right handler.
    fdmap = {p.stdout.fileno(): p for p in processes}
    proc_list = [p.process for p in fdmap.values()]
    ep = select.epoll()
    try:
        for fd in fdmap.keys():
            ep.register(fd, select.EPOLLIN | select.EPOLLHUP)
        # Keep polling until every subprocess has exited.
        while None in (p.poll() for p in proc_list):
            evts = ep.poll(timeout=timeout)
            for fd, evt in evts:
                if evt & select.EPOLLIN:
                    proc = fdmap[fd]
                    proc.handle_line(proc.stdout.readline())
                if evt & select.EPOLLHUP:
                    # The writer closed its end of the pipe: drain any
                    # buffered lines, then stop watching this descriptor.
                    proc = fdmap[fd]
                    for line in proc.stdout:
                        proc.handle_line(line)
                    ep.unregister(fd)
    finally:
        ep.close()


def main():
    p1 = SlowAwkProcess("a", "foo.txt", 0.02)
    p2 = SlowAwkProcess("b", "foo.txt", 0.01)
    p3 = CatProcess("c", "foo.txt")
    process_list = [p1, p2, p3]
    handle_processes(process_list, timeout=0.05)
    print([p.returncode for p in process_list])


if __name__ == "__main__":
    main()
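Both scripts read from the "foo.txt" fixture described in their docstrings. A one-off sketch to generate it (the exact contents are an assumption inferred from the sample output, where the final 'line 4' has no trailing newline):

# Hypothetical fixture generator; the final line deliberately lacks a
# trailing newline, matching the 'line 4' entry in the sample output.
with open("foo.txt", "w") as f:
    f.write("\n".join("line {}".format(n) for n in range(1, 5)))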
#!/usr/bin/python3
"""
Example demonstrating how to simultaneously read from multiple subprocesses (Python 2, 3)

Author: Nicholas Folse
Original Solution By: jfs https://stackoverflow.com/users/4279/jfs
Date: 2020.03.21

Problem:
- How do I simultaneously process output from multiple subprocesses using Python?
- Can I read from multiple subprocesses in parallel using Python?

Possible Solutions:
- Create a thread for each subprocess, then gather results. This solution
  would require complex synchronization of any shared data structures.
- Use asyncio. This solution would require Python 3.6+ and introduces the
  complexity of using asyncio, which seems like overkill for this problem.
- Use epoll. epoll (Linux-only) provides a mechanism to listen for events
  on multiple file descriptors with a timeout.

Best Solution (cross-platform): threads

Adapted from: https://stackoverflow.com/questions/375427/non-blocking-read-on-a-subprocess-pipe-in-python

This solution is based on threads. The basic idea is to read from each process
in a separate thread and append the lines to a shared deque.

The example below depends on a file "foo.txt" that contains:

line 1
line 2
line 3
line 4

Output from the process should look something like the following:

('slowAwk1: a', 'line 1\n')
('slowAwk2: b', 'line 1\n')
('cat: c', 'line 1\n')
('cat: c', 'line 2\n')
('cat: c', 'line 3\n')
('cat: c', 'line 4')
('slowAwk2: b', 'line 2\n')
('slowAwk1: a', 'line 2\n')
('slowAwk2: b', 'line 3\n')
('slowAwk1: a', 'line 3\n')
('slowAwk2: b', 'line 4\n')
('slowAwk1: a', 'line 4\n')
[('slowAwk1', 0), ('slowAwk2', 0), ('cat', 0)]

This shows that output of multiple subprocesses can be read using multiple threads.
"""
import sys
from collections import deque
from subprocess import PIPE, Popen
from threading import Thread
from time import sleep


def handle_processes(processes, timeout=0.1):
    def enqueue_output(handler, out, queue):
        # Blocking reads run on a worker thread; each line is paired with its
        # handler so the main thread knows how to dispatch it.
        for line in out:
            queue.append((handler, line))
        # A None line marks end-of-stream for this process.
        queue.append((handler, None))

    q = deque()
    threads = [Thread(target=enqueue_output, args=(proc[1], proc[0].stdout, q))
               for id, proc in processes.items()]
    for thread in threads:
        thread.daemon = True  # thread dies with the program
        thread.start()

    # Drain the deque without blocking; sleep briefly whenever it runs dry.
    while None in (p[0].poll() for p in processes.values()) or len(q) > 0:
        try:
            while True:
                handler, line = q.popleft()
                handler(line)
        except IndexError:
            sleep(timeout)


def main():
    ON_POSIX = 'posix' in sys.builtin_module_names

    def slow_awk(filename, delay):
        # awk prints each line, then shells out to sleep for `delay` seconds.
        args = ["awk", "{{print $0; system(\"sleep {}\");}}".format(delay), filename]
        return Popen(args, stdout=PIPE, stderr=None, close_fds=ON_POSIX)

    def cat(filename):
        args = ["cat", filename]
        return Popen(args, stdout=PIPE, stderr=None, close_fds=ON_POSIX)

    def tagged_printer(tag):
        def handler(line):
            if line:
                print((tag, line.decode()))
            return None if line else (tag, 'done')
        return handler

    processes = {
        'slowAwk1': (slow_awk("foo.txt", 0.02), tagged_printer("slowAwk1: a")),
        'slowAwk2': (slow_awk("foo.txt", 0.01), tagged_printer("slowAwk2: b")),
        'cat': (cat("foo.txt"), tagged_printer("cat: c")),
    }
    handle_processes(processes)
    print([(id, proc[0].returncode) for id, proc in processes.items()])


if __name__ == "__main__":
    main()
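The deque-based loop above works because deque.append() and deque.popleft() are atomic under the GIL, so no explicit lock is needed. An alternative consumer sketch using queue.Queue, whose blocking get(timeout=...) replaces the manual sleep/poll dance (a hypothetical drop-in, not part of the original gist; the reader threads would call q.put((handler, line)) instead of q.append(...)):

import queue

def drain(q, procs, timeout=0.1):
    # Loop until every Popen in procs has exited and the queue is empty.
    while None in (p.poll() for p in procs) or not q.empty():
        try:
            handler, line = q.get(timeout=timeout)
        except queue.Empty:
            continue
        handler(line)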