@sburns
Forked from bud42/SpidersApi.ipynb
Last active December 15, 2015 04:19
{
"metadata": {
"name": "SpidersApi"
},
"nbformat": 3,
"nbformat_minor": 0,
"worksheets": [
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# CCI Python Package\n",
"\n",
"This notebook is to flesh out the API of a python package the CCI should provide to users who want to run large-scale image processing jobs against the VUIIS XNAT.\n",
"\n",
"## Goals\n",
"\n",
"- Provide a high-level interface so CCI-developed image processing routines (Freesurfer, DTI QA, fMRI QA, DCM->NII, etc) can be implemented in extremely short spiders\n",
"- Provide medium- and low-level hooks so advanced developers can pick and choose what level they'd like to use the CCI/XNAT infrastructure.\n",
"\n",
"These goals accomplish two things:\n",
"\n",
"- Basic users can get spiders up and running extremely quickly.\n",
"- Advanced users are not shut out from:\n",
" - using provided infrastructure \n",
" - sending advanced image processing pipelines \"upstream\" to the CCI.\n",
"\n",
"**1 assessor = 1 job = 1 PBS script = 1 PBS output = 1 exp or 1 scan** \n",
"\n",
"First, the basics:"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# This could be more than just nightly right?\n",
"from cci import FreesurferProcessor, DtiQaProcessor\n",
"project_list = {'Proj1', 'Proj2'}\n",
"processor_list = {FreesurferProcessor, DtiQaProcessor}\n",
"\n",
"for project in project_list:\n",
" for subject in project.subject_list():\n",
" for session in subject.session_list():\n",
" for processor in processor_list:\n",
" task = processor.getTask(session)\n",
" task.update()\n",
" \n",
"# And/or we provide the loop infrastructure\n",
"from cci import ScanLoop, ExperimentLoop, SubjectLoop\n",
"# Loops take Runner(s)\n",
"ScanLoop(project, subject, experiment, [FreesurferProcessor, DtiQaProcessor]) # Must provide ScanProcessors\n",
"ExperimentLoop(project, subject, [ExperimentRunner]) # Hits all experiments, provide ExperimentProcessors\n",
"SubjectLoop(project, [SubjectRunner]) # Hits all subjects, provide SubjectProcessors"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Hashing out `Task` more..."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from cci import cluster\n",
"\n",
"class Task(object):\n",
" \n",
" def __init__(self, processor, session):\n",
" # Anyting else in the constructor\n",
" self.processor = processor\n",
" self.session = session\n",
" # Search up the hierarchy to fill in self.project/etc?\n",
" \n",
" def update(self):\n",
" if not self.exists():\n",
" self.create()\n",
" if self.has_inputs():\n",
" self.start()\n",
" else:\n",
" self.set_status('MISSING_INPUTS') # set procstatus in XNAT\n",
" else: \n",
" status = self.get_status()\n",
" self.handle_for_status(status)\n",
"\n",
" def handle_for_status(self, status):\n",
" if status == 'RUNNING':\n",
" job_id = self.get_job_id()\n",
" if not cluster.is_job_running(job_id):\n",
" # This is actually a restart\n",
" # Notify?\n",
" self.start()\n",
" elif status == \"SUCCESS\":\n",
" # Notify with results?\n",
" pass\n",
"\n",
" def get_status(self):\n",
" # Hit xnat for procstatus\n",
" pass\n",
" \n",
" def self.has_inputs():\n",
" # wrapper around self.processor.can_run()\n",
" pass\n",
" \n",
" def start(self):\n",
" self.write_pbs()\n",
" jobID = cluster.submit(pbs_path)\n",
" self.set_status('RUNNING')\n",
" self.set_job_id(jobID)\n",
" \n",
" def set_status(self, status):\n",
" # set process status in XNAT\n",
" pass\n",
" \n",
" def set_job_id(self, job_id):\n",
" # Set job id in xnat\n",
" pass\n",
" \n",
" def exists(self):\n",
" # hit xnat to find if this accessor exists\n",
" pass\n",
"\n",
" def name(self):\n",
" pass\n",
" \n",
" def pbs_path(self):\n",
" # return canonical storage place for pbs\n",
" # ~/.pbs/%(name).pbs?\n",
" pass\n",
" \n",
" def header(self):\n",
" # return List of strings\n",
" # ask self.processor to help fill in wall time,etc\n",
" # Where to get user/group/email options?\n",
" pass\n",
" \n",
" def commands(self):\n",
" # return list of strings\n",
" # ask self.processor for commands and combine with self.session?\n",
" pass\n",
" \n",
" def write_pbs(self):\n",
" commands = self.commands()\n",
" header = self.header()\n",
" with open(self.pbs_path(), 'w') as f:\n",
" f.writelines(header + commands)\n",
" # Or break this into a PBSGenerator class that we could better test\n",
" # and provide to lower-level users"
],
"language": "python",
"metadata": {},
"outputs": []
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"SB - My biggest issue with Task right now is how tightly coupled it is to jobs on ACCRE."
]
},
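{
"cell_type": "markdown",
"metadata": {},
"source": [
"One hedged sketch of loosening that coupling: hide the scheduler behind a small interface that `Task` takes at construction time, so ACCRE/PBS becomes just one backend. `ClusterBackend` and `PBSBackend` are illustrative names, not an existing `cci` API."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Illustrative sketch: decouple Task from ACCRE by injecting a scheduler backend.\n",
"# ClusterBackend / PBSBackend are invented names, not an existing cci API.\n",
"from cci import cluster\n",
"\n",
"class ClusterBackend(object):\n",
"    def submit(self, script_path):\n",
"        raise NotImplementedError\n",
"\n",
"    def is_job_running(self, job_id):\n",
"        raise NotImplementedError\n",
"\n",
"class PBSBackend(ClusterBackend):\n",
"    def submit(self, script_path):\n",
"        return cluster.submit_pbs(script_path)\n",
"\n",
"    def is_job_running(self, job_id):\n",
"        return cluster.is_job_running(job_id)\n",
"\n",
"# Task would then accept Task(processor, session, backend=PBSBackend())\n",
"# and call self.backend.submit(self.pbs_path()) instead of importing cci.cluster."
],
"language": "python",
"metadata": {},
"outputs": []
},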
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Python Data Types (classes):** \n",
"Project (pyxnat) \n",
"Experiment (pyxnat) \n",
"Scan (pyxnat) \n",
"Assessor (pyxnat) \n",
"GenericJob \n",
"ScanJob \n",
"ExperimentJob \n",
"AssessorJob \n",
"ProjectJob \n",
"\n"
]
},
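{
"cell_type": "markdown",
"metadata": {},
"source": [
"For reference, a hedged sketch of how the pyxnat-backed types above might be walked; the server URL and credentials are placeholders."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sketch only: walking the XNAT hierarchy with pyxnat.\n",
"# Server URL and credentials are placeholders.\n",
"from pyxnat import Interface\n",
"\n",
"xnat = Interface(server='http://xnat.example.org', user='me', password='secret')\n",
"for project in xnat.select.projects():\n",
"    for subject in project.subjects():\n",
"        for experiment in subject.experiments():\n",
"            for scan in experiment.scans():\n",
"                print('%s %s %s %s' % (project.id(), subject.label(), experiment.label(), scan.id()))"
],
"language": "python",
"metadata": {},
"outputs": []
},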
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**XNAT Processing Data Type (genProcData) Fields:** \n",
"procType \n",
"procStatus \n",
"walltimeused \n",
"memused \n",
"scans \n"
]
},
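{
"cell_type": "markdown",
"metadata": {},
"source": [
"These fields could presumably be read and written through pyxnat's `attrs` interface on the assessor. A hedged sketch, assuming a pyxnat `experiment` object is in scope; the `proc:genProcData/...` paths and the assessor label are guesses, not a confirmed schema."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sketch: reading/writing genProcData fields via pyxnat attrs.\n",
"# The 'proc:genProcData/...' paths are assumptions about the schema.\n",
"assessor = experiment.assessor('Proj1-x-Subj1-x-Exp1-x-FS')\n",
"assessor.attrs.set('proc:genProcData/procstatus', 'JOB_RUNNING')\n",
"status = assessor.attrs.get('proc:genProcData/procstatus')\n",
"assessor.attrs.mset({\n",
"    'proc:genProcData/walltimeused': '12:34:56',\n",
"    'proc:genProcData/memused': '3500mb'\n",
"})"
],
"language": "python",
"metadata": {},
"outputs": []
},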
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Build Status** (stored in genProcData/procstatus???) \n",
" BUILD_COMPLETE ==> complete assessor already exists in XNAT \n",
" PRE_RUNNING ==> prequisites not complete in XNAT \n",
" PRE_COMPLETE ==> prequisites complete, no assessor yet in XNAT \n",
" JOB_COMPLETE ==> partial assessor exists in XNAT & job is complete (job.completed exists & output exists) \n",
" XVFB_ERROR ==> partial assessor exists in XNAT, job normally terminated w/o result (job.completed exists & output folder has 0 files) \n",
" JOB_ERROR ==> partial assessor exists in XNAT, job terminated with job.error \n",
" JOB_RUNNING ==> partial assessor exists in XNAT, job still running \n",
"\n",
"\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**API should provide:** \n",
"\n",
"-Run this task on everything in this project... \n",
"-does this task run per scan, exp, subj? \n",
"-Can this process run on this scan/exp/subj, ie it eligible to run (i.e. has required inputs)? \n",
"-what are inputs to this process? \n",
"-Has this process already been run on this scan/exp/subj? or rather status of the job? \n",
"\n",
"\n",
"API should allow user to build a loop that can either \n",
"A. Run once for all projects/exp/scan and check each processing type at each exp/scan \n",
"B. Run a single processing type on a single project \n",
"C. Run all processing types on a single project \n"
]
},
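{
"cell_type": "markdown",
"metadata": {},
"source": [
"A hedged sketch of one entry point covering all three modes; `update_project` is an invented helper built on the `Task.update()` loop from earlier, and `all_projects` / `all_processors` are placeholder collections."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sketch: one invented entry point covering modes A, B and C.\n",
"def update_project(project, processors):\n",
"    for subject in project.subject_list():\n",
"        for session in subject.session_list():\n",
"            for processor in processors:\n",
"                processor.get_task(session).update()\n",
"\n",
"# A. run once over everything, checking every processing type\n",
"for project in all_projects:\n",
"    update_project(project, all_processors)\n",
"\n",
"# B. a single processing type on a single project\n",
"update_project(proj1, [FreesurferProcessor])\n",
"\n",
"# C. all processing types on a single project\n",
"update_project(proj1, all_processors)"
],
"language": "python",
"metadata": {},
"outputs": []
},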
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Other Notes:** \n",
"-PBS not created or run unless we've determined inputs exist, currently we don't know input existence until we try to download them within PBS (e.g., no REST or no T1 or no NIFTI format)\n",
"\n",
"SB - I think the spider should do everything in it's power to assure a job will successfully run on the cluster **before** the job is submitted, i.e. download images/etc. A job shouldn't fail because xnat was down, it should just not run at that time.\n"
]
},
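{
"cell_type": "markdown",
"metadata": {},
"source": [
"In that spirit, the input check could verify the required resources exist on XNAT before any PBS is written. A hedged sketch: `required_resources` is an invented Processor attribute, and the scan-type lookup is approximate (the exact attribute path depends on the schema)."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sketch: verify required inputs exist on XNAT before any PBS is written.\n",
"# 'required_resources' is an invented Processor attribute, e.g.\n",
"# DtiQaProcessor.required_resources = [('DTI', 'NIFTI')]\n",
"def has_inputs(processor, session):\n",
"    for scan_type, resource in processor.required_resources:\n",
"        # scan type lookup is approximate; exact xpath depends on the schema\n",
"        scans = [s for s in session.scans() if s.attrs.get('type') == scan_type]\n",
"        if not any(s.resource(resource).exists() for s in scans):\n",
"            return False\n",
"    return True"
],
"language": "python",
"metadata": {},
"outputs": []
},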
{
"cell_type": "code",
"collapsed": false,
"input": [
"# abstract classes\n",
"class Processor(object): \n",
"\n",
" #methods:\n",
" def job_type(self): # does this job run on a proj/subj/exp/scan?\n",
" pass\n",
"\n",
" def can_run(self): # does this proj/subj/exp/scan meet the requirements\n",
" pass\n",
" \n",
" def hasRun() # has this job already been run on this proj/subj/exp/scan\n",
" # This is a task attribute\n",
" \n",
" getStatus() # what is the status of this job for this proj/subj/exp/scan\n",
" # This is a Task attribute\n",
" \n",
" def get_walltime() # what amount of time is required to run this job\n",
" # yep\n",
" \n",
" setWalltime(days,hrs,mins) # set time required to run this job\n",
" # Nope, make class attribute \n",
" # Not needed \n",
" \n",
" getMemReq() # what amount of memory is required to run this job \n",
" # yep\n",
" \n",
" setMemReq(GBs,MBs) # set memory required to run this job\n",
" # class attr just like walltime\n",
" \n",
" run # run this job \n",
" # Task will do this\n",
" \n",
" writePBS(filename) # write the PBS file for this job\n",
" # Task should do this?\n",
" \n",
" submitPBS(filename) # qsub this PBS file\n",
" # nope, xnat.cluster.submit_pbs\n",
" \n",
"class SessionProcessor # runs on a single session\n",
"class ScanProcessor # runs on a single scan\n",
"class SubjectProcessor # runs on a single subject\n",
"class ProjectProcessor # runs on a single project\n",
"\n",
"# instantiable classes\n",
"class FreesurferProcessor(ScanProcessor)\n",
" wall_days = '0'\n",
" wall_hours = '24'\n",
" wall_minutes = '00'\n",
" wall_seconds = '00'\n",
" req_mem_mb = \"4096\"\n",
" \n",
"\n",
"class DtiQaProcessor(ScanProcessor)\n",
"class FmriQaProcessor(ScanProcessor)\n",
"class CcmRsFmriPreprocSpiderJob(ExperimentProcessor) #?\n",
"class CcmTbFmriPreprocSpiderJob(ExperimentProcessor) #?\n",
"class SpiderProcessHandler # What's this do?\n"
],
"language": "python",
"metadata": {},
"outputs": []
},
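{
"cell_type": "markdown",
"metadata": {},
"source": [
"For concreteness, a hedged sketch of a filled-in `FreesurferProcessor` under this design. The `can_run` check and `commands` signature are illustrative, and `scan.nifti_path` is an invented attribute."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sketch: a filled-in FreesurferProcessor under the design above.\n",
"class FreesurferProcessor(ScanProcessor):\n",
"    wall_days = '0'\n",
"    wall_hours = '24'\n",
"    wall_minutes = '00'\n",
"    wall_seconds = '00'\n",
"    req_mem_mb = '4096'\n",
"\n",
"    def can_run(self, scan):\n",
"        # illustrative: needs a T1 scan with a NIFTI resource\n",
"        return scan.attrs.get('type') == 'T1' and scan.resource('NIFTI').exists()\n",
"\n",
"    def commands(self, scan):\n",
"        # lines the Task writes into the PBS body; scan.nifti_path is invented\n",
"        return ['recon-all -i %s -subjid %s -all' % (scan.nifti_path, scan.id())]"
],
"language": "python",
"metadata": {},
"outputs": []
},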
{
"cell_type": "markdown",
"metadata": {},
"source": [
"**Random Thoughts:** \n",
"Could have REST url in XNAT where a POST fires a process to update, i.e. new data is ready for a subject or exp, so we have a process that checks the status of each type of data being produced and advances to the next step as needed, this process will be manually run each time we walk the tree AND can be triggered by a post, this way the data flow doesn't get interrupted by having to wait for the next time the tree is walked.\n",
"\n",
"YES! But how do we make a random xnat url run our update code? This is why I implemented my queuing architecture. Jobs should \"finish\" themselves and only new jobs should start on a schedule."
]
},
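{
"cell_type": "markdown",
"metadata": {},
"source": [
"One way to get a POST-able trigger without touching XNAT itself: run a tiny companion web service that enqueues an update when hit. A hedged sketch using Flask (an assumption, not part of `cci`); `queue_update` is stub wiring for illustration."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sketch: a small companion service; a POST triggers an update pass.\n",
"# Flask is an assumption; queue_update is stub wiring for illustration.\n",
"from flask import Flask, request\n",
"\n",
"app = Flask(__name__)\n",
"\n",
"def queue_update(project, subject):\n",
"    # stub: in practice, push onto the work queue the scheduled walker drains\n",
"    print('queued update for %s / %s' % (project, subject))\n",
"\n",
"@app.route('/update', methods=['POST'])\n",
"def update():\n",
"    # XNAT-side automation (or a curl in a pipeline step) would POST here,\n",
"    # so data flow is not stalled waiting for the next scheduled tree walk.\n",
"    queue_update(request.form.get('project'), request.form.get('subject'))\n",
"    return 'queued', 202"
],
"language": "python",
"metadata": {},
"outputs": []
},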
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Lower-Level API\n",
"\n",
"The `cci` package is fundamentally talking to two systems: the ACCRE cluster and VUIIS' XNAT."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"from cci import cluster\n",
"\n",
"pbs_path = cluster.make_pbs(list_of_commands, account_info)\n",
"job_id = cluster.submit_pbs(pbs_path)\n",
"status = cluster.check_status(job_id)\n",
"\n",
"from cci import xnat"
],
"language": "python",
"metadata": {},
"outputs": []
},
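{
"cell_type": "markdown",
"metadata": {},
"source": [
"A hedged sketch of what `cluster.submit_pbs` and a running-check might wrap: `qsub` prints the new job id on stdout, and `qstat -f <id>` reports a `job_state` line while the job is alive, so thin subprocess wrappers should suffice."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sketch: thin wrappers around the Torque/PBS command-line tools.\n",
"import subprocess\n",
"\n",
"def submit_pbs(pbs_path):\n",
"    # qsub prints the new job id (e.g. '12345.vmpsched') on stdout\n",
"    return subprocess.check_output(['qsub', pbs_path]).strip()\n",
"\n",
"def is_job_running(job_id):\n",
"    try:\n",
"        out = subprocess.check_output(['qstat', '-f', job_id])\n",
"    except subprocess.CalledProcessError:\n",
"        return False  # unknown job id: already finished and purged\n",
"    # job_state is Q (queued) or R (running) while the job is alive\n",
"    return 'job_state = Q' in out or 'job_state = R' in out"
],
"language": "python",
"metadata": {},
"outputs": []
},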
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The API for dealing with XNAT is a little more advanced though. Nominally, we'd like to provide support to:\n",
"\n",
"- Easily create accessors without users knowing the folder structure.\n",
"- Sync a local filesystem directory with XNAT. We'll want to provide both directions so beginnig jobs can pull data and ending jobs can push data.\n"
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Creating accessors\n",
"jh = xnat.JobHandler(my_job.project, my_job.subject, my_job.experiment) # accessors are at the subject or experiment level?\n",
"my_job.start()\n",
"\n",
"# this is really stupid but you get the idea\n",
"while not my_job.is_finished():\n",
" pass\n",
"\n",
"if my_job.success():\n",
" jh.add_pdf(my_job.get_pdf())\n",
" jh.add_snapshots(path_to_local_thumbnail, path_to_local_large_version)\n",
" jh.add_directory(local_source, accessor_directory)\n",
" jh.add_provenance(my_job.get_provenance())\n",
" jh.finish() #???\n",
"else:\n",
" jh.clean_up()\n",
"\n",
"# Synchronizing resources from local <--> XNAT\n",
"rs = xnat.ResourceSyncer(my_job.project, my_job.subject, my_job.experiment) # Maybe lower?\n",
"success = rs.sync_to_local(local_top_level)\n",
"success = rs.sync_to_xnat(local_top_level)"
],
"language": "python",
"metadata": {},
"outputs": []
}
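,
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A hedged sketch of what `ResourceSyncer.sync_to_local` might do with pyxnat underneath; the class itself is the proposal above, not existing code, and the resource walking assumes standard pyxnat objects."
]
},
{
"cell_type": "code",
"collapsed": false,
"input": [
"# Sketch: pulling an experiment's resources into a local tree with pyxnat.\n",
"import os\n",
"\n",
"class ResourceSyncer(object):\n",
"    def __init__(self, experiment):\n",
"        self.experiment = experiment  # a pyxnat experiment object\n",
"\n",
"    def sync_to_local(self, local_top_level):\n",
"        for resource in self.experiment.resources():\n",
"            dest = os.path.join(local_top_level, resource.label())\n",
"            if not os.path.exists(dest):\n",
"                os.makedirs(dest)\n",
"            for f in resource.files():\n",
"                f.get(os.path.join(dest, f.label()))\n",
"        return True"
],
"language": "python",
"metadata": {},
"outputs": []
}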
],
"metadata": {}
}
]
}