Skip to content

Instantly share code, notes, and snippets.

@Object905
Created July 13, 2021 16:56
Show Gist options
  • Save Object905/65dc5a24c58ec5e3a401027ee6e09687 to your computer and use it in GitHub Desktop.
Save Object905/65dc5a24c58ec5e3a401027ee6e09687 to your computer and use it in GitHub Desktop.
from asyncio import Semaphore
from io import DEFAULT_BUFFER_SIZE
from os import PathLike
import aiofiles
import aiohttp
from zipstream import AioZipStream
async def download_files_into_zip(
    url_to_zip_name: dict[str, str],
    zip_dest: PathLike,
    max_tasks: int = 5,
    chunk_size: int = DEFAULT_BUFFER_SIZE,
):
    """Download every URL and write the bodies as members of one ZIP archive.

    Each entry of ``url_to_zip_name`` becomes one archive member whose content
    is the response body of the URL and whose name is the mapped value.

    Args:
        url_to_zip_name: Mapping of source URL -> member name inside the ZIP.
        zip_dest: Path of the archive to create; an existing file is truncated
            (opened in ``"wb"`` mode).
        max_tasks: Size of the semaphore shared by all ``file_stream`` calls,
            i.e. the maximum number of requests allowed to hold it at once.
            Must be at least 1.
        chunk_size: Byte count passed to each network read and used as the
            ``chunksize`` of ``AioZipStream``.

    Raises:
        ValueError: If ``max_tasks`` is less than 1 (a zero-permit semaphore
            would make every download wait forever).
    """
    if max_tasks < 1:
        raise ValueError(f"max_tasks must be at least 1, got {max_tasks!r}")

    sema = Semaphore(max_tasks)
    # Both resources are managed by ``async with`` so the output file is closed
    # and the HTTP session released even if a download or the ZIP encoder raises
    # part-way through (the original only closed the file on success).
    async with aiofiles.open(zip_dest, "wb") as zip_file, aiohttp.ClientSession() as session:
        # The async generators are only created here; their bodies (and thus the
        # HTTP requests) do not run until AioZipStream iterates them.
        zip_files_generators = [
            {
                "stream": file_stream(url, session, sema, chunk_size),
                "name": zip_name,
            }
            for url, zip_name in url_to_zip_name.items()
        ]
        aiozip = AioZipStream(zip_files_generators, chunksize=chunk_size)
        async for chunk in aiozip.stream():
            await zip_file.write(chunk)
async def file_stream(
    url: str,
    session: aiohttp.ClientSession,
    sema: Semaphore,
    chunk_size: int = DEFAULT_BUFFER_SIZE,
):
    """Asynchronously yield the body of ``url`` in pieces of up to ``chunk_size`` bytes.

    ``sema`` is acquired before the request is sent and released only after the
    response body has been fully consumed (or the generator is closed).

    If the response status is anything other than 200, a message is printed and
    the generator finishes without yielding, so the consumer sees an empty body.
    """
    # Acquire the semaphore first, then open the request; both are released in
    # reverse order, exactly like two nested ``async with`` statements.
    async with sema, session.get(url) as resp:
        if resp.status == 200:
            # ``read`` returns b"" once the body is exhausted, which ends the loop.
            while piece := await resp.content.read(chunk_size):
                yield piece
        else:
            print(f"Failed to download {url}. Response: {resp}.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment