Skip to content

Instantly share code, notes, and snippets.

@Artemis21
Created September 18, 2022 12:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Artemis21/f81dfcfc0928f6e154d00b929841d46f to your computer and use it in GitHub Desktop.
Save Artemis21/f81dfcfc0928f6e154d00b929841d46f to your computer and use it in GitHub Desktop.
Python aiohttp write directly to upload
from __future__ import annotations
import asyncio
import io
import aiohttp
import aiohttp.abc
class WriterPayload(aiohttp.Payload):
    """An aiohttp payload and stream writer, which allows writing directly to upload.

    Body data is handed over through ``self.queue``: producers put ``bytes``
    chunks on the queue (normally via the object returned by ``writer()``)
    and ``write()`` forwards each chunk to the upload stream.  A ``None``
    item on the queue marks the end of the payload.
    """

    def __init__(self):
        """Initialise the payload with an empty chunk queue."""
        super().__init__(value=None)
        # Each item is either a chunk of body data or None (end of payload).
        self.queue: asyncio.Queue[bytes | None] = asyncio.Queue()

    def decode(self, encoding: str = "utf-8", errors: str = "strict") -> str:
        """Refuse to decode: the body is streamed and never held in memory.

        Newer aiohttp releases declare ``Payload.decode`` as an abstract
        method (3.10+ as far as I can tell - TODO confirm the exact version);
        without this override the class cannot be instantiated there.  On
        older releases the extra method is simply unused.

        Raises:
            TypeError: always, because there is no buffered value to decode.
        """
        raise TypeError("Unable to decode.")

    async def write(self, writer: aiohttp.abc.AbstractStreamWriter) -> None:
        """Write received data to the upload stream.

        Waits for chunks on ``self.queue`` and passes each one to
        ``writer.write`` in the order it was queued.  Returns as soon as a
        ``None`` item is taken from the queue; anything queued after that
        item is not sent by this call.
        """
        while True:
            data = await self.queue.get()
            if data is None:
                # End-of-payload marker.
                break
            await writer.write(data)

    def writer(self) -> WriterPayloadWriter:
        """Get a write-able object which sends data to this payload.

        Every call returns a new ``WriterPayloadWriter``; all of them feed
        the same queue.
        """
        return WriterPayloadWriter(self)
class WriterPayloadWriter(io.BufferedWriter):
    """A write-able object which sends data to a WriterPayload.

    The class derives from ``io.BufferedWriter`` so that it is accepted where
    a binary file object is expected, but it wraps no raw stream and
    ``io.BufferedWriter.__init__`` is intentionally never called.  The
    methods needed for plain sequential writing (``write``, ``close``,
    ``flush``, ``closed``, ``writable``, ``seekable`` and the context-manager
    protocol) are overridden so they never touch the uninitialised buffer
    state of the base class.

    NOTE(review): chunks are queued with ``asyncio.Queue.put_nowait``, and
    asyncio queues are documented as not thread-safe - presumably this
    object is meant to be used from the event-loop thread; confirm against
    callers.
    """

    def __init__(self, payload: WriterPayload):
        """Initialise the writer.

        Args:
            payload: the payload whose ``queue`` receives the written chunks.
        """
        # No super().__init__(): there is no raw stream to hand to it.
        self.payload = payload
        self._closed = False

    def __del__(self) -> None:
        """Do nothing when the writer is garbage-collected.

        The inherited finaliser calls ``close()`` on any stream whose
        ``closed`` attribute is false.  Here that would end the upload as
        soon as a temporary writer (``payload.writer().write(...)``) is
        collected, so the payload is only ever ended by an explicit
        ``close()`` or by leaving a ``with`` block.
        """

    def __enter__(self) -> WriterPayloadWriter:
        """Return the writer itself; raises ValueError if already closed."""
        if self._closed:
            raise ValueError("I/O operation on closed file.")
        return self

    def __exit__(self, *exc_info: object) -> None:
        """End the payload when the ``with`` block is left."""
        self.close()

    @property
    def closed(self) -> bool:
        """Whether ``close()`` has been called on this writer."""
        return self._closed

    def writable(self) -> bool:
        """Report that the object supports ``write()``."""
        return True

    def seekable(self) -> bool:
        """Report that the object does not support random access."""
        return False

    def flush(self) -> None:
        """Do nothing: ``write()`` queues every chunk immediately."""

    def write(self, data: object) -> int:
        """Write data to the payload.

        Args:
            data: ``bytes``, ``bytearray`` or ``memoryview``.  The content is
                copied into an immutable ``bytes`` object before it is
                queued, so the caller may reuse its buffer afterwards.

        Returns:
            The number of bytes queued.

        Raises:
            TypeError: if ``data`` is not one of the accepted types.
            ValueError: if the writer has already been closed.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError("Data must be bytes.")
        if self._closed:
            # Anything queued after the end marker would never be uploaded.
            raise ValueError("I/O operation on closed file.")
        chunk = bytes(data)
        self.payload.queue.put_nowait(chunk)
        return len(chunk)

    def close(self) -> None:
        """End the payload.

        Queues the ``None`` end marker exactly once; further calls are
        no-ops.
        """
        if self._closed:
            return
        self._closed = True
        self.payload.queue.put_nowait(None)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment