Skip to content

Instantly share code, notes, and snippets.

@joshuarobinson
Last active April 25, 2019 10:15
Show Gist options
  • Save joshuarobinson/9c5f84ed276e947cad564f0f366bb73e to your computer and use it in GitHub Desktop.
Save joshuarobinson/9c5f84ed276e947cad564f0f366bb73e to your computer and use it in GitHub Desktop.
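A minimal example showing how to use Spark to parallelize URL downloads. The downloads run on the Spark workers, so the output directory should be reachable from every worker (for example, on a shared filesystem).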
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"import shutil\n",
"# urllib.parse / urllib.request are submodules: a bare `import urllib`\n",
"# does not guarantee they are loaded, so import them explicitly.\n",
"import urllib.parse\n",
"import urllib.request"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [],
"source": [
"# Test dataset: tarballs of a few old Linux 2.4 kernel releases.\n",
"urls = ['https://mirrors.edge.kernel.org/pub/linux/kernel/v2.4/linux-{}.tar.gz'.format(version)\n",
"        for version in ('2.4.3', '2.4.30', '2.4.31', '2.4.32', '2.4.33.1')]"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [],
"source": [
"# Turn the URL list into an RDD so the downloads can be spread across Spark workers.\n",
"# The benefit is much larger when len(urls) is large.\n",
"listing = sc.parallelize(urls)"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [],
"source": [
"# Download URL and save it into the directory outpath.\n",
"def downloader(url, outpath):\n",
"    \"\"\"Download url into directory outpath, named after the URL's last path component.\n",
"\n",
"    Does nothing if a file with that name already exists in outpath. The data\n",
"    is first written to a uniquely named temporary file in outpath and only\n",
"    renamed to its final name once the transfer has finished, so a failed or\n",
"    interrupted download never leaves a truncated file behind that a later\n",
"    run would mistake for a completed one.\n",
"\n",
"    Raises ValueError if no file name can be derived from the URL path;\n",
"    errors from urlopen or the filesystem propagate to the caller.\n",
"    \"\"\"\n",
"    # Imported locally so the function is self-contained when Spark\n",
"    # serializes it and runs it in worker processes.\n",
"    import os\n",
"    import shutil\n",
"    import tempfile\n",
"    import urllib.parse\n",
"    import urllib.request\n",
"\n",
"    # Pull the file name out of the URL path to build the destination path.\n",
"    file_name = os.path.basename(urllib.parse.urlparse(url).path)\n",
"    if not file_name:\n",
"        # e.g. a URL ending in '/': joining '' would point at outpath itself.\n",
"        raise ValueError('cannot derive a file name from URL: {!r}'.format(url))\n",
"    file_path = os.path.join(outpath, file_name)\n",
"\n",
"    # Skip files that have already been downloaded.\n",
"    if os.path.exists(file_path):\n",
"        return\n",
"\n",
"    # Temp file lives in outpath so os.replace stays on one filesystem;\n",
"    # a unique name avoids clashes between concurrent tasks.\n",
"    fd, tmp_path = tempfile.mkstemp(dir=outpath, prefix='.' + file_name + '.', suffix='.part')\n",
"    try:\n",
"        with os.fdopen(fd, 'wb') as out_file, urllib.request.urlopen(url, timeout=5) as urldata:\n",
"            shutil.copyfileobj(urldata, out_file)\n",
"        os.replace(tmp_path, file_path)\n",
"    except BaseException:\n",
"        # Best-effort removal of the partial download, then re-raise.\n",
"        try:\n",
"            os.remove(tmp_path)\n",
"        except OSError:\n",
"            pass\n",
"        raise"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
"# Destination directory for every download; created here if it is missing.\n",
"# NOTE(review): this cell runs on the driver only, while downloader runs on\n",
"# the workers -- presumably /datahub is shared by all nodes; confirm.\n",
"OUTPATH = \"/datahub/kernels\"\n",
"os.makedirs(OUTPATH, exist_ok=True)"
]
},
{
"cell_type": "code",
"execution_count": 24,
"metadata": {},
"outputs": [],
"source": [
"# Launch the job: Spark calls downloader once per URL on the workers.\n",
"listing.foreach(lambda u: downloader(u, OUTPATH))"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.5.3"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment