Skip to content

Instantly share code, notes, and snippets.

@jmuhlich
Created August 25, 2020 17:37
Show Gist options
  • Save jmuhlich/82f79d1864b90322720a7fed1cc97051 to your computer and use it in GitHub Desktop.
Save jmuhlich/82f79d1864b90322720a7fed1cc97051 to your computer and use it in GitHub Desktop.
PySB asyncio solution for communicating with BNG
#!/bin/bash
# Test fixture: interleaves a few stdout lines with large bursts of stderr
# output, so a reader that does not drain stderr concurrently is likely to
# stall once the pipe fills up.

# Write 32000 numbered "STDERR <n>" lines to standard error.
flood_stderr() {
    local n
    for ((n = 1; n <= 32000; n++)); do
        printf 'STDERR %s\n' "$n"
    done >&2
}

echo "start"
flood_stderr

echo "normal 1"
sleep 1
flood_stderr

echo "normal 2"
sleep 1
flood_stderr

# An "ERROR:" line followed by indented continuation lines on stdout.
echo "ERROR: foo"
echo " bar"
echo " baz"
echo "end"
flood_stderr

# Try uncommenting this and/or commenting the "echo ERROR" lines for testing.
# exit 1
import sys
import asyncio
import logging
from asyncio.subprocess import PIPE
# Show DEBUG-level records so each stdout line logged by run_process() is visible.
logging.basicConfig(level=logging.DEBUG)
async def run_process(*command: str) -> None:
    """Run a subprocess, log its stdout line by line, and raise on failure.

    Standard error is drained concurrently with standard output so that a
    child producing a lot of stderr cannot fill the pipe and stall while we
    are still waiting on stdout.

    Once a stdout line starting with ``ERROR:`` is seen, that line and every
    following stdout line are collected as the error message.

    Args:
        *command: Program and arguments to execute. When omitted, the
            original demo command ``bash blocker.sh`` is run.

    Raises:
        Exception: If the process exits with a non-zero return code or any
            error lines were captured. The message is the captured stdout
            error lines, a newline, then the full stderr output.
    """
    if not command:
        command = ("bash", "blocker.sh")
    process = await asyncio.create_subprocess_exec(
        *command, stdout=PIPE, stderr=PIPE,
    )
    # Start reading stderr right away to keep a full buffer from blocking us.
    # ensure_future instead of create_task required for Python<3.7.
    stderr_task = asyncio.ensure_future(process.stderr.read())
    capture_error = False
    captured_error_lines = []
    async for raw_line in process.stdout:
        # Strip only the line terminator: the final line may lack one, and
        # blindly dropping the last byte would eat a real character.
        line = raw_line.decode('utf-8', errors='replace').rstrip('\r\n')
        if line.startswith('ERROR:'):
            capture_error = True
        if capture_error:
            captured_error_lines.append(line)
        logging.debug(line)
    returncode = await process.wait()
    # Always collect the reader task so it is never left pending, even when
    # the run succeeded and its output is not needed.
    stderr = (await stderr_task).decode('utf-8', errors='replace').rstrip()
    if returncode or captured_error_lines:
        raise Exception('\n'.join(captured_error_lines) + "\n" + stderr)
# Create the event loop explicitly instead of relying on get_event_loop(),
# whose implicit loop creation is deprecated in newer Python versions.
if sys.platform == "win32":
    # Subprocess support on Windows needs the proactor loop, which is not
    # the default on older Python versions.
    loop = asyncio.ProactorEventLoop()
else:
    loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
    loop.run_until_complete(run_process())
finally:
    # Release the loop's resources even if run_process() raised.
    loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment