@pdemarti
Last active April 19, 2024 13:50
Example (from a notebook) of how to exploit the concurrency inside boto3 for fast uploads of many files
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"id": "9941a015",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import tempfile\n",
"import shutil\n",
"import glob\n",
"\n",
"import numpy as np\n",
"import pandas as pd"
]
},
{
"cell_type": "code",
"execution_count": 2,
"id": "d6b17d0a",
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"'./junk/tmp_vegcw9b7'"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"junkdir = './junk'\n",
"os.makedirs(junkdir, exist_ok=True)\n",
"testdir = tempfile.mkdtemp(prefix='tmp_', dir=junkdir)\n",
"testdir"
]
},
{
"cell_type": "code",
"execution_count": 3,
"id": "329cc1f7",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"generated 500 files in ./junk/tmp_vegcw9b7; total size = 368.2MB\n"
]
}
],
"source": [
"# this is a bit slow: I am purposefully not using concurrency here, so as to not confuse the reader\n",
"for k in range(500):\n",
" name = os.path.join(testdir, f'foo-{k:03d}.csv')\n",
" pd.DataFrame(np.random.uniform(size=(200, 200))).to_csv(name)\n",
"\n",
"filelist = sorted(glob.glob(f'{testdir}/*.csv'))\n",
"sizelist = [os.stat(filename).st_size for filename in filelist]\n",
"totalsize = sum(sizelist)\n",
"print(f'generated {len(filelist)} files in {testdir}; total size = {totalsize/1024**2:.1f}MB')"
]
},
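{
"cell_type": "markdown",
"id": "b2f1c3aa",
"metadata": {},
"source": [
"Side note (not part of the original notebook): the cell above writes the files sequentially on purpose. A rough sketch of the same generation step with a thread pool is below; `gen_one` is an illustrative name, it reuses `testdir`, `np` and `pd` from the earlier cells, and the speedup is workload-dependent because `DataFrame.to_csv` is partly CPU-bound."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "b2f1c3ab",
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical sketch (not in the original gist): generate the same test files\n",
"# with a thread pool instead of a plain loop. Threads mostly overlap the file\n",
"# I/O; the CSV formatting itself is CPU-bound, so the gain varies.\n",
"from concurrent.futures import ThreadPoolExecutor\n",
"\n",
"def gen_one(k):\n",
"    name = os.path.join(testdir, f'foo-{k:03d}.csv')\n",
"    pd.DataFrame(np.random.uniform(size=(200, 200))).to_csv(name)\n",
"    return name\n",
"\n",
"with ThreadPoolExecutor(max_workers=8) as pool:\n",
"    generated = list(pool.map(gen_one, range(500)))"
]
},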
{
"cell_type": "code",
"execution_count": 4,
"id": "a76b12de",
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import boto3\n",
"import botocore\n",
"import boto3.s3.transfer as s3transfer\n",
"\n",
"def fast_upload(session, bucketname, s3dir, filelist, progress_func, workers=20):\n",
" botocore_config = botocore.config.Config(max_pool_connections=workers)\n",
" s3client = session.client('s3', config=botocore_config)\n",
" transfer_config = s3transfer.TransferConfig(\n",
" use_threads=True,\n",
" max_concurrency=workers,\n",
" )\n",
" s3t = s3transfer.create_transfer_manager(s3client, transfer_config)\n",
" for src in filelist:\n",
" dst = os.path.join(s3dir, os.path.basename(src))\n",
" s3t.upload(\n",
" src, bucketname, dst,\n",
" subscribers=[\n",
" s3transfer.ProgressCallbackInvoker(progress_func),\n",
" ],\n",
" )\n",
" s3t.shutdown() # wait for all the upload tasks to finish"
]
},
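{
"cell_type": "markdown",
"id": "c4d5e6f7",
"metadata": {},
"source": [
"Possible variant (an assumption, not part of the original gist): `TransferManager.upload()` returns a `TransferFuture`, so the futures can be collected and checked with `.result()` after `shutdown()` to surface per-file errors explicitly. The sketch below keeps the same signature as `fast_upload` and only adds that bookkeeping; `fast_upload_checked` is a hypothetical name."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c4d5e6f8",
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical variant (assumption, not from the original gist): same upload\n",
"# logic, but keep each TransferFuture so per-file failures can be inspected.\n",
"def fast_upload_checked(session, bucketname, s3dir, filelist, progress_func, workers=20):\n",
"    botocore_config = botocore.config.Config(max_pool_connections=workers)\n",
"    s3client = session.client('s3', config=botocore_config)\n",
"    transfer_config = s3transfer.TransferConfig(\n",
"        use_threads=True,\n",
"        max_concurrency=workers,\n",
"    )\n",
"    s3t = s3transfer.create_transfer_manager(s3client, transfer_config)\n",
"    futures = []\n",
"    for src in filelist:\n",
"        dst = os.path.join(s3dir, os.path.basename(src))\n",
"        futures.append(s3t.upload(\n",
"            src, bucketname, dst,\n",
"            subscribers=[s3transfer.ProgressCallbackInvoker(progress_func)],\n",
"        ))\n",
"    s3t.shutdown()  # wait for all the upload tasks to finish\n",
"    failed = []\n",
"    for src, future in zip(filelist, futures):\n",
"        try:\n",
"            future.result()  # raises the stored exception if this upload failed\n",
"        except Exception as exc:\n",
"            failed.append((src, exc))\n",
"    return failed"
]
},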
{
"cell_type": "code",
"execution_count": 5,
"id": "518febf0",
"metadata": {},
"outputs": [],
"source": [
"from tqdm import tqdm\n",
"\n",
"bucketname = 'XXXXXXXXXXXX'\n",
"s3dir = 'test/junk'\n",
"totalsize = sum([os.stat(f).st_size for f in filelist])"
]
},
{
"cell_type": "code",
"execution_count": 6,
"id": "1e937e24",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"upload: 100%|█████████████| 386M/386M [00:03<00:00, 117MB/s]"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 3.95 s, sys: 1.72 s, total: 5.67 s\n",
"Wall time: 3.3 s\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
"\n"
]
}
],
"source": [
"%%time\n",
"with tqdm(desc='upload', ncols=60, total=totalsize, unit='B', unit_scale=1) as pbar:\n",
" fast_upload(boto3.Session(), bucketname, s3dir, filelist, pbar.update)"
]
},
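{
"cell_type": "markdown",
"id": "d8e9f0a1",
"metadata": {},
"source": [
"Optional follow-up (not in the original notebook): list the objects under the destination prefix to confirm that the count and total size match the local files, then clean up the scratch directory. This assumes the same `bucketname` and `s3dir` as above."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d8e9f0a2",
"metadata": {},
"outputs": [],
"source": [
"# Sanity-check sketch (assumption, not part of the original gist): list what\n",
"# landed under the prefix and compare with the local totals, then clean up.\n",
"s3client = boto3.Session().client('s3')\n",
"paginator = s3client.get_paginator('list_objects_v2')\n",
"n_objects, remote_size = 0, 0\n",
"for page in paginator.paginate(Bucket=bucketname, Prefix=s3dir + '/'):\n",
"    for obj in page.get('Contents', []):\n",
"        n_objects += 1\n",
"        remote_size += obj['Size']\n",
"print(f'remote: {n_objects} objects, {remote_size/1024**2:.1f}MB under s3://{bucketname}/{s3dir}/')\n",
"print(f'local:  {len(filelist)} files, {totalsize/1024**2:.1f}MB')\n",
"# shutil.rmtree(testdir)  # uncomment to remove the local scratch directory"
]
}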
],
"metadata": {
"kernelspec": {
"display_name": "py39",
"language": "python",
"name": "py39"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.12"
}
},
"nbformat": 4,
"nbformat_minor": 5
}