Skip to content

Instantly share code, notes, and snippets.

@gsauthof
Created October 31, 2017 13:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gsauthof/086a13af08045b2ebae7624a304ebc69 to your computer and use it in GitHub Desktop.
Save gsauthof/086a13af08045b2ebae7624a304ebc69 to your computer and use it in GitHub Desktop.
asyncssh-compatible file-like object for capturing the most recent lines
class Last_Lines:
    """File-like sink that retains only the most recently written lines.

    Data passed to ``write()``/``writelines()`` is split into lines and only
    the last ``n`` of them are kept in ``self.lines``.  A chunk that does not
    end in a newline is treated as an unfinished line and the next chunk is
    appended to it, so the result does not depend on where chunk boundaries
    fall between newlines.

    Parameters:
        n: maximum number of lines to retain.
        binary: if true (default) the object expects ``bytes`` chunks,
            otherwise ``str`` chunks.  Chunks of the other type are not
            converted and will raise ``TypeError`` once lines get merged or
            joined.

    Attributes:
        lines: list with the retained lines (line endings included).
        dev_null: placeholder file object backing ``fileno()``.
    """

    def __init__(self, n, binary=True):
        self.n = n
        self.binary = binary
        self.lines = []
        # Per the original author's note: asyncssh calls fstat() on fileno()
        # and checks whether it is a regular file, thus a real regular file
        # is kept open purely to provide such a descriptor.
        self.dev_null = self._open_placeholder()

    @staticmethod
    def _open_placeholder():
        """Open an anonymous regular file whose only purpose is its fileno()."""
        # Local import keeps this class self-contained.  A temporary file is
        # used instead of a hard-coded system path, which may not exist on
        # every platform.
        import tempfile
        return tempfile.TemporaryFile()

    def write(self, s):
        """Record chunk ``s`` and return its length, like ``file.write()``."""
        self.writelines(s.splitlines(True))
        return len(s)

    def writelines(self, lines):
        """Append ``lines`` (any iterable), keeping only the last ``n`` lines.

        The argument itself is never modified.
        """
        newline = b'\n' if self.binary else '\n'
        # Work on a private copy: the caller's sequence must stay untouched
        # and arbitrary iterables (tuples, generators) are accepted this way.
        pending = list(lines)
        # A retained last line without a trailing newline is still open, so
        # the first new piece continues it.  Only '\n' counts as completing a
        # line, which also rejoins a '\r\n' that was split across two writes.
        if pending and self.lines and not self.lines[-1].endswith(newline):
            self.lines[-1] += pending.pop(0)
        self.lines.extend(pending)
        # Drop the surplus from the front in a single operation.
        surplus = len(self.lines) - self.n
        if surplus > 0:
            del self.lines[:surplus]

    def fileno(self):
        """Return the descriptor of a regular placeholder file.

        The placeholder is reopened if ``close()`` released it before, so the
        object stays usable after it has been closed.
        """
        if self.dev_null.closed:
            self.dev_null = self._open_placeholder()
        return self.dev_null.fileno()

    def close(self):
        """Release the placeholder file; the captured lines stay available."""
        self.dev_null.close()

    def getvalue(self):
        """Return the retained lines joined into one ``bytes``/``str`` value."""
        empty = b'' if self.binary else ''
        return empty.join(self.lines)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment