@vytas7
Created July 26, 2022 08:12
Upload files from a multipart form directly to S3 (Falcon ASGI)
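import logging

import aioboto3
import falcon.asgi

logging.basicConfig(
    format='%(asctime)s [%(levelname)s] %(message)s', level=logging.INFO
)


class Upload:
    def __init__(self):
        # One aioboto3 session is shared; a client is opened per request.
        self._session = aioboto3.Session()

    async def on_post(self, req, resp):
        # For a multipart/form-data request, get_media() returns a form
        # object that yields body parts as they arrive.
        form = await req.get_media()

        # Placeholder credentials and a local S3-compatible endpoint.
        async with self._session.client(
            service_name='s3',
            aws_access_key_id='test:tester',
            aws_secret_access_key='testing',
            endpoint_url='http://localhost:8080',
        ) as s3:
            async for part in form:
                # Only parts that carry a filename are treated as uploads.
                if part.filename:
                    # Pass the part's stream straight to S3 instead of
                    # reading the whole file into memory first.
                    await s3.upload_fileobj(
                        Bucket='bucket',
                        Key=f'uploads/{part.secure_filename}',
                        Fileobj=part.stream,
                    )


app = falcon.asgi.App()
app.add_route('/uploads', Upload())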
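
To exercise the `/uploads` route, a client has to send a `multipart/form-data` body in which each file part carries a `filename`. The following is a minimal client-side sketch using only the standard library. It is not part of the gist: `build_multipart` and `post_files` are hypothetical helper names, and the application URL in the usage note is an assumption, since the gist does not say where the ASGI app is served.

```python
import urllib.request
import uuid


def build_multipart(files):
    """Encode {field_name: (filename, payload_bytes)} as multipart/form-data.

    Returns the encoded body and the matching Content-Type header value.
    """
    boundary = uuid.uuid4().hex
    chunks = []
    for field, (filename, payload) in files.items():
        # Each part starts with the boundary line and its own headers.
        chunks.append(
            (
                f'--{boundary}\r\n'
                f'Content-Disposition: form-data; name="{field}"; '
                f'filename="{filename}"\r\n'
                'Content-Type: application/octet-stream\r\n\r\n'
            ).encode()
        )
        chunks.append(payload)
        chunks.append(b'\r\n')
    # The closing boundary ends the form.
    chunks.append(f'--{boundary}--\r\n'.encode())
    return b''.join(chunks), f'multipart/form-data; boundary={boundary}'


def post_files(url, files):
    """POST the files to url and return the HTTP status code."""
    body, content_type = build_multipart(files)
    request = urllib.request.Request(
        url, data=body, headers={'Content-Type': content_type}, method='POST'
    )
    with urllib.request.urlopen(request) as response:
        return response.status
```

Assuming the app is served at `http://localhost:8000`, a call such as `post_files('http://localhost:8000/uploads', {'file': ('report.txt', b'hello')})` should send one file part, which the resource above would store under the key `uploads/report.txt`.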
CaselIT commented Jul 26, 2022

It may make sense to link this to the PR for future reference.
