@nanxstats
Last active August 29, 2015 14:21
#!/usr/bin/python
import sys, os, re, time
from subprocess import Popen, PIPE
import json
import requests
__license__ = "MIT"
__doc__ = """
This script is meant as an example of Seven Bridges Genomics IGOR API usage.
Running it will annotate all files in specified directory, upload these files, process them using the specified pipeline and download the results.
Please edit the script to configure the following:
token: Supplied auth token for your user.
client_id: Supplied API key for your organization.
project_id: ID of project to use.
pipeline_id: ID of prepared pipeline to run. The pipeline should have all inputs pre-set, except one.
uploader_path: Path to IGOR command line uploader.
base_meta: A dictionary with metadata common for all files that you will be uploading.
file_regexp: A regular expression containing capture groups used to set metadata for individual files (from file name).
Usage: igor_api_example.py path_to_dir_containing_files_to_process
"""
# Supplied auth token and api key
TOKEN, CLIENT_ID = 'MY_AUTH_TOKEN', 'MY_CLIENT_ID'
# IDs of project, task template and input node
PROJECT_ID, TEMPLATE_TASK_ID, INPUT_NODE_ID = '111', '222', '333'
# Path to SBG uploader
UPLOADER_PATH = '/path/to/sbg-uploader.sh'
# Metadata common for all files
BASE_META = {'file_type': 'fastq', 'seq_tech': 'illumina'}
# Use capture groups to extract metadata from file name
FILE_REGEXP = re.compile(r'^(?P<sample>.*)_(?P<paired_end>[12])\.f(ast)?q$')
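# For example, a file named 'sample01_1.fastq' (an illustrative name) would yield
# {'sample': 'sample01', 'paired_end': '1'} from the capture groups above.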
class SbgApi(object):
    """
    Example wrapper around v0.9 API calls.
    Initialize with supplied client id and auth token.
    """
    def __init__(self, client_id, auth_token):
        self.endpoint = 'https://api.sbgenomics.com/0.9/'
        self.headers = {
            'X-SBG-CLIENT-ID': client_id,
            'X-SBG-AUTH-TOKEN': auth_token,
            'Accept': 'application/json',
            'Content-type': 'application/json',
        }

    def _request(self, path, method='GET', query=None, data=None):
        data = json.dumps(data) if isinstance(data, dict) else None
        print method, self.endpoint + path, query or {}, data
        response = requests.request(method, self.endpoint + path, params=query, data=data, headers=self.headers)
        response_dict = json.loads(response.content) if response.content else {}
        if response.status_code / 100 != 2:
            raise Exception(response_dict.get('message', 'Server responded with status code %s.' % response.status_code))
        return response_dict
    def upload_list(self):
        return self._request('upload/')['items']

    def upload_details(self, upload_name):
        return self._request('upload/%s/' % upload_name)

    def project_list(self):
        return self._request('project/')['items']

    def project_details(self, project_id):
        return self._request('project/%s/' % project_id)

    def file_list(self, project_id):
        return self._request('project/%s/file/' % project_id)['items']

    def file_details(self, project_id, file_id):
        return self._request('project/%s/file/%s/' % (project_id, file_id))

    def file_update(self, project_id, file):
        return self._request('project/%s/file/%s/' % (project_id, file.get('id')), method='PUT', data=file)

    def file_delete(self, project_id, file_id):
        return self._request('project/%s/file/%s/' % (project_id, file_id), method='DELETE')
    def file_download_url(self, project_id, file_id):
        return self._request('project/%s/file/%s/download/' % (project_id, file_id))['url']

    def file_copy_from_upload(self, project_id, upload_name):
        return self._request('project/%s/file/' % project_id, query={'action': 'copy', 'from': '/upload/%s' % upload_name}, method='POST')

    def pipeline_list(self, project_id):
        return self._request('project/%s/pipeline/' % project_id)['items']

    def pipeline_details(self, project_id, pipeline_id):
        return self._request('project/%s/pipeline/%s/' % (project_id, pipeline_id))

    def task_list(self, project_id):
        return self._request('project/%s/task/' % project_id)['items']

    def task_details(self, project_id, task_id):
        return self._request('project/%s/task/%s/' % (project_id, task_id))

    def task_new(self, project_id, task):
        return self._request('project/%s/task/' % project_id, method='POST', data=task)

    def task_stop(self, project_id, task_id):
        return self._request('project/%s/task/%s/' % (project_id, task_id), method='POST', query={'action': 'stop'})

    def run_task_template(self, project_id, task_id, node_id, file_id_list, task_name=None):
        """
        Creates a new task based on an existing task.
        """
        task = self.task_details(project_id, task_id)
        del task['id']
        task['inputs'][node_id] = file_id_list
        task['outputs'] = {}
        if task_name:
            task['name'] = task_name
        print task
        return self.task_new(project_id, task)

    def get_task_outputs(self, project_id, task_id):
        """
        Returns a list of all output file IDs if the specified task completed successfully.
        Returns None if the task is still running.
        Raises an exception if the task was aborted or failed.
        """
        task = self.task_details(project_id, task_id)
        full_status = task.get('status', {})
        if full_status.get('status') == 'active':
            return None
        elif full_status.get('status') != 'completed':
            raise Exception('Task %s' % full_status.get('status'))
        result = []
        for file_id_list in task.get('outputs').values():
            result += file_id_list
        return result
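
# A minimal usage sketch of the wrapper above (kept as a comment so the script
# still runs unchanged as a command line tool; the IDs are placeholders):
#
#   api = SbgApi('MY_CLIENT_ID', 'MY_AUTH_TOKEN')
#   for project in api.project_list():
#       print project['id']
#   print api.task_details('111', '222')['status']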
def upload_directory(uploader_path, directory_path, upload_name, common_metadata=None, file_regexp='.*'):
    """
    Utility function that wraps the SBG uploader. It annotates files with metadata before uploading.
    Only files whose names match file_regexp are uploaded.
    To annotate files, specify a dict containing metadata common to all files, and a regular expression
    that contains capture groups for fields that vary from file to file.
    Raises an exception if the uploader exits with a non-zero code.
    """
    if isinstance(file_regexp, basestring):
        file_regexp = re.compile(file_regexp)
    base_meta = common_metadata or {}
    matched_files = []
    for dir_path, dirs, files in os.walk(directory_path):
        for file_name in files:
            match = file_regexp.match(file_name)
            if not match:
                continue
            # Merge the common metadata with the fields captured from the file name
            file_metadata = dict(base_meta, **match.groupdict())
            file_path = os.path.join(dir_path, file_name)
            # Write a .meta sidecar file holding the metadata for this file
            with open(file_path + '.meta', 'w') as f:
                json.dump(file_metadata, f)
            matched_files.append(file_path)
    p = Popen([uploader_path, '-v', '-t', TOKEN, '-i', CLIENT_ID, '-n', upload_name] + matched_files, stdout=PIPE)
    ret_code = p.wait()
    if ret_code:
        raise Exception('Uploader exited with code %s' % ret_code)
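
# For example, with BASE_META and FILE_REGEXP as configured above, a file named
# 'sample01_1.fastq' (an illustrative name) would get a sidecar file
# 'sample01_1.fastq.meta' containing:
#   {"file_type": "fastq", "seq_tech": "illumina", "sample": "sample01", "paired_end": "1"}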
if __name__ == '__main__':
    if len(sys.argv) == 1 or sys.argv[1].startswith('-'):
        print __doc__
        sys.exit()
    # Get directory to upload from args
    data_dir = sys.argv[1]
    # Use it as upload name
    upload_name = os.path.basename(data_dir)
    # Run uploader
    upload_directory(UPLOADER_PATH, data_dir, upload_name, BASE_META, FILE_REGEXP)
    # Prepare for API calls
    api = SbgApi(CLIENT_ID, TOKEN)
    # Copy the files to project
    file_id_list = [f['id'] for f in api.file_copy_from_upload(PROJECT_ID, upload_name)['items']]
    # Create a task with our uploaded files replacing old inputs on INPUT_NODE_ID
    task_id = api.run_task_template(PROJECT_ID, TEMPLATE_TASK_ID, INPUT_NODE_ID, file_id_list)['id']
    # Wait for the task to finish
    while True:
        results = api.get_task_outputs(PROJECT_ID, task_id)
        if results is not None:
            break
        time.sleep(30)
    # Download all output files
    for file_id in results:
        url = api.file_download_url(PROJECT_ID, file_id)
        print 'Downloading file %s from %s' % (file_id, url)
        print 'Failed' if Popen(['axel', url]).wait() else 'Done'

# ---------------------------------------------------------------------------
# Second, minimal example: a function-based walkthrough that finds a FastQC
# pipeline, runs it on a fastq file, and collects download URLs for results.
# ---------------------------------------------------------------------------
import requests, json, time

def api(path, method='GET', query=None, data=None):
    """ Helper function for API calls """
    data = json.dumps(data) if isinstance(data, dict) else None
    base_url = 'https://api.sbgenomics.com/0.9/'
    headers = {
        'X-SBG-Client-Id': 'MY CLIENT ID',
        'X-SBG-Auth-Token': 'MY AUTH TOKEN',
        'Accept': 'application/json',
        'Content-type': 'application/json',
    }
    print method, base_url + path, query, data
    response = requests.request(method, base_url + path, params=query, data=data, headers=headers)
    response_dict = json.loads(response.content) if response.content else {}
    if response.status_code / 100 != 2:
        raise Exception(response_dict.get('message', 'Server responded with status code %s.' % response.status_code))
    return response_dict
def find_fastqc():
    """ Finds a pipeline whose name starts with "FastQC" across all accessible projects. Returns project_id and pipeline_id. """
    for project in api('project/')['items']:
        for pipeline in api('project/%s/pipeline/' % project['id'])['items']:
            if pipeline['name'].startswith('FastQC'):
                return project['id'], pipeline['id']
    raise Exception('FastQC pipeline not found')

def get_input_id(project_id, pipeline_id):
    """ Assumes there is a single input to the pipeline and returns its ID. """
    return api('project/%s/pipeline/%s/' % (project_id, pipeline_id))['inputs'][0]['id']

def get_fastq_file_id(project_id):
    """ Returns the ID of the first fastq file found in the project. """
    for file in api('project/%s/file/' % project_id)['items']:
        if file['metadata'].get('file_type') == 'fastq':
            return file['id']
    raise Exception('No fastq files in project')

def run_task(project_id, pipeline_id, input_id, file_id):
    """ Creates and starts a task that runs the pipeline on a single input file. """
    task = {
        "name": "My task",
        "description": "A text description",
        "pipeline_id": pipeline_id,
        "inputs": {
            input_id: [file_id]
        }
    }
    return api('project/%s/task/' % project_id, method='POST', data=task)

def wait_for_task(project_id, task_id):
    """ Polls the task every 30 seconds until it is no longer active. """
    while True:
        task = api('project/%s/task/%s/' % (project_id, task_id))
        if task['status']['status'] != 'active':
            break
        print task['status']['message']
        time.sleep(30)
    return task

def get_result_urls(project_id, task_id):
    """ Collects download URLs for all output files of the task. """
    task = api('project/%s/task/%s/' % (project_id, task_id))
    outputs, urls = [], []
    for file_id_list in task.get('outputs').values():
        outputs += file_id_list
    for file_id in outputs:
        urls.append(api('project/%s/file/%s/download/' % (project_id, file_id))['url'])
    return urls

def do_all():
    """ Runs the whole FastQC example end to end and returns the result URLs. """
    project_id, pipeline_id = find_fastqc()
    input_id = get_input_id(project_id, pipeline_id)
    file_id = get_fastq_file_id(project_id)
    print project_id, pipeline_id, input_id, file_id
    task = run_task(project_id, pipeline_id, input_id, file_id)
    task = wait_for_task(project_id, task['id'])
    return get_result_urls(project_id, task['id'])
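
# A minimal sketch of how the helpers above might be driven end to end, assuming
# a pipeline named "FastQC" and at least one fastq file are visible to the
# configured credentials:
#
#   if __name__ == '__main__':
#       for url in do_all():
#           print url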