@stefanfortuin
Created November 5, 2020 10:07
A multithreaded file downloader in Python using semaphores, used for web scraping.
import os, threading
import requests

class FileDownloader():
    def __init__(self, max_threads=10):
        # The semaphore caps how many download threads run at once
        self.sema = threading.Semaphore(value=max_threads)
        self.headers = {'user-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36'}
        self.block_size = 1024

    def t_getfile(self, link, filename, session):
        """
        Threaded function that uses a semaphore
        to not instantiate too many threads
        """
        self.sema.acquire()
        try:
            filepath = os.path.join(os.getcwd(), 'Downloads', str(filename))
            os.makedirs(os.path.dirname(filepath), exist_ok=True)

            if not os.path.isfile(filepath):
                self.download_new_file(link, filepath, session)
            else:
                current_bytes = os.stat(filepath).st_size
                headers = requests.head(link).headers
                if 'content-length' not in headers:
                    print(f"server doesn't support content-length for {link}")
                    return
                total_bytes = int(headers['content-length'])
                if current_bytes < total_bytes:
                    self.continue_file_download(link, filepath, current_bytes, total_bytes)
                else:
                    print(f"already done: {filename}")
        finally:
            # Release the slot even if the download raised
            self.sema.release()

    def download_new_file(self, link, filepath, session):
        print(f"downloading: {filepath}")
        if session is None:
            try:
                request = requests.get(link, headers=self.headers, timeout=30, stream=True)
                self.write_file(request, filepath, 'wb')
            except requests.exceptions.RequestException as e:
                print(e)
        else:
            request = session.get(link, stream=True)
            self.write_file(request, filepath, 'wb')

    def continue_file_download(self, link, filepath, current_bytes, total_bytes):
        print(f"resuming: {filepath}")
        range_header = self.headers.copy()
        range_header['Range'] = f"bytes={current_bytes}-{total_bytes}"
        try:
            request = requests.get(link, headers=range_header, timeout=30, stream=True)
            self.write_file(request, filepath, 'ab')
        except requests.exceptions.RequestException as e:
            print(e)

    def write_file(self, content, filepath, writemode):
        with open(filepath, writemode) as f:
            for chunk in content.iter_content(chunk_size=self.block_size):
                if chunk:
                    f.write(chunk)
        print(f"completed file {filepath}")

    def get_file(self, link, filename, session=None):
        """Spawns a thread that downloads the file."""
        thread = threading.Thread(target=self.t_getfile, args=(link, filename, session))
        thread.start()
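
For reference, a minimal usage sketch (the URLs and filenames here are placeholders, not from the gist):

downloader = FileDownloader(max_threads=5)

# Hypothetical file list; each get_file call spawns its own thread,
# and the semaphore caps how many downloads run simultaneously.
files = [
    ('https://example.com/files/a.pdf', 'a.pdf'),
    ('https://example.com/files/b.pdf', 'b.pdf'),
    ('https://example.com/files/c.pdf', 'c.pdf'),
]
for link, filename in files:
    downloader.get_file(link, filename)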
@jalotra commented Nov 18, 2021

So if only one thread can write to the file at a time (using the semaphore), how exactly is it multithreaded?

@chazp246

Yeah, I am thinking the same thing...

So if only one thread can write to the file at a time (using the semaphore), how exactly is it multithreaded?

@jalotra commented Nov 22, 2021

Inserting my single-threaded version of the above script:

import os
import requests

from tqdm import tqdm
import time


class HttpDownloader(object):
    def __init__(self, url):
        self.url = url
        self.headers = {'user-agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/62.0.3202.94 Safari/537.36'}
        self.block_size = 1024
        try:
            self.total_bytes = int(requests.head(self.url).headers['content-length'])
            print(self.total_bytes)
        except KeyError:
            raise Exception('Cannot find content-length in headers')

    def get_file(self, filename) -> dict:
        "Use this to get url using requests"

        start_time = time.time()
        
        filepath = os.path.join(os.getcwd(), 'Downloads', str(filename))
        os.makedirs(os.path.dirname(filepath), exist_ok=True)

        if not os.path.isfile(filepath):
            self.download_new_file(filepath)
        else:
            current_bytes = os.stat(filepath).st_size
            headers = requests.head(self.url).headers
            if 'content-length' not in headers:
                print(f"server doesn't support content-length for {self.url}")
                return {
                    "passed" : False,
                    "time_taken_seconds" : round(time.time() - start_time, 2),
                    "saved_path" : None
                }
            if current_bytes < self.total_bytes:
                self.continue_file_download(filepath, current_bytes)
            else:
                print(f"already done: {filename}")
        
        return {
            "passed" : True, 
            "time_taken_seconds": round(time.time() - start_time, 2),
            "saved_path" : filepath
        }

    def download_new_file(self, filepath):
        print(f"downloading: {filepath}")
        try:
            request = requests.get(self.url, headers=self.headers, timeout=30, stream=True)
            self.write_file(request, filepath, 'wb', self.total_bytes)
        except requests.exceptions.RequestException as e:
            raise e
    
    def continue_file_download(self,  filepath, current_bytes):
        print(f"resuming: {filepath}")
        range_header = self.headers.copy()
        range_header['Range'] = f"bytes={current_bytes}-{self.total_bytes}"

        try:
            request = requests.get(self.url, headers=range_header, timeout=30, stream=True)
            self.write_file(request, filepath, 'ab', self.total_bytes - current_bytes)  # append to the partial file
        except requests.exceptions.RequestException as e:
            raise e
    
    def write_file(self, content, filepath, writemode, total_bytes):
        
        with open(filepath, writemode) as file_obj:
            with tqdm.wrapattr(file_obj, "write", total_bytes) as f:
                for data in content.iter_content(self.block_size):
                    f.write(data)

    # Returns {"passed": bool, "time_taken_seconds": float, "saved_path": str}
    def download(self, filename : str) -> dict:
        return self.get_file(filename)


if __name__ == '__main__':
    obj = HttpDownloader(url = "https://alex.smola.org/drafts/thebook.pdf")
    print(obj.download("machine-learning_book.pdf"))

@stefanfortuin (Author)

Hmm yeah, I could have worded it better.

It isn't downloading a single file with multiple threads. It is downloading multiple files concurrently, so you can download multiple files at once.
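
To make the semaphore's role concrete: Semaphore(value=max_threads) admits up to max_threads threads at a time rather than just one. A standalone sketch, with time.sleep standing in for the network transfer:

import threading, time

sema = threading.Semaphore(value=3)  # up to 3 "downloads" may run at once

def fake_download(i):
    with sema:  # blocks only while 3 threads already hold a slot
        print(f"start {i}")
        time.sleep(1)  # stand-in for network I/O
        print(f"done {i}")

threads = [threading.Thread(target=fake_download, args=(i,)) for i in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# prints "start 0/1/2" roughly together, then 3-5 as slots free up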

@jalotra commented Nov 30, 2021

@stefanfortuin Got you, but you didn't join the threads back to the main thread with something like threads[i].join()

On another note, Python threads are bad at CPU-bound operations: LINK
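
For what it's worth, a sketch of how the class could track and join its threads; self.threads and wait_all are hypothetical additions, not part of the original gist (and since downloads are I/O-bound, the GIL isn't the bottleneck here):

# Hypothetical additions to FileDownloader: set self.threads = [] in __init__, then:

def get_file(self, link, filename, session=None):
    thread = threading.Thread(target=self.t_getfile, args=(link, filename, session))
    self.threads.append(thread)  # remember the thread so it can be joined later
    thread.start()

def wait_all(self):
    for t in self.threads:
        t.join()  # block the main thread until every download finishes
    self.threads.clear()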
