Skip to content

Instantly share code, notes, and snippets.

@debedb
Last active November 26, 2020 09:05
Show Gist options
  • Save debedb/2e5cbeb54e43f031eaf0 to your computer and use it in GitHub Desktop.
Save debedb/2e5cbeb54e43f031eaf0 to your computer and use it in GitHub Desktop.
Wrap boto3's StreamingBody object to provide enough Python fileobj functionality so that GzipFile is satisfied.
class WrappedStreamingBody:
    """
    Wrap boto3's StreamingBody object to provide enough
    fileobj functionality so that GzipFile is
    satisfied. Sometimes duck typing is awesome.

    https://gist.github.com/debedb/2e5cbeb54e43f031eaf0
    TODO that gist does not have the EOF fix!

    The wrapper keeps its own byte offset: ``read`` and ``readline``
    advance it by the length of what they return, ``tell`` reports it and
    ``seek`` only rewrites it. ``seek`` never repositions the wrapped
    stream. Any attribute not defined here is looked up on the wrapped
    object.
    """

    def __init__(self, sb, size):
        """Wrap the stream ``sb`` whose total length is ``size``.

        ``size`` is used by ``seek`` as the end-of-stream offset.
        """
        # The StreamingBody we're wrapping
        self.sb = sb
        # Initial position
        self.pos = 0
        # Size of the object
        self.size = size

    def tell(self):
        """Return the locally tracked offset."""
        return self.pos

    def readline(self):
        """Return the next line from the wrapped stream and advance the offset.

        Raises:
            EOFError: if the wrapped ``readline()`` raises ``struct.error``.
        """
        # Imported here so the ``except`` clause below can always resolve
        # ``struct``; without it an exception from the wrapped stream would
        # turn into a NameError.
        import struct

        try:
            retval = self.sb.readline()
        except struct.error:
            raise EOFError()
        self.pos += len(retval)
        return retval

    def read(self, n=None):
        """Read from the wrapped stream, passing ``n`` straight through.

        Raises:
            EOFError: if the wrapped stream returns the empty text string
                ``""``.

        An empty *bytes* result never compares equal to ``""`` on
        Python 3, so it is returned unchanged rather than raising.
        NOTE(review): Python 3's gzip appears to rely on getting an empty
        bytes object back at end of stream -- confirm before tightening
        this check.
        """
        retval = self.sb.read(n)
        if retval == "":
            raise EOFError()
        self.pos += len(retval)
        return retval

    def seek(self, offset, whence=0):
        """Update the tracked offset and return it.

        ``whence == 2`` is only supported with ``offset == 0`` and moves to
        ``size``. ``whence == 1`` is relative to the current offset. Any
        other ``whence`` treats ``offset`` as absolute. Targets past
        ``size`` are clamped to ``size``.

        Raises:
            Exception: for ``whence == 2`` with a non-zero ``offset``.
        """
        if whence == 2:
            if offset != 0:
                raise Exception("Unsupported")
            retval = self.size
        else:
            if whence == 1:
                offset = self.pos + offset
            # Clamp so the tracked offset never runs past the end.
            retval = self.size if offset > self.size else offset
        self.pos = retval
        return retval

    def __str__(self):
        return "WrappedBody"

    def __getattr__(self, attr):
        """Delegate attributes that are not defined here to the wrapped stream.

        Python calls ``__getattr__`` only after normal lookup has failed, so
        the methods defined on this class never reach this point.
        """
        # If ``sb`` itself is missing (instance built without __init__, e.g.
        # while copying or unpickling), reading ``self.sb`` below would call
        # back into this method forever. Fail with AttributeError instead so
        # hasattr()/getattr() probes keep working.
        if attr == 'sb':
            raise AttributeError(attr)
        return getattr(self.sb, attr)
@vrivellino
Copy link

FYI, this solution looks to be a bit more straightforward: https://gist.github.com/veselosky/9427faa38cee75cd8e27

@dstandish
Copy link

@vrivellino that solution downloads the whole file into memory:

retr = s3.get_object(Bucket=bucket, Key='gztest.txt')
bytestream = BytesIO(retr['Body'].read())
got_text = GzipFile(None, 'rb', fileobj=bytestream).read().decode('utf-8')

This one will let you stream:

sb = response['Body']
wsb = WrappedStreamingBody(sb, 1024)
gz_file = GzipFile(fileobj=wsb, mode='rb')
gz_file.read(100)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment