Last active
December 22, 2019 15:44
-
-
Save ZenulAbidin/04bc3cfd865bbaa8060e9cb19332d398 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from io import BufferedReader, BytesIO, StringIO, UnsupportedOperation
# Because this is a read-only stream, write methods raise an exception.
class BufferedStringReader:
    """Read-only binary stream: a fixed string prefix followed by a file.

    The stream first yields the UTF-8 encoding of ``string`` and then the
    bytes of the file ``name``.  The file may be unbounded (for example
    ``/dev/urandom``), so operations that would need the whole stream
    (``getvalue()``, ``read(-1)``, seeking from the end) raise
    ``UnsupportedOperation``, as do all write-side methods.

    All sizes and positions are counted in bytes of the combined stream.

    Attributes:
        reader: binary file object returned by ``open(name, mode)``.
        stringio: in-memory byte stream holding the encoded prefix; its
            cursor is the current position inside the prefix.
        string: the original, un-encoded prefix.
        stringlen: length of the encoded prefix in bytes.
        readerpos: number of bytes consumed from ``reader``.  Tracked by
            hand because, per the original author, ``reader.tell()`` did not
            seem to work properly.
        pos: logical position in the combined stream.
        closed: True once both underlying streams are closed.
    """

    def __init__(self, string, name, mode):
        """Open ``name`` for binary reading and prepend ``string`` to it.

        Args:
            string: text emitted (UTF-8 encoded) before the file's bytes.
            name: path of the file to read after the prefix.
            mode: must be ``'rb'`` or ``'br'``.

        Raises:
            ValueError: if ``mode`` is not a binary read mode.
        """
        if mode not in ('rb', 'br'):
            raise ValueError("invalid mode: " + str(mode))
        # Encode before opening the file so that a bad ``string`` cannot
        # leave an open file handle behind.
        self._prefix = bytes(string, 'utf-8')
        self.reader = open(name, mode)
        self.stringio = BytesIO(self._prefix)
        self.string = string
        self.stringlen = len(self._prefix)
        self.readerpos = 0
        self.pos = 0
        self.closed = False
        self.__update_state__()

    # This is a helper of this class, not a method of the io module.
    def __update_state__(self):
        """Recompute ``pos`` and ``closed`` from the underlying streams."""
        self.pos = self.stringio.tell() + self.readerpos
        self.closed = (self.reader.closed and self.stringio.closed)

    def __repr__(self):
        return "{BufferedStringReader reader=" + str(self.reader) + " stringio=" + str(self.stringio) + "}"

    def __enter__(self):
        """Allow use as a context manager; returns the reader itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close both underlying streams when the ``with`` block ends."""
        self.close()

    def getvalue(self):
        """Unsupported: it would have to read the entire (possibly endless) stream."""
        raise UnsupportedOperation("BufferedStringReader does not support getvalue()")

    def close(self):
        """Close the file and the in-memory prefix stream."""
        try:
            self.reader.close()
        finally:
            # Release the prefix stream even if closing the file failed.
            self.stringio.close()
            self.closed = True

    def flush(self):
        """Unsupported on this read-only stream."""
        raise UnsupportedOperation("BufferedStringReader does not support flush()")

    def detach(self):
        """Detach and return the raw stream underneath ``reader``.

        This leaves the object in an undefined state; the in-memory prefix
        stream has no raw layer and is left untouched.
        """
        return self.reader.detach()

    def isatty(self):
        return False

    def peek(self, size=0):
        """Return upcoming bytes without advancing the position.

        At most one ``peek`` is issued to the underlying file.  The file's
        own ``peek`` may return fewer bytes than requested, so the result can
        be shorter than ``size``; it is never longer.  ``size <= 0`` is
        treated as 1.
        """
        peek_size = size if size > 0 else 1
        start = self.stringio.tell()
        head = self._prefix[start:start + peek_size]
        tail = b''
        if len(head) < peek_size:
            # The prefix alone cannot satisfy the request; top up from the file.
            tail = self.reader.peek(peek_size - len(head))
        return (head + tail)[:peek_size]

    def read(self, size=-1):
        """Read and return up to ``size`` bytes.

        Raises:
            UnsupportedOperation: if ``size`` is negative or None, because
                reading "everything" from an endless stream would exhaust
                memory.
        """
        if size is None or size <= -1:
            raise UnsupportedOperation("BufferedStringReader does not support read(size=-1)")
        # Once the prefix is exhausted its cursor sits at the end, so this
        # simply returns b'' and the whole request falls through to the file.
        head = self.stringio.read(size)
        tail = b''
        if len(head) < size:
            # A non-blocking raw stream may answer None; treat it as no data.
            tail = self.reader.read(size - len(head)) or b''
        self.readerpos += len(tail)
        self.__update_state__()
        return head + tail

    def readable(self):
        return True

    def read1(self, size=-1):
        """Unsupported.

        ``read1`` promises a single underlying read, which cannot be
        guaranteed with two backing streams.
        """
        raise UnsupportedOperation("BufferedStringReader does not support read1()")

    def readinto(self, bytarray):
        """Fill ``bytarray`` from the stream and return the number of bytes stored."""
        chunk = self.read(len(bytarray))
        count = len(chunk)
        if count <= 0:
            return 0
        bytarray[0:count] = chunk
        return count

    def readinto1(self, bytarray):
        """Unsupported, for the same reason as ``read1``."""
        raise UnsupportedOperation("BufferedStringReader does not support readinto1()")

    def readlines(self, size=-1):
        """Read and return ONE line as bytes, keeping the trailing newline.

        Despite the plural name (kept for backward compatibility) this
        behaves like ``readline``: it stops after the first ``\\n``, after
        ``size`` bytes when ``size >= 0``, or at end of file.  With a
        negative size and a source that never produces a newline the result
        can grow without bound.
        """
        if size is None:
            size = -1
        if size == 0:
            return b''
        start = self.stringio.tell()
        if size < 0:
            window = self._prefix[start:]
        else:
            window = self._prefix[start:start + size]
        newline = window.find(b'\n')
        tail = b''
        if newline != -1:
            # The line ends inside the prefix; the file is not touched.
            head = self.stringio.read(newline + 1)
        else:
            head = self.stringio.read(len(window))
            remaining = -1 if size < 0 else size - len(head)
            if remaining != 0:
                # Let the file find the newline itself: this terminates at
                # EOF and is not limited by how much peek() happens to return.
                tail = self.reader.readline(remaining) or b''
        self.readerpos += len(tail)
        self.__update_state__()
        return head + tail

    def seek(self, pos, whence=0):
        """Move to a new position and return it.

        Args:
            pos: offset in bytes.
            whence: 0 for absolute, 1 for relative to the current position
                (clamped at 0).

        Raises:
            UnsupportedOperation: for ``whence=2``; the end of an endless
                stream is not defined.
            OSError: for a negative absolute position.
            ValueError: for any other ``whence`` value.
        """
        if whence == 2:
            raise UnsupportedOperation("BufferedStringReader does not support seek(whence=2)")
        elif whence == 1:
            target = max(self.pos + pos, 0)
        elif whence == 0:
            if pos < 0:
                raise OSError("[Errno 22] Invalid argument")
            target = pos
        else:
            raise ValueError("whence value " + str(whence) + " unsupported")
        reader_target = max(target - self.stringlen, 0)
        # Seek the file first: it is the call that can fail, and failing
        # before any state is changed keeps the object consistent.
        self.reader.seek(reader_target, 0)
        self.stringio.seek(min(target, self.stringlen), 0)
        self.readerpos = reader_target
        self.__update_state__()
        return self.pos

    def seekable(self):
        return True

    def tell(self):
        """Return the current position in the combined stream."""
        return self.pos

    def truncate(self, pos=None):
        """Unsupported on this read-only stream."""
        raise UnsupportedOperation("BufferedStringReader does not support truncate()")

    def write(self, s):
        """Unsupported on this read-only stream."""
        raise UnsupportedOperation("BufferedStringReader does not support write()")

    def writelines(self, s):
        """Unsupported on this read-only stream."""
        raise UnsupportedOperation("BufferedStringReader does not support writelines()")

    def writable(self):
        return False
# To use:
# NOTE: this demonstration runs when the module is executed or imported and
# needs a readable /dev/urandom.
reader = BufferedStringReader("something", "/dev/urandom", "rb")
print(reader)
assert reader.readable()
assert reader.seekable()
assert not reader.writable()
assert not reader.isatty()
assert reader.tell() == 0
assert reader.read(4) == b"some"
assert reader.tell() == 4
reader.seek(pos=-2, whence=1)
assert reader.read(2) == b"me"
reader.seek(pos=0, whence=0)
assert reader.tell() == 0
print(reader.read(12))  # "something<First 3 characters from /dev/urandom>"
assert reader.tell() == 12
print(reader.read(12))  # "<Next 12 characters from /dev/urandom>"
assert reader.tell() == 24

# readinto() with an empty buffer stores nothing; otherwise it fills the buffer.
zero_array = bytearray(0)
assert reader.readinto(zero_array) == 0
assert zero_array == b''
zero_array = bytearray(10)
assert reader.readinto(zero_array) == 10
print(zero_array)

# peek() must not move the position.
reader.seek(pos=0, whence=0)
assert reader.peek(4) == b"some"
assert reader.read(4) == b"some"
assert reader.tell() == 4
print(reader.peek(8))  # "thing<First 3 characters from /dev/urandom>"
print(reader.read(8))  # Note: the source is a device, so this may not match the peek above.
assert reader.tell() == 12
print(reader.peek(8))  # "<Next 8 characters from /dev/urandom>"
assert reader.tell() == 12
reader.close()

# readlines() returns one line per call, keeping the trailing newline.
reader = BufferedStringReader("something\nspam\neggs\n", "/dev/urandom", "rb")
assert reader.readlines() == b"something\n"
assert reader.readlines(3) == b"spa"
assert reader.readlines() == b"m\n"
assert reader.readlines(5) == b"eggs\n"
reader.detach()

# Every call below is expected to be rejected with UnsupportedOperation.
reader = BufferedStringReader("something\nspam\neggs\n", "/dev/urandom", "rb")
unsupported_calls = (
    lambda: reader.flush(),
    lambda: reader.getvalue(),
    lambda: reader.read(size=-1),
    lambda: reader.read1(10),
    lambda: reader.readinto1(10),
    lambda: reader.seek(pos=100, whence=2),
    lambda: reader.truncate(10),
    lambda: reader.write('eggs'),
    lambda: reader.writelines('eggs'),
)
for call in unsupported_calls:
    try:
        call()
    except UnsupportedOperation as e:
        print(e)
# Release the file handle; the constructor call below fails before it could
# rebind ``reader``, so this reader would otherwise never be closed.
reader.close()

# Only binary read modes are accepted.
try:
    reader = BufferedStringReader("something\nspam\neggs\n", "/dev/urandom", "wb")
except ValueError as e:
    print(e)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment