Skip to content

Instantly share code, notes, and snippets.

@martinholub
Last active February 3, 2020 11:27
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save martinholub/9e58cdf86973ee0edbb8a82f076eb15d to your computer and use it in GitHub Desktop.
Save martinholub/9e58cdf86973ee0edbb8a82f076eb15d to your computer and use it in GitHub Desktop.
Downloading and validating files from ftp/http locations. Tested mainly with Ensembl.
# Decompress every gzip-compressed input in place, keeping the .gz original.
# NOTE(review): `{input}` looks like a workflow-engine placeholder (e.g. Snakemake)
# that is substituted before the shell runs — confirm against the calling rule.
for f in {input}
do
    # mkdir --parents "$f"
    # An explicit `if` (rather than `[[ ... ]] && gunzip`) keeps the loop's exit
    # status at 0 when the last input is not a .gz file; quoting "$f" protects
    # paths that contain spaces or glob characters.
    if [[ "$f" == *.gz ]]; then
        gunzip --keep "$f"
    fi
done
def _is_gzipfile(self, fname):
    """Check if file is a valid gzip file.

    Only the two-byte gzip magic number (``1f 8b``) at the start of the file
    is inspected; the rest of the stream is not validated.

    Args:
        fname: Path of the file to inspect.

    Returns:
        True if the file starts with the gzip magic number, False otherwise
        (including for files shorter than two bytes).
    """
    with open(fname, "rb") as f:
        # Compare the raw bytes directly instead of hex-encoding them through
        # ``zipfile.binascii``, which is an implementation detail of zipfile.
        return f.read(2) == b"\x1f\x8b"
class CustomException(Exception):
    """Exception whose message is derived from ``param`` when one is given.

    A truthy ``param`` yields the message ``"Custom message <param>"``;
    otherwise the ``message`` argument is used unchanged.
    """

    def __init__(self, param=None, message="Default Message"):
        text = "Custom message {}".format(param) if param else message
        super().__init__(text)
def _ftp_callback(self, block, file_, progbar):
    """Store one received FTP block and advance the tqdm progress bar.

    ``block`` is written to ``file_`` and its length is then passed to
    ``progbar.update``.
    """
    n_received = len(block)
    file_.write(block)
    progbar.update(n_received)
def _download_checker(self, fname, min_size=10000):
    """Check if we need to download anything.

    A file is (re-)downloaded when it does not exist yet, or when it is
    smaller than ``min_size`` bytes, which is taken as the trace of a failed
    earlier attempt.

    Args:
        fname: Local path the download would be written to.
        min_size: Size in bytes below which an existing file is considered
            bad. The default of 10000 is arbitrary; the alternative would be
            to peek at the size of the remote file.

    Returns:
        True if the file should be downloaded, False if it can be kept.
    """
    try:  # overwrite bad trials
        return os.path.getsize(fname) < min_size
    except FileNotFoundError:
        return True
def download_file(self, url, dirname, fname = None):
    """Download single file from url to specified folder.

    The data is first written to ``<target>.part`` and only moved onto the
    final path once the transfer finished, so an interrupted download never
    leaves a truncated file that a later run could mistake for a complete one.

    Args:
        url: Remote location; must start with ``http`` or ``ftp``.
        dirname: Local directory the file is stored in.
        fname: Local file name. Defaults to the last non-empty path
            component of ``url``.

    Returns:
        Path of the local file (``dirname`` joined with the file name),
        whether it was downloaded now or already present.

    Raises:
        ValueError: A download is needed but ``url`` starts with neither
            ``http`` nor ``ftp``.
        AssertionError: The file is not listed in the remote FTP directory.
        Errors raised by ``requests`` / ``ftplib`` for failed transfers
        (including HTTP error status codes) propagate unchanged.
    """
    if not fname:
        fname = list(filter(bool, url.split("/")))[-1]
    fname_ = fname
    fname = os.path.join(dirname, fname)

    if not self._download_checker(fname):
        print("File {} exists. Not overwritten.".format(fname))
        return fname

    # Fail loudly instead of returning the path of a file that was never
    # created.
    if not url.startswith(("http", "ftp")):
        raise ValueError("Unsupported url scheme: {}".format(url))

    partial = fname + ".part"
    try:
        if url.startswith("http"):
            self._fetch_http_to_file(url, partial, fname_)
        else:
            self._fetch_ftp_to_file(url, partial)
        os.replace(partial, fname)
    finally:
        # Only present here if the transfer or the final rename failed.
        if os.path.exists(partial):
            os.remove(partial)
    return fname


def _fetch_http_to_file(self, url, dest, label):
    """Stream ``url`` over HTTP(S) into ``dest``, showing a progress bar."""
    with requests.Session() as sess:
        r = sess.get(url, stream = True)
        # Do not store an HTML error page as if it were the requested file.
        r.raise_for_status()
        # Content-Length is optional (e.g. chunked transfer); without it the
        # progress bar simply has no total.
        length = r.headers.get("Content-Length")
        with open(dest, "wb") as f:
            print("Downloading {}".format(label))
            tqdm_params = { 'unit': 'blocks',
                            'unit_scale': True,
                            'leave': False,
                            'miniters': 1,
                            'total': int(length) if length is not None else None}
            with TQDM(**tqdm_params) as tqdm: # progress bar
                for chunk in r.iter_content(chunk_size = 512):
                    f.write(chunk)
                    tqdm.update(len(chunk))


def _fetch_ftp_to_file(self, url, dest):
    """Retrieve ``url`` from an FTP server (anonymous login) into ``dest``."""
    remote = url.replace("ftp://", "")
    fparts = list(filter(bool, remote.split("/")))
    host, remote_dirs, remote_name = fparts[0], fparts[1:-1], fparts[-1]
    # The context manager closes the connection even if the transfer fails.
    with ftplib.FTP(host) as ftp:
        ftp.login()
        if remote_dirs:
            # Remote paths always use "/", independent of the local OS; a
            # file in the server root needs no directory change at all.
            ftp.cwd("/".join(remote_dirs))
        if remote_name not in ftp.nlst():
            # Raised explicitly (same exception type as the former assert) so
            # the check is not stripped when Python runs with -O.
            raise AssertionError("File {} not in remote location.".format(remote_name))
        cmd = "RETR {}".format(remote_name)
        with open(dest, "wb") as f:
            print("Downloading {}".format(remote_name))
            tqdm_params = { 'unit': 'blocks',
                            'unit_scale': True,
                            'leave': False,
                            'miniters': 1,
                            'total': ftp.size(remote_name)}
            with TQDM(**tqdm_params) as tqdm: # progress bar
                ftp.retrbinary( cmd, lambda block: self._ftp_callback(block, f, tqdm),
                                blocksize = 512)
def validate_file(self, fname):
    """Check if file is a valid GZIP file and convert if necessary.

    If ``fname`` is not gzip data but is a ZIP archive, one member of the
    archive is extracted, gzip-compressed and moved over ``fname``. Files that
    are already gzip, or that are neither gzip nor ZIP, are left untouched.

    Args:
        fname: Path of the local file to validate.

    Raises:
        AssertionError: ``fname`` does not exist or is empty.
        Any error raised while repacking is reported on stdout and re-raised.
    """
    # Explicit raises (same type as the former asserts) so that validation is
    # not stripped when Python runs with -O.
    if not os.path.isfile(fname):
        raise AssertionError("File {} does not exist.".format(fname))
    if os.path.getsize(fname) <= 0:
        raise AssertionError("File {} has size 0.".format(fname))

    if self._is_gzipfile(fname) or not zipfile.is_zipfile(fname):
        return

    # os.path.split keeps the leading "/" of absolute paths and copes with a
    # bare file name, unlike splitting on "/" and re-joining the parts.
    fdir, base = os.path.split(fname)
    fdir = fdir or os.curdir
    repacked = fname + ".repack"
    try:
        print("Repacking {} to valid GZIP archive...".format(base))
        # Anticipated member name: the archive name without its .gz suffix.
        unzip_f_new = base[:-len(".gz")] if base.endswith(".gz") else base
        with zipfile.ZipFile(fname, 'r') as zf:
            unzip_f = [x for x in zf.namelist() if any(a in x.lower() for a in ("some", "text"))]
            if not unzip_f:
                unzip_f = unzip_f_new # Try the anticipated name
            else:
                if len(unzip_f) > 1:
                    print("Warning: Multiple files ({}) in archive may be valid.".format(unzip_f))
                unzip_f = unzip_f[0]
            zfpath = zf.extract(unzip_f, fdir)
        try:
            with open(zfpath, 'rb') as f_in:
                with gzip.open(repacked, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
            # Replace the original only after the gzip copy is complete, so a
            # failure part-way through does not lose the downloaded archive.
            os.replace(repacked, fname)
        finally:
            os.remove(zfpath)
            if os.path.exists(repacked):
                os.remove(repacked)
    except Exception:
        print("File {} didn't pass check.".format(fname))
        raise
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment