Skip to content

Instantly share code, notes, and snippets.

@FZambia
Last active September 27, 2022 08:16
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save FZambia/5363149 to your computer and use it in GitHub Desktop.
Save FZambia/5363149 to your computer and use it in GitHub Desktop.
tornado subprocess call
from __future__ import print_function
from tornado.gen import Task, Return, coroutine
import tornado.process
import subprocess
from tornado.ioloop import IOLoop
STREAM = tornado.process.Subprocess.STREAM
@coroutine
def call_subprocess(cmd, stdin_data=None):
"""
Wrapper around subprocess call using Tornado's Subprocess class.
"""
try:
sprocess = tornado.process.Subprocess(
cmd,
stdin=subprocess.PIPE,
stdout=STREAM,
stderr=STREAM
)
except OSError as e:
raise Return((None, e))
if stdin_data:
sprocess.stdin.write(stdin_data)
sprocess.stdin.flush()
sprocess.stdin.close()
result, error = yield [
Task(sprocess.stdout.read_until_close),
Task(sprocess.stderr.read_until_close)
]
raise Return((result, error))
@coroutine
def ls():
result, error = yield Task(call_subprocess, 'ls')
raise Return((result, error))
@coroutine
def main():
result, error = yield ls()
print(result, error)
IOLoop.instance().stop()
if __name__ == "__main__":
ioloop = IOLoop.instance()
ioloop.add_callback(main)
ioloop.start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment