Gist by @philerooski, created April 22, 2020 20:16

import synapseclient
import pandas
import argparse
import boto3
import os
import json
import uuid

AWS_BATCH_ARRAY_SIZE_LIMIT = 10000
USED = set([
    "https://github.com/Sage-Bionetworks/mPowerAnalysis/blob/master/featureExtraction/tremorModule.R",
    "https://hub.docker.com/r/philsnyder/mpower-feature-extraction/"])
EXECUTED = set([
    'https://github.com/Sage-Bionetworks/mPowerAnalysis/blob/master/featureExtraction/batch/submit_jobs.py'])
DUMMY_ARG = "dummy"  # A 'None'-like string we can pass to boto


def read_args():
    parser = argparse.ArgumentParser(
        description="Submit jobs to AWS Batch to extract features "
                    "from raw data for a single assay. All flagged "
                    "arguments are necessary unless marked otherwise.")
    parser.add_argument("--input-table",
                        required=True,
                        help="Synapse ID of a table containing a recordId "
                             "column and raw features. All record IDs in "
                             "the table will be submitted for processing if "
                             "the update flag is not specified.")
    parser.add_argument("--output-path",
                        required=True,
                        help="Local or shared filepath to write the "
                             "extracted features to.")
    parser.add_argument("--job-info-parent",
                        required=True,
                        help="Project on Synapse to store the created job "
                             "table and reduce job file within.")
    parser.add_argument("--assay-name",
                        required=True,
                        help="Name of assay to be processed.")
    parser.add_argument("--assay-column",
                        required=True,
                        help="Name of column in input-table containing "
                             "the raw data for assay-name.")
    parser.add_argument("--activity-name",
                        default=DUMMY_ARG,
                        help="Name of the activity (tremor, tapping, ...). "
                             "Ignored when --update is set. Required otherwise.")
    parser.add_argument("--chunk-size",
                        default=30,
                        type=int,
                        help="Number of record IDs to process per job.")
    parser.add_argument("--output-synapse",
                        default=DUMMY_ARG,
                        help="Synapse ID of the parent entity where "
                             "concatenated features will be stored. Ignored "
                             "when --update is set.")
    parser.add_argument("--update",
                        default=DUMMY_ARG,
                        help="The Synapse ID of a feature file "
                             "containing a recordId column. Any preexisting "
                             "record ID values will not be submitted for "
                             "feature extraction. Currently this requires "
                             "downloading the entire feature file.")
    parser.add_argument("--file-format",
                        default="tsv",
                        help="The format of the feature file being "
                             "updated and/or of the expected output. Recognized "
                             "arguments are 'tsv' and 'csv'. "
                             "If the argument isn't recognized, it will be "
                             "interpreted as the literal delimiter to use. "
                             "Unfortunately, this is very sensitive to "
                             "formatting when it comes to special characters "
                             "(like \\t) because of the need to pass this "
                             "special character through multiple interpreters "
                             "(e.g., bash, python). So we highly recommend "
                             "using a more common format, such as 'csv' "
                             "or 'tsv'.")
    parser.add_argument("--used", nargs='+', help="For provenance.")
    parser.add_argument("--executed", nargs='+', help="For provenance.")
    parser.add_argument("--verify-integrity", action="store_const", const=True,
                        help="Drop rows that are already included in the "
                             "Synapse feature file when --update is specified. "
                             "By default, it checks for exact duplicates, "
                             "which may be problematic when dealing with "
                             "floating point numbers and/or large files. Set "
                             "the --columns-to-verify flag to specify a set "
                             "of columns that index the feature file "
                             "for faster duplicate checking. By default, only "
                             "the last duplicate is kept.")
    parser.add_argument("--columns-to-verify", nargs='+',
                        help="A list of column names in the feature file "
                             "which index the features. These are used to "
                             "efficiently drop duplicate rows when updating "
                             "a feature file.")
    parser.add_argument("--profile",
                        help="The name of the AWS CLI profile to "
                             "use when submitting Batch jobs.")
    parser.add_argument("--map-job-queue",
                        default="mpower-feature-extraction",
                        help="The Batch job queue to submit jobs to.")
    parser.add_argument("--map-job-definition",
                        default="mpower-feature-extraction",
                        help="The Batch job definition to use for "
                             "each job.")
    parser.add_argument("--reduce-job-queue",
                        default="mpower-feature-extraction",
                        help="The Batch job queue to submit the "
                             "reduce job to.")
    parser.add_argument("--reduce-job-definition",
                        default="mpower-feature-reduction",
                        help="The Batch job definition to use for "
                             "the reduce features job.")
    args = parser.parse_args()
    return args
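
# A hypothetical invocation, for illustration only (the Synapse IDs, paths,
# and names below are made up; only the flags themselves come from the
# parser above):
#
#   python submit_jobs.py \
#       --input-table syn00000000 \
#       --output-path /shared/features \
#       --job-info-parent syn00000001 \
#       --assay-name walking_motion \
#       --assay-column "walking_motion.json" \
#       --activity-name walking \
#       --output-synapse syn00000002 \
#       --profile mpower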


def verify_args(args):
    if args.update == DUMMY_ARG and args.activity_name == DUMMY_ARG:
        raise TypeError("The --activity-name flag must be set if "
                        "the --update flag is not set.")
    if args.update != DUMMY_ARG and args.output_synapse != DUMMY_ARG:
        raise TypeError("The --update and --output-synapse flags "
                        "cannot both be set.")
    if args.verify_integrity is None and args.columns_to_verify:
        raise RuntimeError("--columns-to-verify is specified but "
                           "--verify-integrity is not. Did you mean to "
                           "include --verify-integrity in the command "
                           "line arguments?")


def get_batch_client(profile):
    session = boto3.session.Session(profile_name=profile)
    batch_client = session.client("batch")
    return batch_client


def get_filtered_table(syn, input_table, assay_column):
    q = syn.tableQuery("SELECT recordId FROM {} WHERE \"{}\" IS NOT NULL".format(
        input_table, assay_column))
    table_df = q.asDataFrame()
    return table_df


def get_all_record_ids(syn, input_table, assay_column):
    table_df = get_filtered_table(syn, input_table, assay_column)
    record_ids = table_df.recordId.values
    return record_ids


def get_sep(file_format):
    if file_format == "csv":
        return ","
    elif file_format == "tsv":
        return "\t"
    else:
        return file_format


def get_diff(syn, input_table, update, file_format, assay_column):
    all_record_ids = get_all_record_ids(syn, input_table, assay_column)
    features = syn.get(update)
    features_df = pandas.read_csv(features.path,
                                  sep=get_sep(file_format),
                                  index_col=False,
                                  header=0,
                                  usecols=['recordId'])
    already_processed = features_df.recordId.values
    diff_record_ids = list(set(all_record_ids).difference(already_processed))
    return diff_record_ids


def chunk_up_record_ids(record_ids, chunk_size):
    chunks = []
    for i in range(0, len(record_ids), chunk_size):
        record_ids_subset = record_ids[i:(i + chunk_size)]
        chunks.append(record_ids_subset)
    if len(chunks) > AWS_BATCH_ARRAY_SIZE_LIMIT:
        min_chunk_size = len(record_ids) // AWS_BATCH_ARRAY_SIZE_LIMIT + 1
        raise RuntimeError("Minimum chunk size is {}".format(min_chunk_size))
    return chunks, len(chunks)
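
# Illustrative behavior, assuming a plain list of record IDs: with
# chunk_size=2, chunk_up_record_ids(["r1", "r2", "r3"], 2) returns
# ([["r1", "r2"], ["r3"]], 2), i.e. the chunks plus the array job size.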


def build_job_table(syn, record_id_chunks, job_table_parent):
    job_table = []
    for i in range(0, len(record_id_chunks)):
        json_dict = {'recordIds': list(record_id_chunks[i])}
        json_dict_str = json.dumps(json_dict)
        job_table.append([i, json_dict_str])
    job_table = pandas.DataFrame(job_table, columns=['uid', 'jobInfo'])
    cols = synapseclient.as_table_columns(job_table)
    for col in cols:
        if col['name'] == 'uid':
            col['maximumSize'] = 36
        if col['name'] == 'jobInfo':
            col['columnType'] = 'LARGETEXT'
    schema = synapseclient.Schema(
        name="batch-job-submission-{}".format(uuid.uuid4()),
        columns=cols,
        parent=job_table_parent)
    table = syn.store(synapseclient.Table(schema, job_table))
    return table.schema.id
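
# Each row of the resulting Synapse table pairs an array index with the JSON
# payload for that chunk, roughly like this (record IDs are illustrative):
#
#   uid | jobInfo
#   ----+--------------------------------------------------
#     0 | {"recordIds": ["record-aaa", "record-bbb", ...]}
#     1 | {"recordIds": ["record-ccc", ...]}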


def submit_map_job(batch_client, job_queue, job_definition,
                   array_job_size, job_table, input_table,
                   assay_name, assay_column, output_path):
    job = batch_client.submit_job(
        jobName=job_table,
        jobQueue=job_queue,
        jobDefinition=job_definition,
        arrayProperties={'size': array_job_size},
        parameters={
            "jobTable": job_table,
            "inputTable": input_table,
            "assayName": assay_name,
            "assayColumn": assay_column,
            "outputPath": output_path})
    return job['jobId']
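
# AWS Batch exposes each child's position in the array job through the
# AWS_BATCH_JOB_ARRAY_INDEX environment variable; the worker container
# presumably matches that index against the `uid` column of the job table
# to find its chunk of record IDs. A sketch (not the actual worker code):
#
#   index = int(os.environ["AWS_BATCH_JOB_ARRAY_INDEX"])
#   chunk = syn.tableQuery(
#       "SELECT jobInfo FROM {} WHERE uid = {}".format(job_table, index))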


def build_reduce_file(syn, reduce_file_parent, used, executed,
                      verify_integrity, verify_columns, dependency):
    used = list(USED.union(used)) if used else list(USED)
    executed = list(EXECUTED.union(executed)) if executed else list(EXECUTED)
    json_dict = {
        'used': used,
        'executed': executed,
        'verify_integrity': verify_integrity,
        'verify_columns': verify_columns}
    json_dict_str = json.dumps(json_dict)
    temp_local_reduce_fname = "{}.json".format(dependency)
    with open(temp_local_reduce_fname, "w") as f:
        print(json_dict_str, file=f)
    f = synapseclient.File(temp_local_reduce_fname, parent=reduce_file_parent)
    reduce_job_file = syn.store(f)
    os.remove(temp_local_reduce_fname)
    return reduce_job_file.id
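
# The stored JSON is a small manifest for the reduce job to read back,
# e.g. (illustrative values):
#
#   {"used": ["https://github.com/..."],
#    "executed": ["https://github.com/..."],
#    "verify_integrity": true,
#    "verify_columns": ["recordId"]}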


def submit_reduce_job(batch_client, dependency, reduce_job_queue,
                      reduce_job_definition, input_path, file_format,
                      activity, assay, output, features_to_update,
                      reduce_file, job_table):
    """Submit a job to AWS Batch which depends on the array job
    submitted to extract the features.

    Parameters
    ----------
    batch_client : botocore.client.Batch
    dependency : str
        Job ID of the array job submitted to extract features.
    reduce_job_queue : str
        AWS Batch queue to submit the reduce job to.
    reduce_job_definition : str
        AWS Batch job definition to use for the reduce job.
    input_path : str
        Path where the extracted features are written and where their
        concatenation will be written when `features_to_update` is None.
        Otherwise the features are appended to the path where
        `features_to_update` is saved.
    file_format : str
        Format of, or delimiter of, `features_to_update` (if specified)
        and the extracted features.
    activity : str
        Name of the activity (tremor, tapping, ...). Not used when
        `features_to_update` is specified.
    assay : str
        Name of the assay. Also the subdirectory within `input_path`
        containing the extracted features.
    output : str
        Synapse ID to store the feature file at.
    features_to_update : str
        Synapse ID of a preexisting feature file to append extracted
        features to.
    reduce_file : str
        Synapse ID of the JSON file containing reduce job info.
    job_table : str
        Synapse ID of the job table. Will be deleted.
    """
    reduce_job = batch_client.submit_job(
        jobName="reduce_{}".format(dependency),
        jobQueue=reduce_job_queue,
        jobDefinition=reduce_job_definition,
        dependsOn=[{'jobId': dependency, 'type': 'SEQUENTIAL'}],
        parameters={
            "output": output,
            "inputPath": input_path,
            "reduceJobFile": reduce_file,
            "activity": activity,
            "assay": assay,
            "update": features_to_update,
            "jobTable": job_table,
            "fileFormat": file_format})
    return reduce_job['jobId']


def main():
    args = read_args()
    verify_args(args)
    syn = synapseclient.login()
    batch_client = get_batch_client(args.profile)
    if args.update == DUMMY_ARG:
        record_ids = get_all_record_ids(syn, args.input_table, args.assay_column)
    else:
        record_ids = get_diff(syn, args.input_table, args.update,
                              args.file_format, args.assay_column)
    if len(record_ids):
        record_id_chunks, array_job_size = chunk_up_record_ids(
            record_ids, args.chunk_size)
        job_table = build_job_table(
            syn, record_id_chunks, args.job_info_parent)
        map_job_id = submit_map_job(batch_client, args.map_job_queue,
                                    args.map_job_definition, array_job_size,
                                    job_table, args.input_table, args.assay_name,
                                    args.assay_column, args.output_path)
        reduce_file = build_reduce_file(syn, args.job_info_parent, args.used,
                                        args.executed, args.verify_integrity,
                                        args.columns_to_verify, map_job_id)
        submit_reduce_job(batch_client, map_job_id, args.reduce_job_queue,
                          args.reduce_job_definition, args.output_path,
                          args.file_format, args.activity_name, args.assay_name,
                          args.output_synapse, args.update, reduce_file,
                          job_table)
    else:
        print("No new records in column {} "
              "to process.".format(args.assay_column))


if __name__ == "__main__":
    main()