Skip to content

Instantly share code, notes, and snippets.

@gravesm
Last active April 5, 2019 15:26
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gravesm/da11204e051a39f6b2e8ebbf62ca5b71 to your computer and use it in GitHub Desktop.
Save gravesm/da11204e051a39f6b2e8ebbf62ca5b71 to your computer and use it in GitHub Desktop.
import io
class S3IO(io.BufferedIOBase):
    """Read-only, seekable binary stream backed by ranged GETs on an S3 object.

    Every ``read`` call issues one ``get(Range=...)`` request against the
    wrapped object; nothing is cached locally. The wrapped object must expose
    a ``content_length`` attribute and a ``get(Range=...)`` method returning a
    mapping whose ``'Body'`` value has a ``read()`` method.
    NOTE(review): this looks like a boto3 ``s3.Object`` resource -- confirm
    against the caller.
    """

    def __init__(self, s3_obj):
        """Wrap *s3_obj* and place the stream position at the first byte."""
        super().__init__()
        self.obj = s3_obj
        # Absolute byte offset of the next read. A plain 0, not a whence flag.
        self._position = 0

    def tell(self):
        """Return the current absolute byte offset."""
        return self._position

    def seek(self, offset, whence=io.SEEK_SET):
        """Move the stream position and return the new absolute offset.

        Args:
            offset: Byte offset, interpreted relative to *whence*.
            whence: ``io.SEEK_SET`` (start), ``io.SEEK_CUR`` (current
                position) or ``io.SEEK_END`` (end of the object).

        Returns:
            The new absolute position.

        Raises:
            ValueError: If *whence* is not one of the three supported values
                or the resulting position would be negative.
        """
        if whence == io.SEEK_SET:
            target = offset
        elif whence == io.SEEK_CUR:
            target = self._position + offset
        elif whence == io.SEEK_END:
            # EOF is at content_length itself, so seek(0, SEEK_END) must land
            # one past the last byte, not on it.
            target = self.obj.content_length + offset
        else:
            raise ValueError("invalid whence ({!r})".format(whence))
        if target < 0:
            # A negative offset would produce a malformed Range header later.
            raise ValueError("negative seek position {!r}".format(target))
        self._position = target
        return target

    def readable(self):
        """Report that the stream supports reading."""
        return True

    def seekable(self):
        """Report that the stream supports random access."""
        return True

    def read(self, size=-1):
        """Read up to *size* bytes starting at the current position.

        Args:
            size: Maximum number of bytes to read. ``None`` or a negative
                value reads through to the end of the object.

        Returns:
            The bytes read; ``b''`` when *size* is 0 or the position is at or
            beyond the end of the object.
        """
        if size == 0:
            return b''
        if self._position >= self.obj.content_length:
            # Nothing left to fetch. Requesting a range that starts at or past
            # EOF is unsatisfiable, so answer locally instead of calling get().
            return b''
        if size is None or size < 0:
            rng = "bytes={}-".format(self._position)
        else:
            # HTTP byte ranges are inclusive on both ends, hence the -1.
            rng = "bytes={}-{}".format(
                self._position, self._position + size - 1)
        resp = self.obj.get(Range=rng)
        data = resp['Body'].read()
        # Advance by what was actually received rather than trusting a header.
        self._position += len(data)
        return data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment