Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Experimental concurrent line writer
class ConcurrentLineWriter:
    """Render several independently updatable lines on an ANSI terminal.

    Each line handed out by :meth:`add_line` is an in-memory text buffer.
    Every write to such a buffer re-renders the buffer's whole content on
    the terminal row owned by that line, using ANSI cursor-movement escape
    sequences written to ``file``.

    Caveats:
        * Text containing newlines moves the real cursor without updating
          the tracked row, so later updates land on the wrong rows. Write
          single-line text only (e.g. ``print(..., end="", file=line)``).
        * No locking is performed here; writers running in different
          threads must serialise their calls themselves.

    The writer is a context manager; leaving the ``with`` block calls
    :meth:`close`.
    """

    class LineFileIO(StringIO):
        """Text buffer bound to one row of a ``ConcurrentLineWriter``."""

        def __init__(self, parent, row):
            """Bind the buffer to ``parent`` and remember its ``row`` index."""
            super().__init__()
            self.parent = parent
            self.row = row

        def write(self, string):
            """Append ``string`` to the buffer and re-render this row.

            Returns the number of characters written, as ``StringIO.write``
            does, so the object honours the usual file ``write`` contract.
            """
            written = super().write(string)
            self.parent._write(self.row)
            return written

        def flush(self):
            """Flush the buffer and the parent's underlying output file."""
            super().flush()
            self.parent.file.flush()

    def __init__(self, file=sys.stdout):
        """Create a writer that renders onto ``file`` (default: stdout)."""
        self.file = file
        self.lines = []
        # Row (index into ``self.lines``) the terminal cursor is currently on.
        self.__pos = 0

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.close()

    def __move_cursor(self, row):
        """Move the cursor vertically to ``row``; the column is left alone."""
        offset = abs(self.__pos - row)
        if row < self.__pos:
            self.file.write(f"\033[{offset}A")
        elif row > self.__pos:
            self.file.write(f"\033[{offset}B")
        self.__pos = row

    def __open_row_below_last(self):
        """Put the cursor on a fresh row directly below the last line.

        Cursor-down escapes do not scroll at the bottom of the screen, so
        the new row is created with a real line feed issued from the last
        existing row.
        """
        self.__move_cursor(len(self.lines) - 1)
        self.file.write("\r\n")
        self.__pos = len(self.lines)

    def _write(self, row):
        """Re-render the complete buffer of line ``row`` on its own row."""
        self.__move_cursor(row)
        # Always return to column 0 first: the whole buffer is re-emitted,
        # so without it a second write to the same row would be appended
        # after the text that is already on screen.
        self.file.write("\r" + self.lines[row].getvalue())

    def add_line(self):
        """Create a new line below the existing ones and return its buffer."""
        row = len(self.lines)
        if self.__pos < row:
            # Earlier rows exist and the cursor is on one of them, so the
            # terminal row for the new line has to be reserved explicitly.
            self.__open_row_below_last()
        self.lines.append(ConcurrentLineWriter.LineFileIO(self, row))
        return self.lines[-1]

    def close(self):
        """Close every line buffer and park the cursor below the last line.

        Calling ``close`` more than once is harmless.
        """
        for line in self.lines:
            line.close()
        if self.__pos < len(self.lines):
            self.__open_row_below_last()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.