Skip to content

Instantly share code, notes, and snippets.

@dannygoldstein
Last active July 2, 2019 03:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dannygoldstein/3f56f0ed3b9c01b8199f4032e9cf1206 to your computer and use it in GitHub Desktop.
hpss sort
#!/usr/bin/env bash
# hsi_file_sorter.script
#
# Given a file listing HPSS file names (one per line), ask HPSS where each
# file lives on tape and print the listing sorted by tape label and position,
# so that subsequent retrievals seek through each tape sequentially.
#
# Output columns (stdout): <tape-label> <section> <offset> <hpss-path>

if [ $# -lt 1 ]; then
    echo "usage is hsi_file_sorter.script <name_of_file_containing_HPSS_file_names>"
    exit 1
fi

list=$1

# Work under $SCRATCH when it is defined, otherwise fall back to $HOME.
OUTDIR=$HOME
if [ -n "$SCRATCH" ]; then
    OUTDIR=$SCRATCH
fi

# mktemp creates a unique directory atomically, avoiding the
# check-then-create race of probing $RANDOM names in a loop.
temp_dir=$(mktemp -d "$OUTDIR/hsi_sort_XXXXXX") || exit 1
tempin=$temp_dir/tempin.txt

# Build an hsi command file with one "ls -P <path>" per input line.
awk '{print "ls -P",$0}' "$list" > "$tempin"

# "ls -P" output lines begin with FILE; field 5 is "<section>+<offset>",
# the first 6 characters of field 6 look like the tape label (TODO confirm
# against hsi docs), and field 2 is the path.  Skip zero-length files
# ($3 != 0), which have no tape position.
/usr/common/mss/bin/hsi -q "in $tempin" 2>&1 \
    | grep -E '^FILE' \
    | awk '$3 != 0 {printf("%s %s %s %s\n",substr($6,0,6),substr($5,0,index($5,"+")-1),substr($5,index($5,"+")+1),$2)}' \
    > "$temp_dir/hsi_get.txt"

# Sort by tape label, then numerically by section and offset within the tape.
sort -k1,1 -k2,2n -k3,3n "$temp_dir/hsi_get.txt"

# Clean up the scratch directory.
rm -f "$temp_dir/hsi_get.txt" "$tempin"
rmdir "$temp_dir"
import time
import os
import psycopg2
import pandas as pd
from argparse import ArgumentParser
import tempfile, io
from pathlib import Path
import subprocess
from sqlalchemy import create_engine
class HPSSDB(object):
    """Handle on the HPSS image-metadata database.

    Builds a SQLAlchemy engine for a PostgreSQL database whose connection
    parameters come from the environment: HPSS_DBNAME, HPSS_DBPASSWORD,
    HPSS_DBUSERNAME, HPSS_DBPORT, HPSS_DBHOST.
    """

    def __init__(self):
        dbname = os.getenv('HPSS_DBNAME')
        password = os.getenv('HPSS_DBPASSWORD')
        username = os.getenv('HPSS_DBUSERNAME')
        port = os.getenv('HPSS_DBPORT')
        host = os.getenv('HPSS_DBHOST')
        self.engine = create_engine(
            f'postgresql://{username}:{password}@{host}:{port}/{dbname}')

    def __del__(self):
        # Dispose of the engine's connection pool so database connections
        # are closed promptly; merely deleting the reference (as before)
        # leaves pooled connections open until garbage collection.
        try:
            self.engine.dispose()
        except Exception:
            # Never raise from __del__ — the interpreter may be shutting
            # down and attributes/modules may already be gone.
            pass
def submit_hpss_job(tarfiles, images, job_script_destination,
                    frame_destination, log_destination, tape_number):
    """Write and submit a SLURM "xfer" job that pulls tar archives off one
    HPSS tape and extracts the requested images from each.

    Parameters
    ----------
    tarfiles : sequence of str
        HPSS paths of the tar archives to retrieve (all on the same tape).
    images : sequence of sequence of str
        For each tarfile, the image names to extract from it.
    job_script_destination : str or Path or None
        Directory in which to keep the generated job scripts; if None,
        anonymous temporary files are used and discarded on close.
    frame_destination : str or Path
        Directory the job cd's into before extracting images.
    log_destination : str or Path
        Directory for the SLURM .out log file.
    tape_number : str or int
        Tape identifier; names the scripts, the SLURM job, and the log.

    Returns
    -------
    int
        The SLURM job id parsed from the submission output.

    Raises
    ------
    ValueError
        If the submission command exits with a nonzero status.
    """
    nersc_account = os.getenv('NERSC_ACCOUNT')

    if job_script_destination is None:
        # Anonymous temp files (binary mode) — deleted automatically on close.
        jobscript = tempfile.NamedTemporaryFile()
        subscript = tempfile.NamedTemporaryFile()
    else:
        jobscript = open(Path(job_script_destination) /
                         f'hpss.{tape_number}.sh', 'w')
        subscript = open(Path(job_script_destination) /
                         f'hpss.{tape_number}.sub.sh', 'w')

    try:
        # Wrapper script: put the esslurm tools on PATH, then sbatch the job.
        substr = f'''#!/usr/bin/env bash
#module load esslurm
export PATH=/global/common/cori/software/hypnotoad:/opt/esslurm/bin:$PATH
export LD_LIBRARY_PATH=/opt/esslurm/lib64:$LD_LIBRARY_PATH
sbatch {Path(jobscript.name).resolve()}
'''

        if job_script_destination is None:
            # NamedTemporaryFile is opened in binary mode.
            substr = substr.encode('ASCII')
        subscript.write(substr)

        hpt = f'hpss.{tape_number}'

        jobstr = f'''#!/usr/bin/env bash
#SBATCH -J {tape_number}
#SBATCH -L SCRATCH,project
#SBATCH -q xfer
#SBATCH -N 1
#SBATCH -A {nersc_account}
#SBATCH -t 48:00:00
#SBATCH -C haswell
#SBATCH -o {(Path(log_destination) / hpt).resolve()}.out
cd {Path(frame_destination).resolve()}
'''

        # One hsi-get / extract / delete stanza per tar archive.
        for tarfile, imlist in zip(tarfiles, images):
            wildimages = '\n'.join([f'*{p}' for p in imlist])
            directive = f'''
/usr/common/mss/bin/hsi get {tarfile}
echo "{wildimages}" | tar --strip-components=12 -i --wildcards --wildcards-match-slash --files-from=- -xvf {os.path.basename(tarfile)}
rm {os.path.basename(tarfile)}
'''
            jobstr += directive

        if job_script_destination is None:
            jobstr = jobstr.encode('ASCII')
        jobscript.write(jobstr)

        # Flush both scripts to disk so the shell sees their full contents.
        jobscript.flush()
        subscript.flush()

        command = f'/bin/bash {Path(subscript.name).resolve()}'
        p = subprocess.Popen(command.split(), stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # communicate() waits for exit while draining both pipes; the old
        # poll/sleep loop only read the pipes after exit and could deadlock
        # once a pipe buffer filled.
        stdout, stderr = p.communicate()
        retcode = p.returncode

        out = stdout.splitlines()
        err = stderr.splitlines()

        print(out, flush=True)
        print(err, flush=True)

        if retcode != 0:
            raise ValueError(
                f'Nonzero return code ({retcode}) on command, "{command}"')
    finally:
        # Close (and, for temp files, delete) the scripts even on error.
        jobscript.close()
        subscript.close()

    # sbatch reports the submitted job on the first output line; the job id
    # is the last whitespace-separated token.
    jobid = int(out[0].strip().split()[-1])
    return jobid
def retrieve_images(whereclause, exclude_masks=False, job_script_destination=None,
                    frame_destination='.', log_destination='.'):
    """Queue HPSS retrieval jobs for all images matching a SQL where clause.

    Looks up matching images in the database, groups their tar archives by
    the tape they live on (sorted by position via the hpsssort.sh helper),
    and submits one SLURM transfer job per tape.

    Parameters
    ----------
    whereclause : str
        SQL fragment appended to the image query's WHERE clause.
    exclude_masks : bool
        If True, retrieve only science images, not their mask images.
    job_script_destination : str or Path or None
        Passed through to submit_hpss_job; None means temporary scripts.
    frame_destination, log_destination : str or Path
        Passed through to submit_hpss_job.

    Returns
    -------
    (dict, pandas.DataFrame)
        Mapping of image path -> SLURM job id it depends on, and the full
        metadata table returned by the query.

    Raises
    ------
    ValueError
        If no images match the query.
    subprocess.CalledProcessError
        If the tape-sort helper script fails.
    """
    # interface to HPSS and database
    hpssdb = HPSSDB()

    # this is the query to get the image paths
    query = f'SELECT * FROM IMAGE WHERE HPSS_SCI_PATH IS NOT NULL AND {whereclause} AND SEEING < 3 AND MAGLIMIT > 18.5'
    metatable = pd.read_sql(query, hpssdb.engine)

    df = metatable[['path', 'hpss_sci_path', 'hpss_mask_path']]
    dfsci = df[['path', 'hpss_sci_path']]
    dfsci = dfsci.rename({'hpss_sci_path': 'tarpath'}, axis='columns')

    if not exclude_masks:
        dfmask = df[['path', 'hpss_mask_path']].copy()
        # Mask images share the science image's name with sciimg -> mskimg.
        dfmask.loc[:, 'path'] = [im.replace('sciimg', 'mskimg')
                                 for im in dfmask['path']]
        dfmask = dfmask.rename({'hpss_mask_path': 'tarpath'}, axis='columns')
        dfmask.dropna(inplace=True)
        df = pd.concat((dfsci, dfmask))
    else:
        df = dfsci

    tars = df['tarpath'].unique()

    # if nothing is found raise valueerror
    if len(tars) == 0:
        raise ValueError('No images match the given query')

    instr = '\n'.join(tars.tolist())
    with tempfile.NamedTemporaryFile() as f:
        f.write(f"{instr}\n".encode('ASCII'))
        # Flush so the external sort script sees the complete list on disk.
        f.flush()

        # sort tarball retrieval by location on tape
        sortexec = Path(os.getenv('LENSGRINDER_HOME')) / 'pipeline/bin/hpsssort.sh'
        syscall = f'bash {sortexec} {f.name}'
        p = subprocess.Popen(syscall.split(), stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        # communicate() drains both pipes while waiting; the old poll/sleep
        # loop could deadlock once a pipe buffer filled.
        stdout, stderr = p.communicate()
        retcode = p.returncode

        if retcode != 0:
            # CalledProcessError's signature is (returncode, cmd, ...); the
            # previous code passed stderr as the return code, which raised
            # a confusing secondary error.
            raise subprocess.CalledProcessError(retcode, syscall,
                                                output=stdout, stderr=stderr)

        # read the sorted listing into pandas
        ordered = pd.read_csv(io.BytesIO(stdout), delim_whitespace=True,
                              names=['tape', 'position', '_', 'hpsspath'])

    # submit the jobs based on which tape the tar files reside on
    # and in what order they are on the tape
    dependency_dict = {}
    for tape, group in ordered.groupby('tape'):
        # get the tarfiles on this tape, in tape order
        tarnames = group['hpsspath'].tolist()
        images = [df[df['tarpath'] == tarname]['path'].tolist()
                  for tarname in tarnames]
        jobid = submit_hpss_job(tarnames, images, job_script_destination,
                                frame_destination, log_destination, tape)
        # every image in these tarballs depends on this tape's job
        for image in df[[name in tarnames for name in df['tarpath']]]['path']:
            dependency_dict[image] = jobid

    del hpssdb
    return dependency_dict, metatable
if __name__ == '__main__':
    # Command-line entry point: pass a SQL where clause selecting images.
    cli = ArgumentParser()
    cli.add_argument('whereclause', default=None, type=str,
                     help='SQL where clause that tells the program which '
                          'images to retrieve.')
    cli.add_argument('--exclude-masks', default=False, action='store_true',
                     help='Only retrieve the science images.')
    parsed = cli.parse_args()
    retrieve_images(parsed.whereclause, exclude_masks=parsed.exclude_masks)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment