Skip to content

Instantly share code, notes, and snippets.

@joshuarobinson
Last active October 26, 2022 03:05
Show Gist options
  • Save joshuarobinson/708b44180a36efb7485ff5f9c4e8bd0f to your computer and use it in GitHub Desktop.
Save joshuarobinson/708b44180a36efb7485ff5f9c4e8bd0f to your computer and use it in GitHub Desktop.
Working PySpark notebook to retrieve imagenet URL list and parallelize downloads.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This notebook is a best-effort downloader for the original ImageNet sources from fall 2011. Many of them are not available anymore, so this script does not try too hard to be comprehensive.\n",
"\n",
"For more information about the source, see [this page](http://image-net.org/download-imageurls)."
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import io\n",
"import shutil\n",
"import tarfile\n",
"import urllib"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"URLSOURCE = \"http://image-net.org/imagenet_data/urls/imagenet_fall11_urls.tgz\""
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# Download the list of URLs, untar it in memory, and decode each line (cp437) into a tuple of whitespace-separated string fields.\n",
"urldata = urllib.request.urlopen(URLSOURCE)\n",
"tar = tarfile.open(mode= \"r:gz\", fileobj = io.BytesIO(urldata.read()))\n",
"filesrc = [tuple(line.decode('cp437').split()) for line in tar.extractfile(\"fall11_urls.txt\").readlines()]"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
"# Remove the query string, parameters, and fragment. This might break some downloads, but it gives clean path names that can be manipulated.\n",
"def stripUrl(original):\n",
" parsed = urllib.parse.urlparse(original)\n",
" return parsed.scheme + '://' + parsed.netloc + parsed.path"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"# Convert list of URLs to a dataframe with two columns, skip malformed lines.\n",
"listing = sc.parallelize(filesrc, 2400).filter(lambda r: len(r) == 2)\n",
"listingdf = listing.map(lambda x: (x[0], stripUrl(x[1]))).toDF(['id', 'url'])"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"# Counters.\n",
"dl_success = sc.accumulator(0)\n",
"dl_fail = sc.accumulator(0)\n",
"dl_skip = sc.accumulator(0)"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"def downloader(row, outpath):\n",
" # Combine the 'id' and url to create the output filename: id + file extension.\n",
" urlpath = urllib.parse.urlparse(row[\"url\"]).path\n",
" _, ext = os.path.splitext(urlpath)\n",
" file_name = os.path.join(outpath, row[\"id\"] + ext)\n",
"\n",
" # Check if the file has already been downloaded\n",
" if os.path.exists(file_name):\n",
" dl_skip.add(1)\n",
" return\n",
" try:\n",
" # Download and write to file.\n",
" with urllib.request.urlopen(row[\"url\"], timeout=5) as urldata, open(file_name, 'wb') as out_file:\n",
" shutil.copyfileobj(urldata, out_file)\n",
" dl_success.add(1)\n",
" except:\n",
" dl_fail.add(1)"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"# Set the destination to save all downloaded files, create directory if not there already.\n",
"OUTPATH = \"/datahub/imagenet_download/data\"\n",
"os.makedirs(OUTPATH, exist_ok=True)"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"CPU times: user 2.92 s, sys: 480 ms, total: 3.4 s\n",
"Wall time: 8h 39min 24s\n"
]
},
{
"data": {
"text/plain": [
"[7072212, 1205574, 5918571]"
]
},
"execution_count": 12,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# Do the actual downloading work.\n",
"%time listingdf.foreach(lambda r: downloader(r, OUTPATH))\n",
"[dl_success.value, dl_skip.value, dl_fail.value]"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment