Skip to content

Instantly share code, notes, and snippets.

@vlovich
Created February 24, 2019 08:33
Show Gist options
  • Save vlovich/9c5aecda35a287276455e0a6323e9ba8 to your computer and use it in GitHub Desktop.
Save vlovich/9c5aecda35a287276455e0a6323e9ba8 to your computer and use it in GitHub Desktop.
import asyncio
import sys
import time
def EXITED():
    """Report whether the application has terminated.

    This is a placeholder hook: it performs no real check and always
    reports "still running".  Replace the body with the application's own
    shutdown test.  Callers in this file only use the result's truthiness.

    Returns:
        bool: ``False`` (the stub never signals termination).
    """
    return False
class PatchableStderrStream(object):
    """File-like stderr wrapper that batches writes and flushes them at a fixed rate.

    While an asyncio event loop is running, ``write()`` only appends to an
    in-memory buffer; a background task (started lazily by the first
    ``flush()`` call) drains that buffer to ``sys.stderr`` at most
    ``flush_refresh_rate`` times per second.  This works around the per-write
    cost of prompt_toolkit's ``patch_stdout``:
    https://github.com/jonathanslenders/python-prompt-toolkit/issues/682

    ``sys.stderr`` is looked up at write time, whereas ``_real_stderr`` is the
    stream that was installed when this object was constructed.
    NOTE(review): presumably ``sys.stderr`` is the patched proxy while the
    application is running -- confirm against the caller's setup.

    Args:
        flush_refresh_rate: Maximum number of buffer flushes per second (Hz).
            Must be positive.  Defaults to 20.

    Raises:
        ValueError: If ``flush_refresh_rate`` is not positive.
    """

    def __init__(self, flush_refresh_rate=20):
        if flush_refresh_rate <= 0:
            raise ValueError("flush_refresh_rate must be positive")
        self._real_stderr = sys.stderr
        self._buffered = []
        self._buffered_cond = asyncio.Condition()
        # Unused by this class; kept so the instance attribute set is unchanged.
        self._periodic_flush = None
        # False until flush() starts the background task, then that Task.
        self._flushing = False
        # Minimum spacing between flushes, in seconds -- the same unit as
        # time.monotonic() and asyncio.sleep().
        self._flush_interval = 1.0 / flush_refresh_rate
        # Strong references to in-flight awrite() tasks; the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it runs to completion.
        self._pending_writes = set()

    @staticmethod
    def _running_loop():
        """Return the event loop running in this thread, or None if there is none."""
        try:
            return asyncio.get_running_loop()
        except RuntimeError:
            return None

    def _delay_before_flush(self, last_flush_ts, now):
        """Return how many seconds to wait so flushes stay one interval apart.

        Args:
            last_flush_ts: ``time.monotonic()`` value of the previous flush,
                or None if nothing has been flushed yet.
            now: Current ``time.monotonic()`` value.

        Returns:
            float: Remaining seconds of the current interval; 0 when no flush
            has happened yet or the interval has already elapsed.
        """
        if last_flush_ts is None:
            return 0.0
        return max(0.0, self._flush_interval - (now - last_flush_ts))

    async def _drain(self):
        """Atomically remove and return everything buffered, as one string."""
        async with self._buffered_cond:
            pending = "".join(self._buffered)
            self._buffered.clear()
        return pending

    async def _flushit(self):
        """Background task: flush buffered data at the configured rate until exit.

        NOTE: the loop only re-checks EXITED() after new data has arrived,
        because it blocks on the condition while the buffer is empty.
        """
        last_flush_ts = None

        while not EXITED():
            # Sleep until there is something to flush.
            async with self._buffered_cond:
                await self._buffered_cond.wait_for(lambda: bool(self._buffered))

            # If less than one flush interval has passed since the previous
            # flush, sleep for the remainder of the interval so that more
            # writes get batched into this flush.
            delay = self._delay_before_flush(last_flush_ts, time.monotonic())
            if delay > 0:
                await asyncio.sleep(delay)

            last_flush_ts = time.monotonic()

            # Dump all buffered data to the screen.
            sys.stderr.write(await self._drain())

        # Dump any remaining buffered data to the screen.
        self._real_stderr.write(await self._drain())

    async def awrite(self, data):
        """Queue ``data`` for the next batched flush.

        Once EXITED() is true the data bypasses the buffer and is written
        straight to the stream captured at construction time.
        """
        async with self._buffered_cond:
            if EXITED():
                # This is any logs that may get generated after the app is
                # torn down.
                self._real_stderr.write(data)
                return

            # Batch writes to stderr at a fixed rate to avoid the cost of
            # patch_stdout.
            # https://github.com/jonathanslenders/python-prompt-toolkit/issues/682
            self._buffered.append(data)
            self._buffered_cond.notify()

    def write(self, data):
        """Write ``data``: buffered if an event loop is running, else immediately."""
        loop = self._running_loop()
        if loop is None:
            sys.stderr.write(data)
            return

        task = loop.create_task(self.awrite(data))
        self._pending_writes.add(task)
        task.add_done_callback(self._pending_writes.discard)

    def flush(self):
        """Start the periodic flusher if needed; otherwise do nothing.

        Flushing is deliberately a no-op to work around slow patch_stdout; the
        complete fix is the periodic batching done by the background task.
        https://github.com/jonathanslenders/python-prompt-toolkit/issues/682
        """
        if self._flushing:
            return
        loop = self._running_loop()
        if loop is not None:
            self._flushing = loop.create_task(self._flushit())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment