Skip to content

Instantly share code, notes, and snippets.

@b-simjoo
Created February 17, 2023 13:35
Show Gist options
  • Save b-simjoo/989d5911019f312d2639ae84499b9f2d to your computer and use it in GitHub Desktop.
Save b-simjoo/989d5911019f312d2639ae84499b9f2d to your computer and use it in GitHub Desktop.
Running a process and using its input and output (with reader threads) in Python
from subprocess import PIPE, Popen, run
from threading import Thread
from time import sleep
try:
from queue import Queue, Empty
except ImportError:
from Queue import Queue, Empty # python 2.x
# Line buffers shared with the background reader threads: one queue for the
# child's stdout and a separate one for its stderr.
out_q, err_q = Queue(), Queue()
def enqueue_output(out, q):
    '''Pump every line of a stream into a queue, then close the stream.

    Intended to run in a background reader thread so that the consumer can
    poll the queue instead of blocking on the pipe itself.

    :param out: readable file-like object (for example ``proc.stdout``);
        text-mode and binary-mode streams are both supported.
    :param q: queue that receives each line, line ending included.
    '''
    try:
        while True:
            line = out.readline()
            # An empty str/bytes object means EOF.  Testing truthiness rather
            # than comparing against "" keeps a binary-mode stream (which
            # returns b"" at EOF) from looping forever.
            if not line:
                break
            q.put_nowait(line)
    finally:
        # Release the stream even if reading or queueing raised.
        out.close()
def readline(q=out_q, timeout: int | None = 30) -> str:
    '''Return the next line from a reader-thread queue.

    :param q: queue to read from; defaults to the stdout queue ``out_q``.
    :param timeout: maximum number of seconds to wait (default 30). If it is
        ``None``, block until an item is available. If it is a positive
        number, block at most that many seconds. Read more:
        https://docs.python.org/3/library/queue.html#queue.Queue.get
    :return: the item taken from the queue.
    :raises queue.Empty: if no item was available within ``timeout`` seconds.
    '''
    # The original fetched the line but never returned it, so callers always
    # received None.
    return q.get(True, timeout)
def exec_proc(*args) -> Popen:
    '''Execute a process, start its reader threads and return the Popen instance.

    The child is started with stdin, stdout and stderr connected to pipes
    opened in UTF-8 text mode. Two daemon threads run ``enqueue_output`` to
    copy stdout lines into ``out_q`` and stderr lines into ``err_q``.

    :param args: the program to run followed by its arguments, passed to
        ``Popen`` as a single argument sequence.
    :return: the ``Popen`` object for the started process. The reader threads
        are not returned.
    '''
    proc = Popen(
        args,
        stdout=PIPE,
        stderr=PIPE,
        stdin=PIPE,
        encoding="utf-8",
    )
    # Daemon threads: they must not keep the interpreter alive once the main
    # thread has finished.
    Thread(target=enqueue_output, args=(proc.stdout, out_q), daemon=True).start()
    Thread(target=enqueue_output, args=(proc.stderr, err_q), daemon=True).start()
    return proc
# Start the child process as soon as the module runs; every statement below
# talks to it through this module-level handle.
proc = exec_proc(
    "ssh",
    "-i",
    "mykey",
    "bsimjoo@somewhe.re",
)
def writeline(*lines: str, end="\n", sep=" ") -> bool:
    '''Send one line of text to the child process's stdin.

    :param lines: text fragments; they are joined with ``sep``.
    :param end: terminator appended after the joined fragments.
    :param sep: separator placed between the fragments.
    :return: ``True`` if the text was written and flushed, ``False`` if the
        process has already exited or its stdin pipe is broken.
    '''
    if proc.poll() is not None:
        return False
    try:
        proc.stdin.write(sep.join(lines) + end)
        # Without an explicit flush the text can stay in the pipe's write
        # buffer and never reach the child.
        proc.stdin.flush()
    except BrokenPipeError:
        # The process exited between the poll() check and the write.
        return False
    return True
if proc.poll() is None:
    # process is alive! keep going
    try:
        # Drain whatever the process has printed so far; the loop ends when
        # no new line arrives within one second and readline raises Empty.
        try:
            while True:
                line = readline(timeout=1)
                print(line)
        except Empty:
            # Only the expected "queue is idle" signal is swallowed; anything
            # else (KeyboardInterrupt included) propagates.
            pass
        r = writeline("echo hello")
        if r:
            print(readline())
    finally:
        # Always terminate the child, even if reading the reply failed.
        proc.kill()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment