Download CloudApp Data
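A multi-threaded downloader for files listed in a CSV export (e.g., a CloudApp data export). The script reads the columns remote_url, name, and private_slug from the CSV (these are the fields the code accesses; presumably they match the export format), downloads each file with a tqdm progress bar, preserves the server's Last-Modified timestamp on disk, and writes a copy of the CSV with an added download_status column.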
import argparse
import os
import pandas as pd
import requests
from requests.adapters import HTTPAdapter, Retry
import concurrent.futures
from pathlib import Path
from tqdm import tqdm
from datetime import datetime


def process_headers(headers):
    """Process and return relevant information from the response headers."""
    total_length = int(headers.get("content-length", 0))
    last_modified = headers.get("Last-Modified")
    last_modified_timestamp = None
    if last_modified:
        last_modified_date = datetime.strptime(
            last_modified, "%a, %d %b %Y %H:%M:%S GMT"
        )
        last_modified_timestamp = last_modified_date.timestamp()
    return total_length, last_modified_timestamp

def setup_progress_bar(total_length, file_name):
    """Set up and return a tqdm progress bar."""
    return tqdm(total=total_length, unit="B", unit_scale=True, desc=file_name)

def create_retry_session(
    retries=3,
    backoff_factor=1,
    status_forcelist=(429, 500, 502, 503, 504),
    allowed_methods=("HEAD", "GET", "OPTIONS"),
):
    """Create a requests.Session with a retry strategy.

    Currently not used.
    """
    retry_strategy = Retry(
        total=retries,
        backoff_factor=backoff_factor,
        status_forcelist=status_forcelist,
        allowed_methods=allowed_methods,  # named "method_whitelist" in urllib3 < 1.26
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session = requests.Session()
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session

def download_file(file_url: str, file_path: Path):
    """Download a file with a progress bar and set its last modified time."""
    # print(f"Attempting to download: {file_url} to {file_path}")
    if pd.isna(file_url):
        return f"Skipped (no URL found): {file_path.name}"
    if file_path.exists():
        return f"Skipped (exists): {file_path.name}"
    # session = create_retry_session()
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3"
    }
    try:
        # The timeout (an arbitrary 60 s here) keeps a stalled connection
        # from hanging a worker thread indefinitely.
        with requests.get(file_url, stream=True, headers=headers, timeout=60) as r:
            r.raise_for_status()
            total_length, last_modified_timestamp = process_headers(r.headers)
            with open(file_path, "wb") as f, setup_progress_bar(
                total_length, file_path.name
            ) as bar:
                for chunk in r.iter_content(chunk_size=1024):
                    if chunk:  # filter out keep-alive new chunks
                        f.write(chunk)
                        bar.update(len(chunk))
            if last_modified_timestamp is not None:
                os.utime(file_path, (last_modified_timestamp, last_modified_timestamp))
        return f"Downloaded {file_path}"
    except Exception as e:
        error_msg = f"Error downloading {file_url}: {e}"
        print(error_msg)
        return error_msg
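
# A sketch of how the (currently unused) retry session above could be wired
# in: session.get is a drop-in replacement for requests.get, and the mounted
# HTTPAdapter retries 429/5xx responses with exponential backoff before the
# request raises.
#
#     session = create_retry_session()
#     with session.get(file_url, stream=True, headers=headers, timeout=60) as r:
#         ...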

def safe_file_name(file_path, max_length=255):
    """Truncate the file name to a safe length."""
    directory, file_name = os.path.split(file_path)
    if len(file_name) > max_length:
        # Preserve the file extension: truncate the stem and re-append it
        extension = os.path.splitext(file_name)[1]
        file_name = file_name[: max_length - len(extension)] + extension
    return os.path.join(directory, file_name)

def prepare_download_tasks(df, files_dir):
    """Prepare the list of (url, path) pairs to download."""
    download_tasks = []
    for _, row in df.iterrows():
        file_url = row["remote_url"]
        file_name = row["name"]
        private_slug = (
            row["private_slug"] if not pd.isna(row["private_slug"]) else "no_slug"
        )
        file_path = files_dir / f"{private_slug}_{file_name}"
        safe_path = Path(safe_file_name(str(file_path)))
        download_tasks.append((file_url, safe_path))
    return download_tasks

def perform_downloads(download_tasks, max_workers):
    """Download files using a thread pool and return results in task order."""
    # Results are stored by task index so they line up with the DataFrame rows:
    # as_completed yields futures in completion order, not submission order.
    results = [None] * len(download_tasks)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_index = {
            executor.submit(download_file, url, path): i
            for i, (url, path) in enumerate(download_tasks)
        }
        for future in concurrent.futures.as_completed(future_to_index):
            i = future_to_index[future]
            try:
                results[i] = future.result()
            except Exception as e:
                results[i] = f"Error downloading {download_tasks[i][0]}: {e}"
    return results

def main(csv_path, files_dir_path, output_csv_path, max_workers=5):
    df = pd.read_csv(csv_path)
    files_dir = Path(files_dir_path)
    files_dir.mkdir(parents=True, exist_ok=True)
    download_tasks = prepare_download_tasks(df, files_dir)
    results = perform_downloads(download_tasks, max_workers)
    df["download_status"] = results
    df.to_csv(output_csv_path, index=False)
if __name__ == "__main__":
import argparse
parser = argparse.ArgumentParser(
description="Download files from a CSV file with multi-threading."
)
parser.add_argument(
"csv_path",
type=str,
help="Path to the CSV file containing the file URLs and names.",
)
parser.add_argument(
"files_dir", type=str, help="Directory path where files will be saved."
)
parser.add_argument(
"output_csv_path",
type=str,
help="Path to save the CSV file with download statuses.",
)
parser.add_argument(
"--max_workers",
type=int,
default=5,
help="Maximum number of worker threads to use for downloading files.",
)
args = parser.parse_args()
main(args.csv_path, args.files_dir, args.output_csv_path, args.max_workers)
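
# Example invocation (the script and file names here are hypothetical; the CSV
# must contain the remote_url, name, and private_slug columns read above):
#   python download_cloudapp_data.py export.csv ./files results.csv --max_workers 8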