Skip to content

Instantly share code, notes, and snippets.

@amirouche
Created May 2, 2015 11:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amirouche/8ce7bebb96be900a0bce to your computer and use it in GitHub Desktop.
Save amirouche/8ce7bebb96be900a0bce to your computer and use it in GitHub Desktop.
import asyncio
import shlex
# inspired by https://docs.python.org/3/library/asyncio-subprocess.html
class CheckOutput(asyncio.SubprocessProtocol):
    """Subprocess protocol that buffers piped output and signals completion.

    ``exit_future`` is resolved with ``True`` once the child process has
    exited *and* every piped output stream (stdout/stderr) has been closed,
    so ``output`` is complete by the time the future is done.
    """

    def __init__(self, exit_future):
        """Store the future to resolve and start with an empty buffer."""
        self.exit_future = exit_future
        # Every chunk received on any piped fd is appended here.
        self.output = bytearray()
        self._exited = False
        # Output fds (1 = stdout, 2 = stderr) whose pipe is still open.
        self._open_pipes = set()

    def connection_made(self, transport):
        """Record which of the child's output streams are actually piped."""
        for fd in (1, 2):
            if transport.get_pipe_transport(fd) is not None:
                self._open_pipes.add(fd)

    def pipe_data_received(self, fd, data):
        """Append a chunk of bytes written by the child to the buffer."""
        self.output.extend(data)

    def pipe_connection_lost(self, fd, exc):
        """Mark the pipe for *fd* as closed and finish if nothing is pending."""
        self._open_pipes.discard(fd)
        self._finish_if_complete()

    def process_exited(self):
        """Note that the child exited and finish if nothing is pending."""
        self._exited = True
        self._finish_if_complete()

    def _finish_if_complete(self):
        """Resolve ``exit_future`` once the child exited and pipes are closed."""
        # process_exited() may be called before the last pipe_data_received()
        # / pipe_connection_lost(), so resolving on exit alone could drop the
        # tail of the output.
        if not self._exited or self._open_pipes:
            return
        # The waiter may already have been cancelled; set_result() would
        # then raise InvalidStateError inside an event-loop callback.
        if not self.exit_future.done():
            self.exit_future.set_result(True)
async def check_output(command):
    """Run *command* in a subprocess and return its standard output as text.

    *command* is a single shell-like string; it is split with
    ``shlex.split`` and executed directly, without a shell. The child's
    stdin and stderr are inherited from this process, while stdout is
    captured through a pipe.

    Returns the captured output decoded as UTF-8 with trailing whitespace
    stripped. The child's exit status is not inspected.

    Raises ``UnicodeDecodeError`` if the output is not valid UTF-8.
    """
    argv = shlex.split(command)
    loop = asyncio.get_event_loop()
    exit_future = loop.create_future()

    # Create the subprocess controlled by the CheckOutput protocol.
    # stdout is left at its default so it is redirected into a pipe.
    transport, protocol = await loop.subprocess_exec(
        lambda: CheckOutput(exit_future),
        *argv,
        stdin=None,
        stderr=None
    )
    try:
        # Wait until the protocol reports that the subprocess is done.
        await exit_future
    finally:
        # Close the transport (and its stdout pipe) even when the wait is
        # cancelled or fails, so the child and its pipe are not leaked.
        transport.close()

    # Read the output which was collected by the pipe_data_received()
    # method of the protocol.
    return bytes(protocol.output).decode('utf-8').rstrip()
if __name__ == '__main__':
    # Demo: ask GNU date for next Friday 09:00 in Martinique.
    command = """date --date='TZ="America/Martinique" 09:00 next Fri'"""
    # asyncio.run() creates a fresh event loop and always closes it, even
    # when the command fails; calling asyncio.get_event_loop() with no
    # running loop is deprecated.
    date = asyncio.run(check_output(command))
    print("Command output: %s" % date)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment