Skip to content

Instantly share code, notes, and snippets.

@soloradish
Forked from debedb/WrappedStreamingBody
Created September 25, 2018 03:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save soloradish/368865366f48089b69a7232531bcab94 to your computer and use it in GitHub Desktop.
Save soloradish/368865366f48089b69a7232531bcab94 to your computer and use it in GitHub Desktop.
Wrap boto3's StreamingBody object to provide enough Python fileobj functionality fileobj functionality so that GzipFile is satisfied.
class WrappedStreamingBody:
"""
Wrap boto3's StreamingBody object to provide enough
fileobj functionality so that GzipFile is
satisfied. Sometimes duck typing is awesome.
https://gist.github.com/debedb/2e5cbeb54e43f031eaf0
TODO that gist does not have the EOF fix!
"""
def __init__(self, sb, size):
# The StreamingBody we're wrapping
self.sb = sb
# Initial position
self.pos = 0
# Size of the object
self.size = size
def tell(self):
#print("In tell()")
return self.pos
def readline(self):
#print("Calling readline()")
try:
retval = self.sb.readline()
except struct.error:
raise EOFError()
self.pos += len(retval)
return retval
def read(self, n=None):
retval = self.sb.read(n)
if retval == "":
raise EOFError()
self.pos += len(retval)
return retval
def seek(self, offset, whence=0):
#print("Calling seek()")
retval = self.pos
if whence == 2:
if offset == 0:
retval = self.size
else:
raise Exception("Unsupported")
else:
if whence == 1:
offset = self.pos + offset
if offset > self.size:
retval = self.size
else:
retval = offset
# print("In seek(%s, %s): %s, size is %s" % (offset, whence, retval, self.size))
self.pos = retval
return retval
def __str__(self):
return "WrappedBody"
def __getattr__(self, attr):
# print("Calling %s" % attr)
if attr == 'tell':
return self.tell
elif attr == 'seek':
return self.seek
elif attr == 'read':
return self.read
elif attr == 'readline':
return self.readline
elif attr == '__str__':
return self.__str__
else:
return getattr(self.sb, attr)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment