Skip to content

Instantly share code, notes, and snippets.

@carlos-a-g-h
Last active February 22, 2023 14:39
Show Gist options
  • Save carlos-a-g-h/cca0742ebc086b9e4d1319f22d75f289 to your computer and use it in GitHub Desktop.
Save carlos-a-g-h/cca0742ebc086b9e4d1319f22d75f289 to your computer and use it in GitHub Desktop.
import asyncio
import logging
# NOTE(review): the logger object returned by this call is discarded, so the
# statement does not configure anything by itself.
logging.getLogger()
async def _sp_read_line(stream, timeout):
    """Read one line from *stream*, waiting at most *timeout* seconds.

    Returns the decoded line with trailing whitespace stripped, or None when
    the stream is missing, nothing arrived in time, or the stream hit EOF.
    """
    if not stream:
        return None
    try:
        raw = await asyncio.wait_for(stream.readline(), timeout)
    except asyncio.TimeoutError:
        # A silent child must not stall the polling loop; unread bytes stay
        # buffered in the stream and are picked up by a later call.
        return None
    if not raw:
        return None
    return raw.rstrip().decode()


async def sp_streamer(hook, dbg=False, interval=0.5):
    """Poll a running subprocess and publish its latest output in *hook*.

    On every pass at most one line is read from each piped stream and stored
    under ``"stream_stdout"`` / ``"stream_stderr"`` (each new line overwrites
    the previous one). If the hook contains a truthy ``"cancel"`` or
    ``"cancelled"`` entry, the process is asked to terminate. The loop ends
    once the process has a return code.

    Args:
        hook: dict holding the process under ``"proc"``; it is updated in
            place and may be modified by other tasks to request cancellation.
        dbg: print progress information on every pass.
        interval: positive number of seconds used both as the pause between
            passes and as the longest wait for a line on each stream.
    """
    proc = hook["proc"]
    printout = 0
    while True:
        if dbg:
            print("→ sp_streamer() printout:", printout)
            print(" proc.stdout:", proc.stdout)
        # Reads are time-bounded so the cancel flags below are still checked
        # while the child produces no output.
        line_stdout = await _sp_read_line(proc.stdout, interval)
        if line_stdout is not None:
            hook.update({"stream_stdout": line_stdout})
        if dbg:
            print(" proc.stderr:", proc.stderr)
        line_stderr = await _sp_read_line(proc.stderr, interval)
        if line_stderr is not None:
            hook.update({"stream_stderr": line_stderr})
        if dbg:
            print(" hook:", hook)
            printout = printout + 1
        if hook.get("cancel") or hook.get("cancelled"):
            # Only signal a process that is still running; signalling one
            # that already exited can raise ProcessLookupError.
            if proc.returncode is None:
                try:
                    proc.terminate()
                except ProcessLookupError:
                    pass
            if dbg:
                print("Terminating:", proc)
        # Always yield to the event loop so the exit status can be collected.
        await asyncio.sleep(interval)
        if proc.returncode is not None:
            break
async def sp_run(cmd_line,ret_stdout=True,ret_stderr=True,hook=None,stream=False,shell=False,rec=True,dbg=False):
    """Run a subprocess and collect its return code and output.

    Args:
        cmd_line: the command. Must be a string when ``shell`` is true and a
            sequence of arguments otherwise.
        ret_stdout: pipe stdout and include it in the result.
        ret_stderr: pipe stderr and include it in the result.
        hook: optional dict; when given, the process object is stored in it
            under ``"proc"``. Anything that is not a dict is ignored.
        stream: with a dict hook, run ``sp_streamer()`` next to the process
            instead of waiting for the complete output. The reported output
            is then whatever the hook holds under ``"stream_stdout"`` /
            ``"stream_stderr"`` when the process ends.
        shell: run ``cmd_line`` through the shell. Only pass trusted strings.
        rec: include the return code as the first element of the result.
        dbg: print debugging information.

    Returns:
        A list ``[return code, stdout, stderr]`` without the elements that
        were switched off. An output element is None when nothing was
        captured for it. If any exception is raised while running the
        process, it is logged and an empty list is returned.
    """
    use_hook = isinstance(hook, dict)
    stream_output = use_hook and bool(stream)
    just_hook = use_hook and not stream_output

    if dbg:
        print("→ sp_run()\n$", cmd_line)
        print(" stream_output", stream_output)
        print(" just_hook", just_hook)

    arg_stdout = asyncio.subprocess.PIPE if ret_stdout else None
    arg_stderr = asyncio.subprocess.PIPE if ret_stderr else None

    results = []
    # Bound up front so that an empty stream yields None instead of an
    # UnboundLocalError when the result list is assembled.
    stdout_ok = None
    stderr_ok = None

    try:
        if shell:
            # Subprocess shell: 'cmd_line' must be a string
            proc = await asyncio.create_subprocess_shell(cmd_line, stdout=arg_stdout, stderr=arg_stderr)
        else:
            # Subprocess exec: 'cmd_line' must be a list
            proc = await asyncio.create_subprocess_exec(*cmd_line, stdout=arg_stdout, stderr=arg_stderr)

        if use_hook:
            hook.update({"proc": proc})

        if stream_output:
            # The streamer keeps reading the pipes while we wait for the exit.
            await asyncio.gather(proc.wait(), sp_streamer(hook, dbg=dbg))
            if ret_stdout:
                stdout_ok = hook.get("stream_stdout")
            if ret_stderr:
                stderr_ok = hook.get("stream_stderr")
        else:
            stdout_raw, stderr_raw = await proc.communicate()
            if ret_stdout and stdout_raw:
                stdout_ok = stdout_raw.decode()
            if ret_stderr and stderr_raw:
                stderr_ok = stderr_raw.decode()

        if rec:
            # Returns exit code
            results.append(proc.returncode)

    except Exception:
        logging.exception("Fatal error in sp_run()")
        return results

    if ret_stdout:
        results.append(stdout_ok)
    if ret_stderr:
        results.append(stderr_ok)

    # [ReturnCode, Standard Output, Standard Error]
    return results
# WARNING: the sp_end* functions may cause deadlocks depending on the size of the application. Passing the "cancel" or "cancelled" keys to the hook dict appears to be a safer option.
async def sp_end(hook,ds="term"):
    """Signal the process stored in ``hook["proc"]`` to stop.

    Args:
        hook: dict holding the process object under ``"proc"``.
        ds: ``"term"`` calls ``terminate()`` on the process, ``"kill"`` calls
            ``kill()``. Any other value does nothing.
    """
    proc = hook["proc"]
    # Nothing to signal once the process has an exit status.
    if proc.returncode is not None:
        return
    if ds == "kill":
        proc.kill()
    elif ds == "term":
        proc.terminate()
async def sp_end_based(hook):
    """Run ``pkill -TERM -P <pid>`` for the process stored in ``hook["proc"]``.

    NOTE(review): per the pkill manual, ``-P`` selects processes whose parent
    is the given pid, so this presumably targets the children of the
    subprocess rather than the subprocess itself -- confirm this is intended.

    Returns:
        True when pkill ran and exited with status 0, False otherwise
        (including the case where pkill could not be started at all).
    """
    target_pid = hook["proc"].pid
    cline = ["pkill", "-TERM", "-P", str(target_pid)]
    res = await sp_run(cline, ret_stdout=False, ret_stderr=False)
    # sp_run() returns an empty list when launching the command failed.
    return bool(res) and res[0] == 0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment