Skip to content

Instantly share code, notes, and snippets.

@Suor
Created July 3, 2015 02:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Suor/5d259adbf3ef06264a8e to your computer and use it in GitHub Desktop.
Save Suor/5d259adbf3ef06264a8e to your computer and use it in GitHub Desktop.
Memoizing IO Stream
class MemoizingStream(object):
    """
    Read-through wrapper that remembers everything read from a source stream.

    Data pulled from ``source_fd`` is appended to an in-memory buffer
    (``memory``), so it can be re-read after a ``seek()``. Writes always go
    to the end of that buffer; reads happen at a separate position.

    Limitations:
      * Seeking relative to the end (``mode == 2``) is refused.
      * ``seek()`` only moves the position inside the in-memory buffer; it
        does not pull data from the source to reach a position beyond what
        has been memoized so far.
    """

    def __init__(self, fd):
        """Wrap *fd*, an object whose ``read(n)`` supplies the source data."""
        # NOTE(review): StringIO is not imported in the visible part of this
        # file -- it must already be in scope where this class is defined.
        self.memory = StringIO()
        self.source_fd = fd
        # Set once the source has returned an empty read.
        self.finished = False

    def read(self, n=-1):
        """
        Read up to *n* items, first from the memo, then from the source.

        A negative or ``None`` *n* means "everything that is left": the rest
        of the memo followed by the rest of the source. ``read(0)`` returns
        an empty result and never touches the source.
        """
        read_all = n is None or n < 0
        buf = self.memory.read(-1 if read_all else n)
        if self.finished:
            return buf

        if read_all:
            rest_n = -1
        else:
            rest_n = n - len(buf)
            if rest_n <= 0:
                # The memo satisfied the whole request (or n == 0).
                return buf

        new_buf = self.source_fd.read(rest_n)
        if not new_buf:
            self.finished = True
            return buf

        self.write(new_buf)
        # The memo was drained by the read above, so the read position
        # belongs at the new end of the buffer. Seeking to the end (rather
        # than a relative seek by len(new_buf)) also works on buffers that
        # reject non-zero relative seeks.
        self.memory.seek(0, 2)
        return buf + new_buf

    def write(self, s):
        """Append *s* to the end of the memo without moving the read position."""
        read_pos = self.memory.tell()
        self.memory.seek(0, 2)
        self.memory.write(s)
        self.memory.seek(read_pos)

    def seek(self, pos, mode=0):
        """
        Move the read position inside the memo.

        Raises ``TypeError`` for ``mode == 2``; any other mode is passed
        through to the in-memory buffer. Returns whatever the buffer's own
        ``seek()`` returns.
        """
        if mode == 2:
            raise TypeError("Can't seek to the end of MemoizingStream")
        return self.memory.seek(pos, mode)

    def tell(self):
        """Return the current read position inside the memo."""
        return self.memory.tell()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment