Last active
February 3, 2020 11:27
-
-
Save martinholub/9e58cdf86973ee0edbb8a82f076eb15d to your computer and use it in GitHub Desktop.
Download and validate files from FTP/HTTP locations. Tested mainly with Ensembl.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Decompress every *.gz path in the input list, keeping the compressed original.
# NOTE(review): {input} looks like a workflow-engine placeholder (e.g. Snakemake)
# that is substituted before the shell sees this text - confirm.
for f in {input}
do
    # mkdir --parents "$f"
    # Use `if` instead of `[[ ... ]] && cmd`: with the short-circuit form the loop
    # ends with a non-zero status whenever the last path is not a .gz file, which
    # makes the surrounding script look like it failed.
    if [[ "$f" == *.gz ]]; then
        gunzip --keep "$f"
    fi
done
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _is_gzipfile(self, fname):
    """Return True if ``fname`` starts with the gzip magic number.

    Only the first two bytes (``1f 8b``) are inspected; the rest of the
    stream is not validated. An empty or one-byte file yields False.

    Args:
        fname: Path of the file to inspect.

    Returns:
        bool: Whether the file begins with the gzip signature.

    Raises:
        OSError: If the file cannot be opened for reading.
    """
    with open(fname, "rb") as f:
        # Compare the raw bytes directly instead of hex-encoding them through
        # ``zipfile.binascii``, which is an undocumented re-export.
        return f.read(2) == b"\x1f\x8b"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class CustomException(Exception):
    """Exception whose message is derived from an optional parameter.

    A truthy ``param`` produces ``"Custom message <param>"``; otherwise the
    ``message`` argument is used verbatim.
    """

    def __init__(self, param=None, message="Default Message"):
        text = message if not param else "Custom message {}".format(param)
        super().__init__(text)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def _ftp_callback(self, block, file_, progbar):
    """Store one received FTP block and advance the tqdm progress bar by its size."""
    received = len(block)
    file_.write(block)
    progbar.update(received)
def _download_checker(self, fname, min_size=10000):
    """Decide whether ``fname`` still has to be downloaded.

    A file counts as already downloaded only if it exists and is at least
    ``min_size`` bytes long; anything smaller is treated as a failed earlier
    attempt that should be overwritten.

    Args:
        fname: Local path of the download target.
        min_size: Size threshold in bytes. The default of 10000 is arbitrary;
            a precise check would require asking the server for the remote
            file size.

    Returns:
        bool: True if the file is missing or smaller than ``min_size``.
    """
    try:
        return os.path.getsize(fname) < min_size
    except FileNotFoundError:
        return True
def download_file(self, url, dirname, fname=None):
    """Download a single file from an HTTP(S) or FTP URL into ``dirname``.

    Nothing is transferred when ``self._download_checker`` reports that the
    target already exists with a plausible size.

    Args:
        url: Remote location. URLs starting with "http" are fetched with
            ``requests``; URLs starting with "ftp" are fetched with
            ``ftplib`` using an anonymous login.
        dirname: Local directory for the file. It is not created here.
        fname: Local file name. Defaults to the last non-empty path
            component of ``url``.

    Returns:
        str: Path of the local file, whether or not it was downloaded now.

    Raises:
        ValueError: If ``url`` starts with neither "http" nor "ftp".
        AssertionError: If the file is not listed in the remote FTP directory.
        Exception: Any transport error from ``requests`` or ``ftplib``.
    """
    if not fname:
        fname = list(filter(bool, url.split("/")))[-1]
    target = os.path.join(dirname, fname)

    if not self._download_checker(target):
        print("File {} exists. Not overwritten.".format(target))
        return target

    if not url.startswith(("http", "ftp")):
        # Previously this case silently downloaded nothing and still returned
        # a path, deferring the failure to whoever opened the file.
        raise ValueError("Unsupported URL scheme: {}".format(url))

    try:
        if url.startswith("http"):
            self._download_http(url, target, fname)
        else:
            self._download_ftp(url, target)
    except BaseException:
        # Drop the partial file: _download_checker only looks at size, so a
        # truncated file above its threshold would be taken for a finished
        # download on the next run.
        if os.path.exists(target):
            os.remove(target)
        raise
    return target

def _download_http(self, url, target, label):
    """Stream ``url`` over HTTP(S) into ``target`` while showing a progress bar."""
    with requests.Session() as sess:
        r = sess.get(url, stream=True)
        # Fail before touching the disk so an error page is never saved as data.
        r.raise_for_status()
        # Chunked responses carry no Content-Length header; fall back to an
        # unknown total (assumes TQDM is tqdm's progress-bar class, which
        # accepts total=None - TODO confirm against the file's imports).
        size = r.headers.get("Content-Length")
        tqdm_params = {
            'unit': 'blocks',
            'unit_scale': True,
            'leave': False,
            'miniters': 1,
            'total': int(size) if size is not None else None,
        }
        print("Downloading {}".format(label))
        with open(target, "wb") as f, TQDM(**tqdm_params) as progbar:
            for chunk in r.iter_content(chunk_size=512):
                f.write(chunk)
                progbar.update(len(chunk))

def _download_ftp(self, url, target):
    """Fetch ``url`` from an anonymous FTP server into ``target`` with a progress bar."""
    # Strip only a leading scheme; "ftp://" elsewhere in the URL is left alone.
    if url.startswith("ftp://"):
        url = url[len("ftp://"):]
    fparts = list(filter(bool, url.split("/")))
    host, remote_dirs, remote_name = fparts[0], fparts[1:-1], fparts[-1]

    # The context manager closes the connection even when a step below fails.
    with ftplib.FTP(host) as ftp:
        ftp.login()
        if remote_dirs:
            # Remote paths always use "/", independent of the local OS; a file
            # in the server's login directory needs no cwd at all.
            ftp.cwd("/".join(remote_dirs))
        if remote_name not in ftp.nlst():
            # Raised explicitly (not via ``assert``) so the check survives
            # ``python -O`` while keeping the original exception type.
            raise AssertionError(
                "File {} not in remote location.".format(remote_name))
        tqdm_params = {
            'unit': 'blocks',
            'unit_scale': True,
            'leave': False,
            'miniters': 1,
            'total': ftp.size(remote_name),
        }
        print("Downloading {}".format(remote_name))
        with open(target, "wb") as f, TQDM(**tqdm_params) as progbar:
            ftp.retrbinary(
                "RETR {}".format(remote_name),
                lambda block: self._ftp_callback(block, f, progbar),
                blocksize=512)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def validate_file(self, fname, keywords=("some", "text")):
    """Ensure ``fname`` is a gzip file, repacking it in place if it is a ZIP archive.

    Files that already start with the gzip magic number, and files that are
    neither gzip nor ZIP, are left untouched. For a ZIP archive, one member is
    extracted and re-compressed with gzip under the original path.

    Args:
        fname: Path of the file to check.
        keywords: Lower-case substrings used to pick the archive member; the
            first member whose lower-cased name contains any of them is used.
            If none matches, the archive's own file name with ".gz" removed
            is tried as the member name.

    Returns:
        None

    Raises:
        AssertionError: If ``fname`` does not exist or is empty.
        Exception: Any error raised while extracting or re-compressing (for
            example ``KeyError`` when the expected member is not in the
            archive); it is re-raised after a diagnostic message is printed.
    """
    import tempfile

    # Raised explicitly (not via ``assert``) so the checks survive
    # ``python -O`` while keeping the original exception type.
    if not os.path.isfile(fname):
        raise AssertionError("File {} does not exist.".format(fname))
    if os.path.getsize(fname) <= 0:
        raise AssertionError("File {} has size 0.".format(fname))

    if self._is_gzipfile(fname) or not zipfile.is_zipfile(fname):
        return

    try:
        # os.path keeps the leading "/" of absolute paths and copes with a
        # bare file name; splitting on "/" and re-joining did neither.
        fdir, fbase = os.path.split(os.path.abspath(fname))
        print("Repacking {} to valid GZIP archive...".format(fbase))
        # Extract into a scratch directory next to the archive so the member
        # can never collide with ``fname`` and everything is cleaned up, even
        # on failure or when the member name contains sub-directories.
        with tempfile.TemporaryDirectory(dir=fdir) as scratch:
            with zipfile.ZipFile(fname, 'r') as zf:
                candidates = [x for x in zf.namelist()
                              if any(a in x.lower() for a in keywords)]
                if not candidates:
                    member = fbase.replace(".gz", "")  # Try the anticipated name
                else:
                    if len(candidates) > 1:
                        print("Warning: Multiple files ({}) in archive may be valid.".format(candidates))
                    member = candidates[0]
                extracted = zf.extract(member, scratch)
            # The archive handle is closed before ``fname`` is overwritten.
            with open(extracted, 'rb') as f_in, gzip.open(fname, 'wb') as f_out:
                shutil.copyfileobj(f_in, f_out)
    except Exception:
        print("File {} didn't pass check.".format(fname))
        raise
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment