Last active
July 2, 2019 03:12
-
-
Save dannygoldstein/3f56f0ed3b9c01b8199f4032e9cf1206 to your computer and use it in GitHub Desktop.
hpss sort
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash
# Sort a list of HPSS file paths by their physical location on tape
# (tape label, then position and offset), so retrievals can be batched
# to minimize tape mounts and seeks.
#
# Usage: hsi_file_sorter.script <name_of_file_containing_HPSS_file_names>
# Output (stdout): "<tape> <position> <offset> <path>" lines, sorted.

if [ $# -lt 1 ]; then
    echo usage is "hsi_file_sorter.script <name_of_file_containing_HPSS_file_names>"
    # Nonzero exit so callers that check the return code see the failure.
    exit 1
fi

list=$1

# Prefer $SCRATCH for scratch space when available (NERSC convention).
OUTDIR=$HOME
if [ -n "$SCRATCH" ]; then
    OUTDIR=$SCRATCH
fi

# mktemp -d creates a unique directory atomically; the previous
# "$RANDOM until it doesn't exist" loop was race-prone.
temp_dir=$(mktemp -d "$OUTDIR/hsi_sort_XXXXXX") || exit 1

tempin=$temp_dir/tempin.txt

# Build an hsi "in-file": one "ls -P <path>" command per input path.
awk '{print "ls -P",$0}' "$list" > "$tempin"

# hsi 'ls -P' emits lines like: FILE <path> <size> ... <pos+offset> <tape...>
# Keep non-empty files ($3 != 0) and emit: tape position offset path.
/usr/common/mss/bin/hsi -q "in $tempin" 2>&1 | grep -E ^FILE | awk '$3 != 0 {printf("%s %s %s %s\n",substr($6,0,6),substr($5,0,index($5,"+")-1),substr($5,index($5,"+")+1),$2)}' > "$temp_dir/hsi_get.txt"

# Sort by tape label, then numerically by position and offset.
sort -k1,1 -k2,2n -k3,3n "$temp_dir/hsi_get.txt"

# clean up
rm "$temp_dir/hsi_get.txt"
rm "$tempin"
rmdir "$temp_dir"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import os | |
import psycopg2 | |
import pandas as pd | |
from argparse import ArgumentParser | |
import tempfile, io | |
from pathlib import Path | |
import subprocess | |
from sqlalchemy import create_engine | |
class HPSSDB(object):
    """Thin wrapper holding a SQLAlchemy engine for the image metadata DB.

    Connection parameters are read from the environment: HPSS_DBNAME,
    HPSS_DBPASSWORD, HPSS_DBUSERNAME, HPSS_DBPORT, HPSS_DBHOST.
    """

    def __init__(self):
        dbname = os.getenv('HPSS_DBNAME')
        password = os.getenv('HPSS_DBPASSWORD')
        username = os.getenv('HPSS_DBUSERNAME')
        port = os.getenv('HPSS_DBPORT')
        host = os.getenv('HPSS_DBHOST')
        self.engine = create_engine(
            f'postgresql://{username}:{password}@{host}:{port}/{dbname}'
        )

    def __del__(self):
        # dispose() actually closes the engine's pooled connections;
        # `del self.engine` only dropped a reference and left them open.
        try:
            self.engine.dispose()
        except Exception:
            # Best effort: the interpreter may already be tearing down
            # when __del__ runs; never raise from a finalizer.
            pass
def submit_hpss_job(tarfiles, images, job_script_destination, frame_destination, log_destination, tape_number):
    """Write and submit one slurm "xfer" job that stages `tarfiles` off HPSS
    and extracts the requested `images` from each tarball.

    Parameters
    ----------
    tarfiles : sequence of str
        HPSS paths of the tarballs, all residing on a single tape.
    images : sequence of list of str
        For each tarfile, the image file names to extract from it.
    job_script_destination : str or Path or None
        Directory in which to persist the generated scripts; when None,
        anonymous temporary files are used and deleted afterwards.
    frame_destination : str or Path
        Directory the batch job cd's into before extracting.
    log_destination : str or Path
        Directory that receives the slurm .out log.
    tape_number : str or int
        Tape identifier; names the scripts and the slurm job.

    Returns
    -------
    int
        The slurm job id parsed from the submission output.

    Raises
    ------
    ValueError
        If the submission command exits with a nonzero return code.
    """
    nersc_account = os.getenv('NERSC_ACCOUNT')

    if job_script_destination is None:
        # then just use temporary files
        jobscript = tempfile.NamedTemporaryFile()
        subscript = tempfile.NamedTemporaryFile()
    else:
        jobscript = open(Path(job_script_destination) / f'hpss.{tape_number}.sh', 'w')
        subscript = open(Path(job_script_destination) / f'hpss.{tape_number}.sub.sh', 'w')

    # try/finally so the script files are closed (and tempfiles deleted)
    # even when submission fails; previously they leaked on any exception.
    try:
        # Wrapper script: put the esslurm sbatch on PATH, then submit the
        # real job script.
        substr = f'''#!/usr/bin/env bash
#module load esslurm
export PATH=/global/common/cori/software/hypnotoad:/opt/esslurm/bin:$PATH
export LD_LIBRARY_PATH=/opt/esslurm/lib64:$LD_LIBRARY_PATH
sbatch {Path(jobscript.name).resolve()}
'''

        if job_script_destination is None:
            # NamedTemporaryFile defaults to binary mode.
            substr = substr.encode('ASCII')
        subscript.write(substr)

        hpt = f'hpss.{tape_number}'
        jobstr = f'''#!/usr/bin/env bash
#SBATCH -J {tape_number}
#SBATCH -L SCRATCH,project
#SBATCH -q xfer
#SBATCH -N 1
#SBATCH -A {nersc_account}
#SBATCH -t 48:00:00
#SBATCH -C haswell
#SBATCH -o {(Path(log_destination) / hpt).resolve()}.out
cd {Path(frame_destination).resolve()}
'''

        # One retrieval + extraction stanza per tarball, in tape order.
        for tarfile, imlist in zip(tarfiles, images):
            wildimages = '\n'.join([f'*{p}' for p in imlist])

            directive = f'''
/usr/common/mss/bin/hsi get {tarfile}
echo "{wildimages}" | tar --strip-components=12 -i --wildcards --wildcards-match-slash --files-from=- -xvf {os.path.basename(tarfile)}
rm {os.path.basename(tarfile)}
'''
            jobstr += directive

        if job_script_destination is None:
            jobstr = jobstr.encode('ASCII')
        jobscript.write(jobstr)

        # Ensure the script contents hit disk before bash reads them by name.
        jobscript.flush()
        subscript.flush()

        command = f'/bin/bash {Path(subscript.name).resolve()}'
        # communicate() drains both pipes to EOF and waits for exit; the old
        # poll()/sleep loop that read the pipes only afterwards could
        # deadlock once a pipe buffer filled.
        p = subprocess.Popen(command.split(), stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()
        retcode = p.returncode

        out = stdout.splitlines()
        err = stderr.splitlines()

        print(out, flush=True)
        print(err, flush=True)

        if retcode != 0:
            raise ValueError(f'Nonzero return code ({retcode}) on command, "{command}"')
    finally:
        jobscript.close()
        subscript.close()

    # sbatch reports "Submitted batch job <id>"; the id is the last token.
    jobid = int(out[0].strip().split()[-1])
    return jobid
def retrieve_images(whereclause, exclude_masks=False, job_script_destination=None, frame_destination='.',
                    log_destination='.'):
    """Retrieve science (and optionally mask) images from HPSS tape.

    Queries the image database for frames matching `whereclause` (plus fixed
    seeing/maglimit quality cuts), sorts their tarballs by physical location
    on tape, and submits one slurm transfer job per tape.

    Parameters
    ----------
    whereclause : str
        SQL fragment selecting the images (ANDed into the fixed query).
    exclude_masks : bool
        When True, only science images are retrieved.
    job_script_destination : str or Path or None
        Where to persist generated job scripts (None -> temporary files).
    frame_destination : str or Path
        Directory the extracted frames land in.
    log_destination : str or Path
        Directory for slurm logs.

    Returns
    -------
    (dict, pandas.DataFrame)
        Mapping of image path -> slurm job id it depends on, and the full
        metadata table returned by the query.

    Raises
    ------
    ValueError
        If no images match the query.
    subprocess.CalledProcessError
        If the tape-sort script exits nonzero.
    """
    # interface to HPSS and database
    hpssdb = HPSSDB()

    # this is the query to get the image paths
    query = f'SELECT * FROM IMAGE WHERE HPSS_SCI_PATH IS NOT NULL AND {whereclause} AND SEEING < 3 AND MAGLIMIT > 18.5'
    metatable = pd.read_sql(query, hpssdb.engine)
    df = metatable[['path', 'hpss_sci_path', 'hpss_mask_path']]

    dfsci = df[['path', 'hpss_sci_path']]
    dfsci = dfsci.rename({'hpss_sci_path': 'tarpath'}, axis='columns')

    if not exclude_masks:
        # mask frames live in separate tarballs; their file names swap
        # 'sciimg' for 'mskimg'
        dfmask = df[['path', 'hpss_mask_path']].copy()
        dfmask.loc[:, 'path'] = [im.replace('sciimg', 'mskimg') for im in dfmask['path']]
        dfmask = dfmask.rename({'hpss_mask_path': 'tarpath'}, axis='columns')
        dfmask.dropna(inplace=True)
        df = pd.concat((dfsci, dfmask))
    else:
        df = dfsci

    tars = df['tarpath'].unique()

    # if nothing is found raise valueerror
    if len(tars) == 0:
        raise ValueError('No images match the given query')

    instr = '\n'.join(tars.tolist())
    with tempfile.NamedTemporaryFile() as f:
        f.write(f"{instr}\n".encode('ASCII'))
        # make the contents visible to the subprocess, which reads by name
        f.flush()

        # sort tarball retrieval by location on tape
        sortexec = Path(os.getenv('LENSGRINDER_HOME')) / 'pipeline/bin/hpsssort.sh'
        syscall = f'bash {sortexec} {f.name}'
        # communicate() drains both pipes and waits; the old poll()/sleep
        # loop that read afterwards could deadlock on a full pipe buffer.
        p = subprocess.Popen(syscall.split(), stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        stdout, stderr = p.communicate()

        if p.returncode != 0:
            # CalledProcessError's signature is (returncode, cmd, ...);
            # the previous call passed only stderr bytes and would itself
            # raise TypeError instead of the intended error.
            raise subprocess.CalledProcessError(p.returncode, syscall,
                                                output=stdout, stderr=stderr)

        # read it into pandas
        ordered = pd.read_csv(io.BytesIO(stdout), delim_whitespace=True,
                              names=['tape', 'position', '_', 'hpsspath'])

    # submit the jobs based on which tape the tar files reside on
    # and in what order they are on the tape
    dependency_dict = {}
    for tape, group in ordered.groupby('tape'):
        # get the tarfiles
        tarnames = group['hpsspath'].tolist()
        images = [df[df['tarpath'] == tarname]['path'].tolist() for tarname in tarnames]
        jobid = submit_hpss_job(tarnames, images, job_script_destination, frame_destination, log_destination, tape)
        for image in df[[name in tarnames for name in df['tarpath']]]['path']:
            dependency_dict[image] = jobid

    # drop the DB wrapper so its engine pool is released promptly
    del hpssdb
    return dependency_dict, metatable
if __name__ == '__main__':
    # Command-line entry point: pull the images selected by the given SQL
    # where-clause off HPSS tape.
    cli = ArgumentParser()
    cli.add_argument(
        "whereclause", default=None, type=str,
        help='SQL where clause that tells the program which images to retrieve.',
    )
    cli.add_argument(
        '--exclude-masks', default=False, action='store_true',
        help='Only retrieve the science images.',
    )
    ns = cli.parse_args()
    retrieve_images(ns.whereclause, exclude_masks=ns.exclude_masks)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment