Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Generate a file dynamically and upload it to S3
import os
from io import BytesIO
import boto3
BUCKET = 'bucket-name'
def gen_int():
a = 0
b = 1
while True:
c = (a + b) % 10
yield c
a = b
b = c
class FileObject:
def __init__(self, size):
self.offset = 0
self.file_size = size
self.gen = gen_int()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
pass
def read(self, size=-1):
if self.offset >= self.file_size:
return b''
buffer = BytesIO()
for _ in range(size):
c = str(next(self.gen)).encode('ascii')
buffer.write(c)
self.offset += 1
if self.offset >= self.file_size:
break
return buffer.getvalue()
def main():
key_id = os.environ['AWS_ACCESS_KEY_ID']
secret = os.environ['AWS_SECRET_ACCESS_KEY']
token = os.environ['AWS_SESSION_TOKEN']
s3 = boto3.client(
's3',
aws_access_key_id=key_id,
aws_secret_access_key=secret,
aws_session_token=token,
)
file_obj = FileObject(1000000)
with file_obj as f:
s3.upload_fileobj(f, BUCKET, 'myfile.txt')
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment