Skip to content

Instantly share code, notes, and snippets.

@thevickypedia
Created January 18, 2024 12:10
Show Gist options
  • Save thevickypedia/aa1c1111443b582684e86b0951f64a28 to your computer and use it in GitHub Desktop.
Save thevickypedia/aa1c1111443b582684e86b0951f64a28 to your computer and use it in GitHub Desktop.
File downloader using multi-threading with progress bar
import math
import os
import time
from threading import Thread
from typing import Union
import inflect
import requests
import tqdm
# Shared inflect engine, reused by the helpers below for number-to-words conversion and pluralization.
ENGINE = inflect.engine()
def number_to_words(input_: Union[int, str], capitalize: bool = False) -> str:
    """Spell out a number in words using the shared inflect engine.

    Args:
        input_: Number (integer or its string form) to be spelled out.
        capitalize: When set, the first letter of the result is upper-cased.

    Returns:
        str:
        The number written out in words.
    """
    words = ENGINE.number_to_words(num=input_)
    if not capitalize:
        return words
    # Upper-case only the leading character; the rest is kept untouched.
    return words[0].upper() + words[1:]
def pluralize(count: int, word: str, to_words: bool = False, cap_word: bool = False) -> str:
    """Format a count together with the matching singular/plural form of a word.

    Args:
        count: Quantity that decides between the singular and plural form.
        word: Word to be put into the form matching ``count``.
        to_words: When set, the count is spelled out in words instead of digits.
        cap_word: Only relevant with ``to_words``; capitalizes the first letter of the spelled-out count.

    Returns:
        str:
        The count (digits or words) followed by the word in singular or plural form.
    """
    # The quantity is rendered either as words or as the raw number.
    quantity = number_to_words(input_=count, capitalize=cap_word) if to_words else count
    noun = ENGINE.plural(text=word, count=count)
    return f"{quantity} {noun}"
def time_converter(second: float) -> str:
    """Converts a duration in seconds to a readable days/hours/minutes/seconds string.

    Every non-zero unit is reported, so no part of the duration is lost when an
    intermediate unit happens to be zero (e.g. ``3601`` seconds is reported as
    an hour and a second, not just the hour).

    Args:
        second: Takes number of seconds as argument.

    Returns:
        str:
        The non-zero units joined as ``"a, b, and c"``; a single unit is returned on its own.
        A duration that rounds to zero is reported in seconds.
    """
    # Round once, up front, so a fractional remainder cannot round up into a
    # whole unit of the next size (86399.6 s must read as 1 day, not 24 hours).
    total = round(second)
    day, remainder = divmod(total, 86400)
    hour, remainder = divmod(remainder, 3600)
    minute, remainder = divmod(remainder, 60)

    parts = [
        pluralize(count, unit)
        for count, unit in ((day, 'day'), (hour, 'hour'), (minute, 'minute'), (remainder, 'second'))
        if count
    ]
    if not parts:
        # Nothing but zeros: fall back to the seconds form.
        return pluralize(remainder, 'second')
    if len(parts) == 1:
        return parts[0]
    return f"{', '.join(parts[:-1])}, and {parts[-1]}"
def size_converter(byte_size: int) -> str:
    """Converts a size in bytes to a human friendly string using 1024-based units.

    Args:
        byte_size: Receives byte size as argument. Must not be negative.

    Returns:
        str:
        The size rounded to two decimals followed by its unit, e.g. ``"1.5 KB"``.
        Sizes beyond the largest known unit are expressed in that largest unit.

    Raises:
        ValueError:
        If ``byte_size`` is negative.
    """
    size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
    if byte_size < 0:
        raise ValueError(f"byte_size must be non-negative, got {byte_size}")
    # Pick the unit by comparing against exact powers of 1024 instead of taking a
    # float logarithm: this copes with 0, cannot mis-floor at exact powers of
    # 1024, and never runs past the last unit.
    index = 0
    threshold = 1024
    while byte_size >= threshold and index < len(size_name) - 1:
        index += 1
        threshold *= 1024
    return f"{round(byte_size / pow(1024, index), 2)} {size_name[index]}"
def download_chunk(url: str, start_byte: int, end_byte: int, output_file: str, pbar: tqdm.tqdm) -> None:
    """Download chunk of data, triggered individually by threads.

    The requested byte range is streamed straight into ``output_file`` at offset
    ``start_byte``; the file must already exist, since it is opened in ``r+b`` mode.

    Args:
        url: URL to request data.
        start_byte: Starting byte for each thread.
        end_byte: Ending byte (inclusive) for each thread.
        output_file: Filename to write the downloaded data.
        pbar: Progress bar to update the status of each thread.

    Raises:
        requests.HTTPError:
        If the server answers with an error status.
        RuntimeError:
        If the server ignores the range request for a chunk that does not start at byte 0.
    """
    headers = {'Range': f'bytes={start_byte}-{end_byte}'}
    # The context manager releases the connection even if writing fails.
    with requests.get(url, headers=headers, stream=True, timeout=(10, 60)) as response:
        # Never write an error page into the target file.
        response.raise_for_status()
        # Anything other than 206 means the whole body is being sent; writing
        # that at a non-zero offset would corrupt the file.
        if response.status_code != 206 and start_byte != 0:
            raise RuntimeError(
                f"Server did not honour the range request for bytes {start_byte}-{end_byte} "
                f"(status code: {response.status_code})"
            )
        with open(output_file, 'r+b') as file:
            file.seek(start_byte)
            # Stream in blocks so a large chunk is never held in memory at once,
            # and advance the bar by what was actually received.
            for block in response.iter_content(chunk_size=1024 * 1024):
                if block:
                    file.write(block)
                    pbar.update(len(block))
def download_file(url: str, num_threads: int = os.cpu_count() or 1) -> None:
    """Get the file size and initiate download in chunks.

    The file is named after the last path segment of ``url``, pre-allocated to its
    full size, and then filled in by one thread per byte range.

    Args:
        url: URL to request data.
        num_threads: Number of threads to spin up. Capped at the file size in bytes,
            so that every thread gets at least one byte to fetch.

    Raises:
        requests.HTTPError:
        If the size lookup is answered with an error status.
        KeyError:
        If the server does not report a ``Content-Length``.
    """
    start = time.time()
    output_file = url.split("/")[-1]
    # requests.head() does not follow redirects unless asked to, which would
    # report the size of the redirect response instead of the file.
    response = requests.head(url, allow_redirects=True, timeout=(10, 60))
    response.raise_for_status()
    file_size = int(response.headers['Content-Length'])
    if file_size == 0:
        # Nothing to fetch: create the empty file and skip the range requests.
        with open(output_file, 'wb'):
            pass
        print(f"{output_file!r} is empty, nothing to download")
        return
    print(f"File size for {output_file}: {size_converter(file_size)}")
    # Guard against a missing/zero thread count and against more threads than
    # bytes, which would produce a zero chunk size and invalid byte ranges.
    num_threads = max(1, min(num_threads or 1, file_size))
    chunk_size = file_size // num_threads
    with open(output_file, 'wb') as file:
        # Reserve the full size without building a file-sized buffer in memory.
        file.truncate(file_size)
    print(f"Downloading {output_file!r} using {num_threads} threads with chunk size: "
          f"{size_converter(chunk_size)} on each thread")
    # Using the bar as a context manager closes it even if a thread fails to start.
    with tqdm.tqdm(desc="Download progress", total=file_size, unit='B', unit_scale=True, leave=False) as pbar:
        threads = []
        for i in range(num_threads):
            start_byte = i * chunk_size
            if i == num_threads - 1:
                # The last thread picks up the remainder; byte offsets are
                # zero-based, so the final byte is file_size - 1.
                end_byte = file_size - 1
            else:
                end_byte = start_byte + chunk_size - 1
            thread = Thread(target=download_chunk, args=(url, start_byte, end_byte, output_file, pbar))
            threads.append(thread)
            thread.start()
        for thread in threads:
            thread.join()
    print(f"Download complete in {time_converter(time.time() - start)}")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment