Skip to content

Instantly share code, notes, and snippets.

@vytas7
Created July 26, 2022 08:12
Show Gist options
  • Save vytas7/4ab1317455318db05b309dae75ab1662 to your computer and use it in GitHub Desktop.
Save vytas7/4ab1317455318db05b309dae75ab1662 to your computer and use it in GitHub Desktop.
Upload files from a multipart form directly to S3 (Falcon ASGI)
import logging
import aioboto3
import falcon.asgi
logging.basicConfig(
format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO
)
class Upload:
def __init__(self):
self._session = aioboto3.Session()
async def on_post(self, req, resp):
form = await req.get_media()
async with self._session.client(
service_name='s3',
aws_access_key_id='test:tester',
aws_secret_access_key='testing',
endpoint_url='http://localhost:8080',
) as s3:
async for part in form:
if part.filename:
await s3.upload_fileobj(
Bucket='bucket',
Key=f'uploads/{part.secure_filename}',
Fileobj=part.stream,
)
app = falcon.asgi.App()
app.add_route('/uploads', Upload())
@vytas7
Copy link
Author

vytas7 commented Jul 26, 2022

Credits to @williamkibira for verifying this is possible (in a snippet posted on falconry/user).

@CaselIT
Copy link

CaselIT commented Jul 26, 2022

May make sense linking this to the pr for future reference

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment