@philerooski / reduce_features.py
Created April 22, 2020

import argparse
import json
import os
import shutil

import pandas
import synapseclient

DUMMY_ARG = "dummy"  # A 'None'-like string we can pass to boto


def read_args():
    parser = argparse.ArgumentParser(
        description="Combine partitions of a dataset and store to Synapse. "
                    "This script is to be run on a machine where a "
                    "volume containing the partitioned datasets "
                    "is mounted."
                    "\n\n"
                    "The directory structure is expected to consist of "
                    "one or more uniquely named directories within "
                    "the `inputPath` directory. Within each of these "
                    "directories are expected to be tabular files, each "
                    "with an identical schema."
                    "\n\n"
                    "For example, if we had this directory structure "
                    "within our root folder:"
                    "\n\n"
                    "mPower_efs\n"
                    "|-- handToNoseLeft\n"
                    "|   |-- file1.csv\n"
                    "|   |-- file2.csv\n"
                    "|-- handToNoseRight\n"
                    "    |-- file1.csv\n"
                    "    |-- file2.csv\n"
                    "\n"
                    "Then this call: `python reduce_features.py "
                    "--assay handToNoseLeft --activity tremor --output "
                    "syn1234567 --file-format csv --input-path /mPower_efs` "
                    "\n\n"
                    "would store the file "
                    "'tremorFeatures_handToNoseLeft.csv' "
                    "in syn1234567.",
        formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument("--input-path", required=True,
                        help="Required. Location of the files to be concatenated.")
    parser.add_argument("--assay", required=True,
                        help="Required. Name of the assay. This should also \n"
                             "be the name of the subdirectory within \n"
                             "`input-path` containing the feature files \n"
                             "to concatenate.")
    parser.add_argument("--file-format", required=True,
                        help="The format of both the input feature files \n"
                             "and the output. Accepted arguments are 'tsv' \n"
                             "and 'csv'. If the argument isn't recognized, \n"
                             "it is interpreted as the literal delimiter \n"
                             "to use.")
    parser.add_argument("--reduce-job-info", required=True,
                        help="Synapse ID of the reduce job info file.")
    parser.add_argument("--output",
                        default=DUMMY_ARG,
                        help="Synapse ID of the parent entity where the \n"
                             "concatenated features will be stored. Ignored \n"
                             "when --update is set. Not setting this flag will \n"
                             "write the features to the current working \n"
                             "directory but not upload them to Synapse.")
    parser.add_argument("--activity",
                        help="Name of the activity (tremor, tapping, ...). \n"
                             "Ignored when --update is set. Required otherwise.")
    parser.add_argument("--update",
                        default=DUMMY_ARG,
                        help="The Synapse ID of a pre-existing feature file \n"
                             "on Synapse to update. Including this argument \n"
                             "will append the concatenated feature files to \n"
                             "the end of the file already on Synapse.")
    parser.add_argument("--job-table", default=None,
                        help="Synapse ID of the job table used during the \n"
                             "job submission process. This will be deleted.")
    args = parser.parse_args()
    return args


def get_job_info(syn, reduce_job_info):
    f = syn.get(reduce_job_info)
    with open(f.path, "r") as job_file:
        job_info = json.load(job_file)
    return job_info
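
# The reduce job info file is expected to contain JSON. A minimal sketch of
# its contents, inferred from the keys this script accesses ('used',
# 'executed', 'verify_integrity', 'verify_columns'); the values shown here
# are hypothetical:
#
# {
#     "used": ["syn111"],
#     "executed": ["syn222"],
#     "verify_integrity": true,
#     "verify_columns": ["recordId"]
# }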


def curate_files(loc, assay):
    parent_directory = os.path.join(loc, assay)
    features = [os.path.join(parent_directory, p)
                for p in os.listdir(parent_directory)]
    return features


def get_sep(file_format):
    if file_format == "csv":
        return ","
    elif file_format == "tsv":
        return "\t"
    else:
        return file_format
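
# Note: anything other than 'csv' or 'tsv' falls through as the literal
# delimiter, so, for example, passing --file-format "|" would read and
# write pipe-delimited files (the "|" example is illustrative, not from
# the original help text).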


def combine_files(paths, file_format):
    extension = os.path.splitext(paths[0])[1]
    dataframes = [pandas.read_csv(p, sep=get_sep(file_format), header=0,
                                  index_col=False) for p in paths]
    features = pandas.concat(dataframes, ignore_index=True)
    return features, extension


def write_features(features, activity, assay, file_format, extension):
    fname = "{}Features_{}{}".format(activity, assay, extension)
    features.to_csv(fname, sep=get_sep(file_format), index=False, header=True)
    return fname
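
# For example, activity="tremor", assay="handToNoseLeft", and
# extension=".csv" yield "tremorFeatures_handToNoseLeft.csv", the filename
# referenced in the argparse description above.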


def upload(syn, fname, output, job_info):
    f = synapseclient.File(
        path=fname,
        name=fname,
        parent=output)
    syn.store(f, used=job_info['used'], executed=job_info['executed'])


def update(syn, features, synapse_features, file_format, assay, job_info):
    f = syn.get(synapse_features)
    sep = get_sep(file_format)
    old_features = pandas.read_csv(f.path, sep=sep,
                                   index_col=False, header=0)
    new_features = pandas.concat([old_features, features], ignore_index=True)
    if job_info['verify_integrity']:
        # If verify_columns was not set, subset is None and the entire row
        # must match exactly to be dropped as a duplicate.
        new_features = new_features.drop_duplicates(
            subset=job_info['verify_columns'],
            keep='last')
    new_features.to_csv(f.path, sep=sep, index=False, header=True)
    syn.store(f, used=job_info['used'], executed=job_info['executed'])


def cleanup(syn, job_table, reduce_job_info, input_path, assay):
    # Delete the job table (if one was provided) and the reduce job info
    if job_table is not None:
        syn.delete(job_table)
    syn.delete(reduce_job_info)
    # Delete the original, partitioned feature files
    shutil.rmtree(os.path.join(input_path, assay))


def main():
    args = read_args()
    syn = synapseclient.login(os.environ['synapseUsername'],
                              os.environ['synapsePassword'])
    job_info = get_job_info(syn, args.reduce_job_info)
    feature_files = curate_files(args.input_path, args.assay)
    if not feature_files:
        raise RuntimeError("The directory in which the features were "
                           "expected, {}, is empty.".format(
                               os.path.join(args.input_path, args.assay)))
    features, extension = combine_files(feature_files, args.file_format)
    if args.update != DUMMY_ARG:
        update(syn, features, args.update, args.file_format,
               args.assay, job_info)
    elif args.output != DUMMY_ARG:
        fname = write_features(features, args.activity, args.assay,
                               args.file_format, extension)
        upload(syn, fname, args.output, job_info)
    else:
        # --output not set: write the features to the current working
        # directory without uploading them, as documented in the help text.
        write_features(features, args.activity, args.assay,
                       args.file_format, extension)
    cleanup(syn, args.job_table, args.reduce_job_info,
            args.input_path, args.assay)


if __name__ == "__main__":
    main()
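
# Example invocations (Synapse IDs below are hypothetical; the script reads
# credentials from the synapseUsername and synapsePassword environment
# variables):
#
#   python reduce_features.py --input-path /mPower_efs \
#       --assay handToNoseLeft --activity tremor --file-format csv \
#       --reduce-job-info syn2345678 --output syn1234567
#
#   # Append to a pre-existing feature file instead of creating a new one:
#   python reduce_features.py --input-path /mPower_efs \
#       --assay handToNoseLeft --file-format csv \
#       --reduce-job-info syn2345678 --update syn7654321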