Skip to content

Instantly share code, notes, and snippets.

@regtm
Created September 18, 2022 19:48
Show Gist options
  • Save regtm/5be7337561215c4e107b393312a17f2e to your computer and use it in GitHub Desktop.
Save regtm/5be7337561215c4e107b393312a17f2e to your computer and use it in GitHub Desktop.
Dload ipython quick fix
"""
dload - Download Library
A python library to simplify your download tasks.
~~~~~~~~~~~~~~~~~~~~~
"""
import os, sys, time
import re
from contextlib import closing
from shutil import copyfileobj
import urllib.request as request
import zipfile
def check_installation(rv):
"""
checks if current python version is correct to run this module
:param rv: string - python version, i.e. check_installation("36")
:return: boolean
"""
current_version = sys.version_info
if current_version.major == int(rv[0]) and current_version.minor >= int(rv[1]):
return True
else:
sys.stderr.write( "[%s] - Error: Your Python interpreter must be %s.%s or greater (within major version %s)\n" % (sys.argv[0], rv[0], rv[1], rv[0]) )
sys.exit(-1)
return False
check_installation("36")
import traceback, io
from urllib.parse import urlparse
try:
import requests
except Exception:
print ("'requests' is required.")
def bytes(url):
"""
Returns the remote file as a byte obj
:param url: str - url to download
:return: bytes
"""
try:
return requests.get(url).content
except:
pass
print(traceback.print_exc())
return b""
def rand_fn():
"""
provides a random filename when it's impossible to determine the filename, i.e.: http://site.tld/dir/
:return: str
"""
return str(int(time.time()))[:5]
def get_default_path():
"""
Provides call path with fallbacks
:return: str - The default save path
"""
namespace = sys._getframe(1).f_globals # caller's globals
if hasattr(namespace, '__file__'): # standard case
c_path = os.path.dirname(namespace['__file__'])
elif hasattr(globals(), '_dh'): # ipython environment
c_path = os.path.dirname(globals()['_dh'][0])
else:
c_path = os.getcwd() # fallback
return c_path
def save(url, path="", overwrite=False):
"""
Download and save a remote file
:param url: str - file url to download
:param path: str - (optional) Full path to save the file, ex: c:/test.txt or /home/test.txt.
Defaults to script location and url filename
:param overwrite: bool - (optional) If True the local file will be overwritten, False will skip the download
:return: str - The full path of the downloaded file or an empty string
"""
try:
c_path = get_default_path()
fn = os.path.basename(urlparse(url).path)
fn = fn if fn else f"dload{rand_fn()}"
path = path if path.strip() else c_path+os.path.sep+fn
if not overwrite and os.path.isfile(path):
return path
r = requests.get(url)
with open(path, 'wb') as f:
f.write(r.content)
return path
except:
pass
print(traceback.print_exc())
return ""
def text(url, encoding=""):
"""
Returns the remote file as a string
:param url: str - url to retrieve the text content
:param encoding: str - (optional) character encoding
:return: str
"""
try:
r = requests.get(url)
if encoding:
r.encoding = encoding
return r.text
except:
print(traceback.print_exc())
pass
return ""
def json(url):
"""
Returns the remote file as a dict
:param url: str - url to retrieve the json
:return: dict
"""
try:
return requests.get(url).json()
except:
print(traceback.print_exc())
pass
return {}
def headers(url, redirect=True):
"""
Returns the reply headers as a dict
:param url: str - url to retrieve the reply headers
:param redirect: boolean - (optional) should we follow redirects?
:return: dict
"""
try:
return dict(requests.head(url, allow_redirects=redirect).headers)
except:
print(traceback.print_exc())
pass
return {}
def ftp(ftp_url, local_path="", overwrite=False):
"""
Download and save an FTP file
:param url: str - ftp://ftp.server.tld/path/to/file.ext or ftp://username:password@ftp.server.tld/path/to/file.ext
:param localpath: str - (optional) local path to save the file, i.e.: /home/myfile.ext or c:/myfile.ext
:param overwrite: bool - (optional) If True the local file will be overwritten, False will skip the download
:return: str - local path of the downloaded file
"""
try:
c_path = get_default_path()
fn = os.path.basename(urlparse(ftp_url).path)
fn = fn if fn else f"dload{rand_fn()}"
local_path = local_path if local_path.strip() else c_path+os.path.sep+fn
if not overwrite and os.path.isfile(local_path):
return local_path
with closing(request.urlopen(ftp_url)) as r:
with open(local_path, 'wb') as f:
copyfileobj(r, f)
return local_path
except:
print(traceback.print_exc())
pass
return ""
def save_multi(url_list, dir="", max_threads=1, tsleep=0.05):
"""
Multi threaded file downloader
:param url_list: str or list - A python list or a path to a text file containing the urls to be downloaded
:param dir: str - (optional) Directory to save the files, will be created if it doesn't exist
:param max_threads: int - (optional) Max number of parallel downloads
:param tsleep: int or float - (optional) time to sleep in seconds when the max_threads value is reached, i.e: 0.05 or 1 is accepted
:return: boolean
"""
import threading
from time import sleep
try:
if not isinstance(url_list, list):
with open(url_list) as f:
url_list = [x.rstrip() for x in f if x]
if dir:
if not os.path.exists(dir):
from pathlib import Path
Path(dir).mkdir(parents=True, exist_ok=True)
for url in url_list:
if dir:
fn = os.path.basename(urlparse(url).path)
fp = f"{dir}/{fn}"
args = [url, fp]
else:
args = [url]
threading.Thread(target=save, args=args, name="dload" ).start()
dload_threads = [x.getName() for x in threading.enumerate() if "dload" == x.getName()]
while len(dload_threads) >= max_threads:
dload_threads = [x.getName() for x in threading.enumerate() if "dload" == x.getName()]
sleep(tsleep)
while dload_threads:
dload_threads = [x.getName() for x in threading.enumerate() if "dload" == x.getName()]
sleep(tsleep)
return True
except:
pass
print(traceback.print_exc())
return False
def down_speed(size=5, ipv="ipv4", port=80):
"""
Measures the download speed
:param size: int - (optional) 5, 10, 20, 50, 100, 200, 512, 1024 Mb
:param ipv: str - (optional) ipv4, ipv6
:param port: int - (optional) 80, 81, 8080
:return: boolean
"""
if size == 1024:
size = "1GB"
else:
size = f"{size}MB"
url = f"http://{ipv}.download.thinkbroadband.com:{port}/{size}.zip"
with io.BytesIO() as f:
start = time.clock()
r = requests.get(url, stream=True)
total_length = r.headers.get('content-length')
dl = 0
if total_length is None: # no content length header
f.write(r.content)
else:
for chunk in r.iter_content(1024):
dl += len(chunk)
f.write(chunk)
done = int(30 * dl / int(total_length))
sys.stdout.write("\r[%s%s] %s Mbps" % ('=' * done, ' ' * (30-done), dl//(time.clock() - start) / 100000))
print( f"\n{size} = {(time.clock() - start):.2f} seconds")
def save_unzip(zip_url, extract_path="", delete_after=False):
"""
Save and Extract a remote zip
:param zip_url: str - the zip file url to download
:param extract_path: str - (optional) the path to extract the zip file, defaults to local dir
:param delete_after: bool - (optional) if the zip file should be deleted after, defaults to False
:return: str - the extract path or an empty string
"""
try:
c_path = get_default_path()
fn = os.path.basename(urlparse(zip_url).path)
fn = fn if fn.strip() else f"dload{rand_fn()}"
zip_path = save(zip_url, f"{c_path}/{fn}")
folder = os.path.splitext(fn)[0]
extract_path = extract_path if extract_path.strip() else c_path+os.path.sep+folder
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(extract_path)
if delete_after and os.path.isfile(zip_path):
os.remove(zip_path)
return extract_path
except:
pass
print(traceback.print_exc())
return ""
def git_clone(git_url, clone_dir=""):
"""
Clones a git repo to local computer
:param git_url: str - git url, ex: https://github.com/x011/dload.git
:param clone_dir: str - (optional) local dir to clone the git, ex: /path/to/dload/ or c:/repos/dload/, defaults to repo name on script dir
:return: str - path to local repo dir or an empty tring
"""
git_url = git_url.strip()
if not git_url.lower().endswith(".git"):
print("Invalid git_url")
return ""
try:
repo_name = re.sub("\.git$", "", git_url, 0, re.IGNORECASE | re.MULTILINE)
repo_zip = repo_name + "/archive/master.zip"
if not clone_dir:
repo_name = repo_name.split("/")[-1]
c_path = get_default_path()
folder = os.path.splitext(c_path)[0]
clone_dir = f"{folder}/{repo_name}"
else:
if not re.search(r"/|\\$", clone_dir, re.IGNORECASE | re.MULTILINE):
print("Invalid clone_dir")
return ""
if os.path.isfile("master.zip"):
os.remove("master.zip")
return save_unzip(repo_zip, clone_dir, delete_after=True)
except:
pass
print(traceback.print_exc())
return ""
@BradKML
Copy link

BradKML commented Sep 23, 2022

Currently using this to bodge https://github.com/jayolson/divergent-association-task
jayolson/divergent-association-task#1 => x011/dload#4

Turning into

model_file = save_unzip('http://nlp.stanford.edu/data/glove.840B.300d.zip', get_default_path()+'/model.txt')
nouns = save('https://raw.githubusercontent.com/jayolson/divergent-association-task/main/words.txt', get_default_path()+'/words.txt')
model = Model(model_file, nouns)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment