Skip to content

Instantly share code, notes, and snippets.

@ZenulAbidin
Last active December 22, 2019 15:44
Show Gist options
  • Save ZenulAbidin/04bc3cfd865bbaa8060e9cb19332d398 to your computer and use it in GitHub Desktop.
Save ZenulAbidin/04bc3cfd865bbaa8060e9cb19332d398 to your computer and use it in GitHub Desktop.
from io import BufferedReader, StringIO, UnsupportedOperation
# Because this is a read-only stream, write methods raise an exception.
class BufferedStringReader:
def __init__(self, string, name, mode):
if mode != 'rb' and mode != 'br':
raise ValueError("invalid mode: " + mode)
self.reader = open(name, mode)
self.stringio = StringIO(string)
self.string = string
self.stringlen = len(string)
self.readerpos = 0
# self.reader.tell() doesn't seem to work properly
self.pos = self.stringio.tell() + self.readerpos
self.closed = (self.reader.closed and self.stringio.closed)
# This is my own method, not a method in the io module
def __update_state__(self):
self.pos = self.stringio.tell() + self.readerpos
self.closed = (self.reader.closed and self.stringio.closed)
def __repr__(self):
return "{BufferedStringReader reader=" + str(self.reader) + " stringio=" + str(self.stringio) + "}"
def getvalue(self):
# Reads the entire file. Since this is an infinite byte stream this is not supported.
raise UnsupportedOperation("BufferedStringReader does not support getvalue()")
def close(self):
self.reader.close()
self.stringio.close()
self.closed = True
def flush(self):
raise UnsupportedOperation("BufferedStringReader does not support flush()")
# This function leaves the file stream in an undefined state
def detach(self):
self.reader.detach()
#self.stringio.detach() Not supported
def isatty(self):
return False
def peek(self, size=0):
# We cannot guarrentee one single read will be done in this class but
# we can guarrentee that only one read will be done to the BufferedReader.
# Note that the number of bytes returned may be less or more than requested.
# Also in BufferedReader the size can apparently be negative and it's ignored
# in any case (on my linux system it returns 4K bytes even for positive size).
# Long story short; Python io's peek function ignores size.
# Nevertheless, we *try* to return exactly size bytes.
peek_size = (size if size > 0 else 1)
string_pos = self.stringio.tell()
string_read = self.string[string_pos:string_pos+peek_size]
if len(string_read) < peek_size:
reader_read = self.reader.peek(peek_size)
else:
reader_read = b''
return (bytes(string_read, 'utf-8') + reader_read)[0:peek_size]
def read(self, size=-1):
# size=-1 will cause your machine to run out of memory since you're dealing with
# and infinitely sized byte stream.
if size <= -1:
raise UnsupportedOperation("BufferedStringReader does not support read(size=-1)")
string_read = ''
reader_read = b''
if self.pos <= self.stringlen and self.pos+size <= self.stringlen:
string_read = self.stringio.read(size);
elif self.pos <= self.stringlen and self.pos+size > self.stringlen:
# No exceptions below, will return a smaller string if necessary
string_read = self.stringio.read(size);
reader_read = self.reader.read(size-len(string_read));
elif self.pos > self.stringlen:
reader_read = self.reader.read(size);
else:
# Impossible condition, detatch for security
raise RuntimeError
self.readerpos += len(reader_read)
self.__update_state__()
return bytes(string_read, 'utf-8') + reader_read;
def readable(self):
return True
def read1(self, size=-1):
# This is impossible to implement due to the arhitecture of this class.
# This method is supposed to call read() exactly once but since there are
# two IO streams, it is not possible.
raise UnsupportedOperation("BufferedStringReader does not support read1()")
def readinto(self, bytarray):
temp_read = self.read(len(bytarray))
temp_array = bytearray(temp_read)
if len(temp_array) <= 0:
return 0
bytarray[0:len(temp_array)] = temp_array
return len(temp_array)
def readinto1(self, bytarray):
raise UnsupportedOperation("BufferedStringReader does not support readinto1()")
def readlines(self, size=-1):
# Unlike read(), we stop when we encounter \n. No exceptions.
# Note that it's still possible we run out of memory if a large enough
# size is used and we don't find a newline.
# A trailing newline is kept in the string.
read_size = 0;
if size >= 0:
# If size is given try to find up to size
temp_read = self.peek(size)
newline = temp_read.find(b'\n')
if newline == -1:
read_size = len(temp_read)
else:
read_size = newline+1
else:
# Keep increasing size by 4K until newline is found
newline = -1
while newline == -1:
read_size += 4096
temp_read = self.peek(read_size);
newline = temp_read.find(b'\n')
read_size = newline+1
return self.read(read_size);
def seek(self, pos, whence=0):
if whence==2: # Seek from end of infinite file doesn't make sense
UnsupportedOperation("BufferedStringReader does not support seek(whence=2)")
elif whence==1:
self.pos = max(self.pos+pos, 0)
elif whence==0:
if pos < 0:
raise OSError("[Errno 22] Invalid argument")
self.pos = pos
else:
raise ValueError("whence value " + str(whence) + " unsupported")
self.stringio.seek(min(self.pos, self.stringlen), 0)
self.reader.seek(max(self.pos - self.stringlen, 0), 0)
self.readerpos = max(self.pos - self.stringlen, 0)
self.__update_state__()
return self.pos
def seekable(self):
return True
def tell(self):
return self.pos
def truncate(self, pos=None):
raise UnsupportedOperation("BufferedStringReader does not support truncate()")
def write(self, s):
raise UnsupportedOperation("BufferedStringReader does not support write()")
def writelines(self, s):
raise UnsupportedOperation("BufferedStringReader does not support writelines()")
def writable(self):
return False
# To use:
reader = BufferedStringReader("something", "/dev/urandom", "rb")
print(reader)
assert reader.readable() == True
assert reader.seekable() == True
assert reader.writable() == False
assert reader.isatty() == False
assert reader.tell() == 0
assert reader.read(4) == b"some"
assert reader.tell() == 4
reader.seek(pos=-2, whence=1)
assert reader.read(2) == b"me"
reader.seek(pos=0, whence=0)
assert reader.tell() == 0
print(reader.read(12)) # "something<First 3 characters from /dev/urandom>"
assert reader.tell() == 12
print(reader.read(12)) # "<Next 12 characters from /dev/urandom>"
assert reader.tell() == 24
zero_array = bytearray(0)
assert reader.readinto(zero_array) == 0
assert zero_array == b''
zero_array = bytearray(10)
assert reader.readinto(zero_array) == 10
print(zero_array)
reader.seek(pos=0, whence=0)
assert reader.peek(4) == b"some"
assert reader.read(4) == b"some"
assert reader.tell() == 4
print(reader.peek(8)) # "thing<First 3 characters from /dev/urandom>"
print(reader.read(8)) # Note: This is a block device, so output may not be the same as above.
assert reader.tell() == 12
print(reader.peek(8)) # "FNext20 characters from /dev/urandom>"
assert reader.tell() == 12
reader.close()
reader = BufferedStringReader("something\nspam\neggs\n", "/dev/urandom", "rb")
assert reader.readlines() == b"something\n"
assert reader.readlines(3) == b"spa"
assert reader.readlines() == b"m\n"
assert reader.readlines(5) == b"eggs\n"
reader.detach()
reader = BufferedStringReader("something\nspam\neggs\n", "/dev/urandom", "rb")
try:
reader.flush()
except UnsupportedOperation as e:
print(e)
try:
reader.getvalue()
except UnsupportedOperation as e:
print(e)
try:
reader.read(size=-1)
except UnsupportedOperation as e:
print(e)
try:
reader.read1(10)
except UnsupportedOperation as e:
print(e)
try:
reader.readinto1(10)
except UnsupportedOperation as e:
print(e)
try:
reader.seek(pos=100, whence=2)
except UnsupportedOperation as e:
print(e)
try:
reader.truncate(10)
except UnsupportedOperation as e:
print(e)
try:
reader.write('eggs')
except UnsupportedOperation as e:
print(e)
try:
reader.writelines('eggs')
except UnsupportedOperation as e:
print(e)
try:
reader = BufferedStringReader("something\nspam\neggs\n", "/dev/urandom", "wb")
except ValueError as e:
print(e)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment