ANN Training Management
import os
from utilities import mkdir_p
class Constants:
def __init__(self):
self.YETI_HOME = "/u/3/a/acl2205"
self.HABANERO_HOME = "/rigel/home/acl2205"
self.GROUPNAME = "stats"
self.NATIVE_HOME="/Users/akivalipshitz"
remote_homes = {self.YETI_HOME:"yeti", self.HABANERO_HOME:"habanero"}
local_homes = {self.NATIVE_HOME:"native"}
def _is_remote():
if os.environ["HOME"] in remote_homes:
return True
elif os.environ["HOME"] in local_homes:
return False
else:
return None
self.IS_REMOTE = _is_remote()
storage_locations = {
"yeti": "/vega/stats/users/acl2205/",
# "native":"/Users/akivalipshitz/Developer/linderman/",
"native": "/Volumes/AkivaDrive/linderman/",
"habanero":"/rigel/stats/users/acl2205/"
# "native":"/Volumes/AkivaDrive"
}
def _storage_location():
self.lookup = remote_homes if self.IS_REMOTE else local_homes
self.name = self.lookup[os.environ["HOME"]]
location = storage_locations[self.name]
return location
self.MAIN_STORAGE_DIR = _storage_location()
self.DATA_STORAGE = os.path.join(self.MAIN_STORAGE_DIR, "data/")
self.PROJDIR = os.path.join(os.environ["HOME"], "wormdynamics")
# Data relevant stuff
# Where skeletonx and skeletony are stored
self.NPYFILES = os.path.join(self.DATA_STORAGE, "npyfiles")
# Where the raw matfiles are stored
self.RAWFILES = os.path.join(self.DATA_STORAGE, "raw")
# Where stateseqs are stored
self.SEQFILES = os.path.join(self.DATA_STORAGE, "seqfiles")
self.point_to_local()
self.REMOTE_STORAGE = "/Users/akivalipshitz/Developer/linderman/data/remotedata/data"
self.REMOTE_MODELDB = os.path.join(self.REMOTE_STORAGE, "modeldb.json")
self.REMOTE_TRAINDB = os.path.join(self.REMOTE_STORAGE, "traindb.json")
self.REMOTE_TRAINFILES = os.path.join(self.REMOTE_STORAGE, "trainfiles")
self.REMOTE_MODELDIR = os.path.join(self.REMOTE_STORAGE, "modeldir")
mkdir_p(self.MODELDIR)
self.STD_FRAMERATE = 30.0
self.IS_JOB = "IS_RUNNING"
self.NOT_JOB = "NOT_RUNNING"
self.QSUB_ENV_VAR = "RUNNING_QSUB"
self.PBS_JOBID = "PBS_JOBID"
self.PBS_ARRAYNUMBER = "PBS_ARRAYID"
self.SLURM_ENV_VAR = "RUNNING_QSUB"
self.SBATCH_JOBID = "SLURM_ARRAY_JOB_ID"
self.SBATCH_ARRAYNUMBER = "SLURM_ARRAY_TASK_ID"
self.env_var = {
"habanero": self.SLURM_ENV_VAR,
"yeti": self.QSUB_ENV_VAR,
"native":""
}
self.jobid = {
"habanero": self.SBATCH_JOBID,
"yeti": self.PBS_JOBID,
"native": ""
}
self.arraynumber = {
"habanero": self.SBATCH_ARRAYNUMBER,
"yeti": self.PBS_ARRAYNUMBER,
"native": ""
}
self.ARRAYNUMBER = self.arraynumber[self.name]
self.ENV_VAR = self.env_var[self.name]
self.JOBID= self.jobid[self.name]
def point_to_local(self):
self.MODELDB = os.path.join(self.DATA_STORAGE, "modeldb.json")
self.TRAINDB = os.path.join(self.DATA_STORAGE, "traindb.json")
self.TRAINFILES = os.path.join(self.DATA_STORAGE, "trainfiles")
self.MODELDIR = os.path.join(self.DATA_STORAGE, "modeldir")
def point_to_remote(self):
self.MODELDB = self.REMOTE_MODELDB
self.TRAINDB = self.REMOTE_TRAINDB
self.TRAINFILES = self.REMOTE_TRAINFILES
self.MODELDIR = self.REMOTE_MODELDIR
const = Constants()
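# Example (a minimal usage sketch; which values you see depends on which
# $HOME the lookup above matched, so the comments are illustrative only):
#
#   from const import const
#   print const.name          # "yeti", "habanero", or "native"
#   print const.MAIN_STORAGE_DIR
#   print const.MODELDIR      # <MAIN_STORAGE_DIR>/data/modeldir
#   print const.TRAINDB       # <MAIN_STORAGE_DIR>/data/traindb.json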
# -*- coding: UTF-8 -*-
import json
import time
from tinydb import TinyDB, Query, where
import os
from const import const
import hashlib
from utilities import mkdir_p
import shutil
def checkEqual2(iterator):
return len(set(iterator)) <= 1
def existsall(files):
return all(map(os.path.exists, files))
def default(var, to):
if not var: return to
return var
def remove(path):
""" param <path> could either be relative or absolute. """
if os.path.isfile(path):
os.remove(path) # remove the file
elif os.path.isdir(path):
shutil.rmtree(path) # remove dir and all contains
else:
raise ValueError("file {} is not a file or dir.".format(path))
def gen_rndid(n):
modelid = ".".join(map(str,map(ord, os.urandom(n))))
hashed = hashlib.md5()
hashed.update(modelid)
return hashed.hexdigest()
def dict_to_json(n):
return json.dumps(n, ensure_ascii=False)
class ModelDB:
def __init__(self, location):
if not os.path.exists(os.path.dirname(location)):
raise IOError("Directory for model database does not exist: " + location)
self.location = location
self.db = TinyDB(location)
def get_models(self, modelname):
model = Query()
return self.db.search(model.name == modelname)
def get_model(self, modelname):
avail = self.get_models(modelname)
assert len(avail)==1
return avail[0]
def get_by_id(self, modelid):
model = Query()
matches = self.db.search(model.modelid == modelid)
if len(matches) == 1:
return matches[0]
def model_exists(self, modelname):
return len(self.get_models(modelname)) > 0
def add_model(self, modelname, description="", modeldir=""):
assert modelname != None
hashed = gen_rndid(10)
if self.model_exists(modelname):
existing = self.get_model(modelname)
hashed = existing["modelid"]
self.db.remove(eids=[existing.eid])
mkdir_p( modeldir )
row = {
"name": modelname,
"description": description,
"modelid": hashed,
"modelloc":os.path.join(modeldir, hashed+"_"+modelname)
}
self.db.insert(row)
return hashed
class TrainDB:
def __init__(self, modeldb, location, autoupdate=False):
if not os.path.exists(os.path.dirname(location)):
raise IOError("Directory for train database does not exist: " + location)
self.location = location
self.modeldb = modeldb
self.db = TinyDB(location)
if autoupdate:
self.update_db()
def update_db(self):
"""
Updates the database if trains have been deleted
"""
for train in self.db.all():
if not os.path.isdir(train["outputdir"]):
self.remove(train["trainid"])
def getTrains(self, trainid):
train = Query()
return self.db.search(train.trainid == trainid)
def runExists(self, trainid):
return len(self.getTrains(trainid)) == 1
def load_summary(self, summary):
return json.loads(summary)
def getTrain(self, trainid):
assert self.runExists(trainid)
train= self.getTrains(trainid)[0]
return train
def remove(self, trainid):
matches = self.db.search(where("trainid") == trainid)
if not matches: return
train = matches[0]
if os.path.isdir(train["outputdir"]):
out = train["outputdir"]
files = os.listdir(out)
files = filter(lambda f: f not in train["ignore"], files)
files = map(lambda f: os.path.join(out, f), files)
map(remove, files)
self.remove_train(trainid)
def remove_train(self, trainid):
matches = self.db.search(where("trainid") == trainid)
if not matches: return
eid = matches[0].eid
self.db.remove(eids=[eid])
def reset_files(self, trainid):
def update(train):
datafiles=train["datafiles"]
train["datafiles"] = {fname: False for fname in datafiles}
return train
self.db.update(update, where("trainid")==trainid)
def clear(self):
eids = map(lambda x: x.eid, self.db.all())
self.db.remove(eids=eids)
def trainrun(
self,
genfile,
desc=None,
modelname=None,
datafiles=[],
outputdir=None,
date=None,
configuration=None,
inneroutputdirs=[],
ignore=[],
forceremove=False):
assert modelname
assert existsall(datafiles)
date = default(date, time.asctime())
configuration = default(configuration, {})
assert desc
existing = self.db.search(where("genfile")==genfile)
if len(existing) > 0:
trainid = existing[0]["trainid"]
if forceremove:
self.remove(trainid)
else:
self.remove_train(trainid)
else:
trainid = gen_rndid(10)
outputdir = os.path.join(outputdir, os.path.basename(genfile))
rundata = {
"desc": desc,
"modelname":modelname,
"datafiles": {fname: False for fname in datafiles},
"date": date,
"outputdir": outputdir,
"trainid": trainid,
"configuration": configuration,
"summary":"",
"genfile": genfile,
"ignore": ignore
}
self.db.insert(rundata)
# Generating output directories
# within the directory for this training run
is_arraytrain= False
if "arrayjob" in configuration:
is_arraytrain = configuration["arrayjob"]
if is_arraytrain:
for i, datafile in enumerate(datafiles):
arraydir = os.path.join(
outputdir,
"job_{}".format(i))
for d in inneroutputdirs:
newdir = os.path.join(arraydir, d)
mkdir_p(newdir)
else:
mkdir_p(outputdir)
map(lambda d:
self.makedatadir(trainid, d),
inneroutputdirs)
return trainid
def usedfile(self, trainid, fname):
def update(train):
assert fname in train["datafiles"]
train["datafiles"][fname] = True
return train
self.db.update(update, where("trainid") == trainid)
def logtrain(self, trainid, summary_results, used_fnames=[]):
"""
Open questions:
1. If a training run fails, how should the trainer be notified?
2. Should this crash if the train has already been summarized?
"""
serialized = dict_to_json(summary_results)
train = self.getTrain(trainid)
assert self.runExists(trainid)
assert set(used_fnames).issubset(set(train["datafiles"].keys()))
self.db.update({'summary': serialized}, Query().trainid == trainid)
map(lambda f: self.usedfile(trainid, f), used_fnames)
def makedatadir(self, trainid, dirname):
"""
Makes a datafolder within the outputdir
of the training run with the given trainid
"""
train = self.getTrain(trainid)
outputdir = train["outputdir"]
newdir = os.path.join(outputdir, dirname)
mkdir_p(newdir)
def outputdir(self, trainid):
return self.getTrain(trainid)["outputdir"]
def trained_on_all_files(self, trainid):
train = self.getTrain(trainid)
availdatasets = filter(
lambda k: train["datafiles"][k] == False,
train["datafiles"].keys()
)
return len(availdatasets) == 0
def getdbs():
modeldb = ModelDB(location=const.MODELDB)
traindb = TrainDB(modeldb, location=const.TRAINDB)
return modeldb, traindb
modeldb, traindb = getdbs()
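# Example (a sketch of how these handles are typically used from a training
# script; the model name and data file path below are hypothetical, and
# trainrun asserts that every entry in datafiles exists on disk):
#
#   model_id = modeldb.add_model("my_arhsmm",
#                                description="AR-HSMM on PCA'd skeletons",
#                                modeldir=const.MODELDIR)
#   train_id = traindb.trainrun(os.path.abspath(__file__),
#                               desc="first parameter sweep",
#                               modelname="my_arhsmm",
#                               datafiles=["/path/to/worm1.npy"],
#                               outputdir=const.TRAINFILES,
#                               configuration={"arrayjob": False})
#   # ... run the training, then record a summary and mark files as used:
#   traindb.logtrain(train_id, {"loglik": -1234.5},
#                    used_fnames=["/path/to/worm1.npy"])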
from utilities import mkdir_p
from modeldb import ModelDB, TrainDB
from const import const
import os
import loaddata as load
import numpy as np
import itertools as it
from pyhsmm.util.text import progprint_xrange
import pickle
"""
Trains represent tests on parameters; models are concrete entities.
Philosophy:
Separation of model and training – models are distinct from their parameters.
"""
class Model:
"""
This layer enables additional functionality to
be implemented on top of bare-bones arhsmm instances
by subclassing the canonical Model, whose implementation
is defined below.
"""
def __init__(self, armodel, pcas = 3):
self.armodel = armodel
self.pcas = pcas
def add_data(self, data, seq=None):
if seq is not None:
self.armodel.add_data(data, stateseq = seq, fixed_stateseq=True, initialize_from_prior=True)
else:
self.armodel.add_data(data)
def iterate(self):
self.armodel.resample_model()
def fill_with(self, datas, stateseqs=None):
if stateseqs:
for data, stateseq in zip(datas, stateseqs):
self.armodel.add_data(data.T, stateseq=stateseq, fixed_stateseq=True, initialize_from_prior=True)
return
for d in datas:
self.armodel.add_data(d.T)
def fill_model(self, datasetloc, semisupervised=False):
assert os.path.exists(datasetloc)
data, _ = load.loaddata(datasetloc, n=self.pcas)
stateseq = None
if semisupervised:
seqloc = datasetloc.replace("npyfiles", "seqfiles")
stateseq = np.load(seqloc)
self.armodel.add_data(data.T, stateseq = stateseq)
def predict(self, data, nahead):
return self.armodel.predict(data, nahead)
def log_likelihood(self):
return self.armodel.log_likelihood()
def iterations(self, iters, writeto=None):
for i in progprint_xrange(iters):
self.iterate()
# It is expensive to write to disk
yield self.log_likelihood()
def write_model(self, fname):
mkdir_p(
os.path.dirname(fname)
)
assert os.path.exists(os.path.dirname(fname))
replica = Model(self.armodel.copy_sample(), pcas=self.pcas)
with open(fname, 'w') as modelf:
pickle.dump(replica, modelf)
modelf.close()
@staticmethod
def load_model(fname):
if not os.path.exists(fname):
return None
with open(fname, 'r') as modelf:
return pickle.load(modelf)
# CALL THIS FUNCTION TO REGISTER A MODEL.
# model is a Model instance
def commit(model, name, desc, modeldir):
modeldb = ModelDB(location=const.MODELDB)
model_id = modeldb.add_model(
name,
description=desc,
modeldir=modeldir
)
model_loc = modeldb.get_by_id(model_id)["modelloc"]
model_inst = Model(model.armodel)
model_inst.write_model(model_loc)
return model_id
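# Example (a sketch of wrapping and registering a model; "posteriormodel"
# stands for an already-constructed arhsmm instance from whatever library
# you build it with, and the data file must exist under const.NPYFILES):
#
#   m = Model(posteriormodel, pcas=3)
#   m.fill_model(os.path.join(const.NPYFILES, "worm1.npy"))
#   for ll in m.iterations(100):
#       pass  # ll is the log-likelihood after each resampling sweep
#   commit(m, "my_arhsmm", "AR-HSMM on PCA'd skeletons", const.MODELDIR)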
import re
import sys
"""
Introduction:
This is a micro argparse for turning any
python function into a bash script.
For scientists who are busy and need
to throw a CLI together in 3 seconds
Handles strings, ints, and floats
as kwargs and args
python functions can be scriptified and
accessed from the command line
or used directly from python
Usage:
./<script name>.py arg1 arg2 ... argn kwarg1=k1 kwarg2=k2 kwarg3=k3
Parses floats and ints into their respective type
To mimic boolean kwargs, set the default to None and pass k=1 on
the command line to flip on the boolean
eg:
--- script.py ---
def f(flag=None):
if flag:
print flag
else:
print "ISNONE"
scriptify(f)
---- bash --------
$: ./script.py flag=1
#=> 1
$: ./script.py
#=> ISNONE
Example usage:
------- echo.py -------
#!/usr/bin/env python
from scriptify import scriptify
def echo(string, a=1):
print string
print a
if __name__=="__main__":
scriptify(echo)
------ bash ------------
chmod +x echo.py
$: ./echo.py "Hello World" a="To You"
#=> Hello World
#=> "To You"
"""
#Matches dictionary definitions of integer or string values
# Match key String Values Numerical Values Exponents
# kwargs_words = "([A-z]+[A-z0-9_]*)\ *\=(?:\"|\')(.+)(?:\"|\')"
kwargs_words = "([A-z]+[A-z0-9_]*)\ *\=(.+)"
kwargs_numbers = "([A-z]+[A-z0-9_]*)\ *\=(\-*[0-9]*\.*[0-9]+(?:e\-*\d+)*)"
re_kwargs_words = re.compile(kwargs_words)
re_kwargs_numbers = re.compile(kwargs_numbers)
args= "(\-*[0-9]*\.*[0-9]+)(e\-*\d+)|(.+)"
re_args = re.compile(args)
def numread(string):
try:
return int(string)
except ValueError:
try:
return float(string)
except ValueError:
return string
def matchany_or_last(regex, string):
match_first = regex +"\ *\,\ *"
match_any = regex
match_last = "\ *\,\ *" + regex
re_match_first = re.compile(match_first)
re_match_any = re.compile(match_any)
re_match_last = re.compile(match_last)
firsts = re_match_first.findall(string)
string = re_match_first.sub('', string)
anys = re_match_any.findall(string)
string = re_match_any.sub('', string)
lasts = re_match_last.findall(string)
string = re_match_last.sub('', string)
total = []
total.extend(firsts)
total.extend(anys)
total.extend(lasts)
return total, string
def _collectkwargs(argv):
words, string = matchany_or_last(kwargs_words, argv)
numbers, _ = matchany_or_last(kwargs_numbers, string)
total = []
total.extend(words)
total.extend(numbers)
ret = dict(total)
for k in ret.keys():
ret[k] = numread(ret[k])
return ret, string
def collectkwargs(argv):
m, _ = _collectkwargs(argv)
return m
def collectargs(argv):
total = re_args.findall(argv)
total = map(lambda x: numread(''.join(list(x))), total)
return total
def parseargs(argc):
kwargs, string = _collectkwargs(argc)
args = collectargs(string)
return kwargs, args
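# Examples of what parseargs produces when scriptify feeds it one argv token
# at a time (each token is parsed independently):
#
#   parseargs("alpha=0.5")    # -> ({"alpha": 0.5}, [])
#   parseargs("niters=100")   # -> ({"niters": 100}, [])
#   parseargs("worm1.npy")    # -> ({}, ["worm1.npy"])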
def scriptify(f, string=''):
if string == '':
string = sys.argv[1:]
results=map(parseargs, string)
kwargs = {}
args = []
for kwarg, arg in results:
kwargs.update(kwarg)
args.extend(arg)
f(*args, **kwargs)
import sys
import os
from functify import functify as functify
from modeldb import modeldb, traindb
from tinydb import where
from const import const
from utilities import mkdir_p
def p(*args):
print args
class FileGenerator(object):
def __init__(self, fname=""):
self.lines = []
self.suffix = ""
self.fname = fname
def add_line(self, *args):
line = args
line = map(str, line)
self.lines.append(' '.join(line))
return self
def suffix(self, suffix):
self.suffix = suffix
def prefix(self, prefix):
self.prefix = prefix
def joinlines(self):
return '\n'.join(self.lines)
def write(self):
map(p, self.lines)
with open(self.fname, 'w') as f:
string = self.joinlines()
f.write(string)
class ClusterSubmit(FileGenerator):
def __init__(self, *args,**kwargs):
prefix_map={
"std_prefix": "",
"jobname": "",
"groupname": "",
"mem": "",
"outputdir": "",
"array": "",
"time": "",
"nodes": "",
"processors": "",
"email":""
}
self.prefmap = prefix_map
super(ClusterSubmit, self).__init__(*args, **kwargs)
def initline(self, *args):
args = list(args)
self.add_line(*([self.prefmap["std_prefix"]]+args))
return self
def array(self, narray):
narray = str(narray)
self.initline(
self.prefmap["array"]+narray)
def jobname(self, name):
name = str(name)
self.initline(
self.prefmap["jobname"]+name)
return self
def groupname(self, name):
name = str(name)
self.initline(
self.prefmap["groupname"]+name)
return self
def time(self, time):
time = str(time)
self.initline(
self.prefmap["time"]+time)
return self
def mem(self, mem):
mem = str(mem)
self.initline(
self.prefmap["mem"]+mem)
return self
def out(self, to):
to = str(to)
self.initline(
self.prefmap["outputdir"]+to)
return self
def hardware(self, nodes=1, processors=1):
nodes = str(nodes)
processors = str(processors)
self.initline(
self.prefmap["nodes"]+nodes)
self.initline(
self.prefmap["processors"]+processors)
return self
def bash(self, *bashline):
self.add_line(*bashline)
return self
def email(self, email):
self.initline(
self.prefmap["email"]+email)
return self
def envexport(self):
# no-op for the generic/native case; cluster subclasses override this
return self
class YetiSubmit(ClusterSubmit):
def __init__(self, *args, **kwargs):
super(YetiSubmit, self).__init__(*args, **kwargs)
self.prefmap={
"std_prefix": "#PBS",
"jobname": "-N=",
"groupname": "-W group_list=",
"mem": "-l mem=",
"outputdir": "-o localhost:",
"array": "-t 1-",
"time": "-l walltime=",
"nodes": "-l nodes=",
"processors": "ppn=",
"email":"-M"
}
self.envexport()
def email_codes(self, abort=False, begin=False, end=False):
mapping = {
"abort": "a",
"begin": "b",
"end": "e"
}
abe_string = ""
if abort: abe_string += mapping["abort"]
if begin: abe_string += mapping["begin"]
if end: abe_string += mapping["end"]
self.add_line("#PBS -m {}".format(abe_string))
return self
def envexport(self):
self.add_line("#PBS -V")
return self
def out(self, stdout_file, stderr_file=None):
stderr_file = stderr_file or stdout_file
self.add_line("#PBS -o localhost:{}".format(stdout_file))
self.add_line("#PBS -e localhost:{}".format(stderr_file))
return self
class HabaneroSubmit(ClusterSubmit):
def __init__(self, *args, **kwargs):
super(HabaneroSubmit, self).__init__(*args, **kwargs)
self.prefmap ={
"std_prefix":"#SBATCH",
"time":"--time=",
"nodes":"--nodes=",
"processors":"--ntasks-per-node=",
"mem":"--mem=",
"groupname":"--account=",
"outputdir":"--output=",
"email":"--mail-user=",
"jobname":"--job-name=",
"array":"--array=1-"
}
def envexport(self):
self.initline("--export=ALL")
return self
def email(self, email):
self.initline(
self.prefmap["email"]+email)
self.initline("--mail-type=END")
return self
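# Example (a sketch of driving the submit-file builders directly; the file
# name, resources, and script are placeholders):
#
#   sub = HabaneroSubmit("example_submit.sh")
#   sub.bash("#!/bin/sh")\
#      .jobname("example")\
#      .groupname("stats")\
#      .hardware(nodes=1, processors=4)\
#      .time("01:00:00")\
#      .mem(4000)\
#      .out("slurm-%A-%a.out")
#   sub.envexport().bash("python", "train.py").write()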
strify = lambda x: "\"" + x + "\""
from sh import bash, python, rm
def trainkit_submit(
pyscript,
analysis=False,
ffmpeg=False,
train=True,
mem=1000,
walltime="01:00:00",
nodes=1,
processors=1,
array=1,
run=False,
loc=const.name):
pyscript = os.path.abspath(pyscript)
trains = traindb.db.search(where("genfile") == pyscript)
if len(trains) == 0:
print "notrains"
python(pyscript)
trains = traindb.db.search(where("genfile") == pyscript)
trainrecord = trains[0]
outputdir = trainrecord["outputdir"]
scriptpath = os.path.abspath(pyscript)
jobname = os.path.basename(scriptpath).split('.')[0]
fname = os.path.join(outputdir,
"qsubfiles/", jobname + "_submit.sh")
mkdir_p(os.path.dirname(fname))
job_outdir = os.path.join(outputdir, "out")
mkdir_p(job_outdir)
prev_files = map(lambda f: os.path.join(job_outdir, f), os.listdir(job_outdir))
map(os.remove, prev_files)
if loc == "yeti":
submission = YetiSubmit(fname)
groupname="yetistats"
elif loc=="habanero":
submission = HabaneroSubmit(fname)
groupname="stats"
elif loc=="native":
groupname="lipshitz"
submission = ClusterSubmit(fname)
if ffmpeg:
processors = 24
else:
processors = 12
submission\
.bash("#!/bin/sh")\
.bash("#Config")\
.email("acl2205@columbia.edu")\
.jobname(jobname)\
.hardware(nodes=nodes, processors=processors)\
.time(walltime)\
.groupname(groupname)\
.mem(mem)
isarr = False
if array > 1:
if "narray" not in trainrecord["configuration"]:
isarr = True
submission\
.array(array)
elif "narray" in trainrecord["configuration"]:
armax = trainrecord["configuration"]["narray"]
submission.array(armax)
isarr = True
if isarr:
submission.out(
os.path.join(job_outdir,"job-%A-%a.out")
)
else:
submission.out(job_outdir)
submission\
.envexport()\
.bash("#Running")\
.bash("mkdir", "-p", strify(job_outdir))\
.bash("export ", const.ENV_VAR + "=" + const.IS_JOB)\
.bash("source activate wormdynamics")\
if train:
submission\
.bash("python ", pyscript)\
if not ffmpeg and analysis:
submission\
.bash("python",
os.path.join(const.PROJDIR, "analysis_scripts/summary.py"),
"fname="+pyscript,
"parallelize=1 deleteprev=1"
)
elif analysis:
submission\
.bash("python",
os.path.join(const.PROJDIR, "analysis_scripts/summary.py"),
"fname="+pyscript,
"parallelize=1 deleteprev=1",
"ffmpeg_videos=1")\
submission\
.bash("export ", const.ENV_VAR + "=" + const.NOT_JOB)\
.write()
return fname
if __name__ == "__main__":
functify(trainkit_submit)
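# Example invocation (a sketch, assuming this file is saved as trainkit_submit.py
# and that functify parses argv the same way the scriptify helper above does;
# "sweep.py" is a placeholder generator script that has already registered a
# training run via traindb.trainrun):
#
#   $ ./trainkit_submit.py sweep.py mem=4000 walltime=04:00:00 array=10 analysis=1
#
# This writes a qsub/sbatch submission file under the run's outputdir
# (in a qsubfiles/ subdirectory) and returns that file's path.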