Skip to content

Instantly share code, notes, and snippets.

@rbistolfi
Last active December 22, 2015 15:49
Show Gist options
  • Save rbistolfi/6495400 to your computer and use it in GitHub Desktop.
Save rbistolfi/6495400 to your computer and use it in GitHub Desktop.
# coding: utf-8
"Non blocking subprocess wrapper"
from Queue import Queue, Empty
from subprocess import Popen, PIPE
from threading import Thread
import shlex
class ShellProcess(object):
    """A Popen wrapper that reads the child's stdout in a background thread.

    The command string is split with ``shlex.split`` and started immediately
    with ``subprocess.Popen``.  A daemon thread reads stdout line by line and
    puts every line into ``self.queue`` so that ``readline()`` and
    ``readlines()`` never block.  Attributes that are not defined on this
    class are looked up on the underlying ``Popen`` object.

    Parameters:
        cmd: the command line, as a single string.
        on_command_finished: optional callable.  It is invoked from the
            reader thread, once the process has exited, with the output that
            is still queued at that moment (joined into one string).  The
            queue is drained to build that string.
        on_line_received: optional callable, invoked from the reader thread
            with every line read from stdout.
        **options: extra keyword arguments for ``Popen``.  They override the
            defaults ``stdout=PIPE``, ``stdin=PIPE`` and ``bufsize=4096``.
    """

    def __init__(self, cmd, on_command_finished=None, on_line_received=None,
                 **options):
        self.command = shlex.split(cmd)
        default_options = {
            "stdout": PIPE,
            "stdin": PIPE,
            "bufsize": 4096,
        }
        default_options.update(options)
        self.options = default_options
        self.on_command_finished = on_command_finished
        self.on_line_received = on_line_received
        self.process = Popen(self.command, **self.options)
        self.queue = Queue()
        self._running = True
        # Daemon thread: it must not keep the interpreter alive on exit.
        self._t = Thread(target=self._enqueue)
        self._t.daemon = True
        self._t.start()

    def __getattr__(self, name):
        "Delegate unknown attributes to the Popen object"
        # __getattr__ only runs after normal lookup failed.  If "process"
        # itself is what is missing (Popen raised in __init__, or the
        # instance was created without __init__), delegating through
        # self.process would recurse until the stack overflows.
        if name == "process":
            raise AttributeError(name)
        return getattr(self.process, name)

    def _enqueue(self):
        "Read process stdout until the process ends or reading is stopped"
        stdout = self.process.stdout
        while self._running:
            line = stdout.readline()
            if line:
                self._on_line_received(line)
                if self.process.poll() is None:
                    # still running, keep reading
                    continue
            else:
                # An empty read means stdout reached EOF.  Block until the
                # process exits instead of polling in a tight loop, and do
                # not report the empty string as if it were a line.
                self.process.wait()
            self._on_command_finished()
            self._running = False
        # Sentinel: readline() returns None once the reader is done.
        self.queue.put(None)

    def _on_line_received(self, line):
        "Put output in queue and call back if needed"
        self.queue.put(line)
        if self.on_line_received is not None:
            self.on_line_received(line)

    def _on_command_finished(self):
        "Read what remains in the buffer and call back if needed"
        # put anything still buffered in the pipe into the queue
        for line in self.process.stdout.readlines():
            self._on_line_received(line)
        if self.on_command_finished is not None:
            # readlines() empties the queue; the callback gets its content
            self.on_command_finished(self.readlines())

    def terminate(self, *args, **kwargs):
        "Exit the reading loop and terminate the process"
        self._running = False
        return self.process.terminate(*args, **kwargs)

    def kill(self, *args, **kwargs):
        "Exit the reading loop and kill the process"
        self._running = False
        return self.process.kill(*args, **kwargs)

    def write(self, s):
        "Write s plus a newline to the process stdin and flush it"
        stdin = self.process.stdin
        stdin.write(s + "\n")
        # stdin is buffered (bufsize); without a flush the child may never
        # see the data.
        stdin.flush()

    def readline(self):
        """Get a single line from the queue without blocking.

        Returns None when the queue is empty, and also when the item taken
        is the end-of-output sentinel placed by the reader thread.
        """
        try:
            return self.queue.get_nowait()
        except Empty:
            return None

    def readlines(self):
        "Get all available lines from the queue, joined into one string"
        # iter(callable, sentinel) stops at the first None from readline()
        return "".join(iter(self.readline, None))
class Shell(object):
    "Create ShellProcess instances"

    def run(self, command, **popen_options):
        "Run command through a ShellProcess()"
        return ShellProcess(command, **popen_options)

    def __getattr__(self, name):
        """Treat shell commands as functions:

        >>> shell = Shell()
        >>> shell.cat("sp.py") # create the cat function and call with "sp.py"

        The returned function joins its positional arguments with spaces
        after the command name and passes keyword arguments on to run().
        Arguments are not quoted: an argument containing spaces ends up as
        several words in the command line.
        """
        # Dunder names are protocol probes (e.g. copy.deepcopy looks for
        # __deepcopy__ on the instance), not shell commands.  Answering them
        # with a command function makes those protocols misbehave.
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)

        def command(*args, **popen_options):
            # "%s" formatting lets non-string arguments (numbers) through
            arguments = " ".join("%s" % (arg,) for arg in args)
            return self.run(name + " " + arguments, **popen_options)

        return command
# Single module-level Shell instance, created at import time, so commands
# can be run as shell.<command>(...) without instantiating Shell first.
shell = Shell()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment