Last active
December 22, 2015 15:49
-
-
Save rbistolfi/6495400 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding: utf-8 | |
"Non blocking subprocess wrapper" | |
from Queue import Queue, Empty | |
from subprocess import Popen, PIPE | |
from threading import Thread | |
import shlex | |
class ShellProcess(object):
    """A Popen wrapper that reads the child's stdout in a background thread.

    Every line read from stdout is pushed onto ``self.queue`` so it can be
    fetched later with ``readline()`` / ``readlines()`` without blocking.
    Attributes that are not defined on this class are looked up on the
    wrapped ``Popen`` object (``self.process``).
    """

    def __init__(self, cmd, on_command_finished=None, on_line_received=None,
                 **options):
        """Start *cmd* and begin reading its output.

        cmd -- command line string; it is split into an argument list with
            ``shlex.split``.
        on_command_finished -- optional callable, invoked from the reader
            thread with whatever output is still queued once the process
            has exited.
        on_line_received -- optional callable, invoked from the reader
            thread with each line read from stdout.
        options -- extra keyword arguments for ``Popen``; they override the
            defaults (``stdout=PIPE``, ``stdin=PIPE``, ``bufsize=4096``).
        """
        self.command = shlex.split(cmd)
        default_options = {
            "stdout": PIPE,
            "stdin": PIPE,
            "bufsize": 4096,
        }
        default_options.update(options)
        self.options = default_options
        self.on_command_finished = on_command_finished
        self.on_line_received = on_line_received
        self.process = Popen(self.command, **self.options)
        self.queue = Queue()
        self._running = True
        self._t = Thread(target=self._enqueue)
        # Daemon thread: a reader blocked on the pipe must not keep the
        # interpreter alive.
        self._t.daemon = True
        self._t.start()

    def __getattr__(self, name):
        """Delegate unknown attributes to the Popen object.

        Raises AttributeError when the Popen object itself is not set yet.
        """
        # __getattr__ only runs after normal lookup failed. Reading
        # ``self.process`` here while it is missing (for example because
        # Popen raised inside __init__) would re-enter __getattr__ without
        # end, so fetch it straight from the instance dict instead.
        try:
            process = self.__dict__["process"]
        except KeyError:
            raise AttributeError(name)
        return getattr(process, name)

    def _enqueue(self):
        """Reader-thread loop: forward stdout lines until the process ends.

        A ``None`` sentinel is queued once the loop stops.
        """
        # stdout is None when the caller overrode the "stdout" option with
        # something other than PIPE; there is nothing to read in that case.
        stdout = self.process.stdout
        while self._running:
            line = stdout.readline() if stdout is not None else None
            if line:
                # An empty result means EOF, not a blank line (a blank line
                # still carries its newline), so it is not forwarded.
                self._on_line_received(line)
            if self.process.poll() is None:
                if line:
                    continue
                # EOF (or no pipe) while the child is still alive: another
                # readline() would return immediately again and again, so
                # block until the process exits instead of spinning.
                self.process.wait()
            self._on_command_finished()
            self._running = False
        self.queue.put(None)

    def _on_line_received(self, line):
        """Put *line* in the queue and call ``on_line_received`` if set."""
        self.queue.put(line)
        if self.on_line_received is not None:
            self.on_line_received(line)

    def _on_command_finished(self):
        """Queue what remains in the pipe, then call ``on_command_finished``.

        The callback, when set, receives everything that is still queued
        at that point (the queue is drained by that call).
        """
        stdout = self.process.stdout
        if stdout is not None:
            # put anything still buffered in the pipe into the queue
            for line in stdout.readlines():
                self._on_line_received(line)
        if self.on_command_finished is not None:
            self.on_command_finished(self.readlines())

    def terminate(self, *args, **kwargs):
        """Ask the reading loop to stop and terminate the process."""
        self._running = False
        return self.process.terminate(*args, **kwargs)

    def kill(self, *args, **kwargs):
        """Ask the reading loop to stop and kill the process."""
        self._running = False
        return self.process.kill(*args, **kwargs)

    def write(self, s):
        """Write *s* plus a trailing newline to the child's stdin and flush.

        *s* may be a byte string or a text string; the newline appended has
        the same type.
        """
        stdin = self.process.stdin
        newline = b"\n" if isinstance(s, bytes) else u"\n"
        stdin.write(s + newline)
        # stdin is buffered (bufsize defaults to 4096); without a flush the
        # child would not see the input until the buffer fills up.
        stdin.flush()

    def readline(self):
        """Return the next queued item without blocking.

        Returns None when the queue is empty; the end-of-output sentinel
        queued by the reader thread is None as well.
        """
        try:
            return self.queue.get_nowait()
        except Empty:
            return None

    def readlines(self):
        """Return all currently queued lines joined into a single string.

        Returns ``""`` when nothing is queued.
        """
        out = []
        while True:
            line = self.readline()
            if line is None:
                break
            out.append(line)
        if not out:
            return ""
        # Join with an empty value of the lines' own type so that both byte
        # and text output can be concatenated.
        return out[0][:0].join(out)
class Shell(object):
    """Create ShellProcess instances."""

    def run(self, command, **popen_options):
        """Run *command* (a command line string) through a ShellProcess.

        Keyword arguments are forwarded unchanged to ``ShellProcess``.
        """
        return ShellProcess(command, **popen_options)

    def __getattr__(self, name):
        """Treat shell commands as functions:

        >>> shell = Shell()
        >>> shell.cat("sp.py")  # doctest: +SKIP

        The returned callable joins *name* and its positional string
        arguments with single spaces and passes the result to ``run()``;
        keyword arguments are forwarded to ``run()`` as well. Arguments are
        not quoted, so one that contains whitespace has to be quoted by the
        caller.

        Raises AttributeError for ``__dunder__`` names.
        """
        # Special-method probes (copy, pickle, inspect, hasattr checks, ...)
        # must see "attribute missing", not a command launcher.
        if name.startswith("__") and name.endswith("__"):
            raise AttributeError(name)

        def command(*args, **popen_options):
            return self.run(name + " " + " ".join(args), **popen_options)

        return command


# Single shared instance for convenience.
shell = Shell()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment