Skip to content

Instantly share code, notes, and snippets.

@mattsouth
Last active November 8, 2016 16:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattsouth/db8f2d09acf3c57ba605fa93c4e8d03e to your computer and use it in GitHub Desktop.
Save mattsouth/db8f2d09acf3c57ba605fa93c4e8d03e to your computer and use it in GitHub Desktop.
import pyxnat
import os
import shutil
import requests
import zipfile
import requests
import time
# A Python 2.7 / pyxnat script that copies an XNAT project consisting of
# subjects with multiple MRSession experiments.
# The attribute lists below are taken from https://wiki.xnat.org/display/XNAT16/XNAT+REST+XML+Path+Shortcuts
# Subject-level attributes that are read from each source subject and passed
# to the insert call that creates the matching target subject.
subject_attrs = [
    "group",
    "src",
    "pi_firstname",
    "pi_lastname",
    "dob",
    "yob",
    "gender",
    "handedness",
    "ses",
    "education",
    "educationDesc",
    "race",
    "ethnicity",
    "weight",
    "height",
    "gestational_age",
    "post_menstrual_age",
    "birth_weight",
    "label",
]
# Experiment-level attributes that are read from each source experiment and
# passed to the insert call that creates the matching target experiment.
experiment_attrs = [
    "visit_id",
    "date",
    "ID",
    "project",
    "label",
    "time",
    "note",
    "pi_firstname",
    "pi_lastname",
    "validation_method",
    "validation_status",
    "validation_date",
    "validation_notes",
    # "subject_ID",  (disabled entry, kept for reference)
    "subject_label",
    "subject_project",
    "scanner",
    "operator",
    "dcmAccessionNumber",
    "dcmPatientName",
    "session_type",
    "modality",
    "UID",
    "coil",
]
# Scan-level attributes that are read from each source scan and passed to the
# insert call that creates the matching target scan.
scan_attrs = [
    "ID",
    "type",
    "UID",
    "note",
    "quality",
    "condition",
    "series_description",
    "documentation",
    "scanner",
    "modality",
    "frames",
    "validation_method",
    "validation_status",
    "validation_date",
    "validation_notes",
    "coil",
    "fieldStrength",
    "marker",
    "stabilization",
    # Disabled entries, kept for reference:
    # "orientation",
    # "scanTime",
    # "originalFileName",
    # "systemType",
    # "fileType",
    # "transaxialFOV",
    # "acqType",
    # "facility",
    # "numPlanes",
    # "numFrames",
    # "numGates",
    # "planSeparation",
    # "binSize",
    # "dataType",
]
# Scratch directory: experiment archives are downloaded and unzipped here
# before their contents are re-zipped and uploaded to the target.
filepathscratch = "/tmp"
def convert_bytes(num):
    """
    Convert a size in bytes to a human-readable string (bytes, KB, MB, GB, TB).

    num: the size in bytes (int or float).

    Returns a string such as "1.5 KB". Sizes of 1024 TB or more are reported
    in TB (e.g. "2048.0 TB") instead of running off the end of the unit list
    and returning None.
    """
    units = ['bytes', 'KB', 'MB', 'GB', 'TB']
    for unit in units[:-1]:
        if num < 1024.0:
            return "%3.1f %s" % (num, unit)
        num /= 1024.0
    # Largest unit: always format here so the function never returns None.
    return "%3.1f %s" % (num, units[-1])
def summarise(arr):
    """
    Build the ' - ignored: ...' suffix used in progress messages.

    Returns an empty string when arr is empty (or otherwise falsy).
    """
    return ' - ignored: {}'.format(arr) if arr else ''
# gather source xnat details
print '** COPY PROJECT **'
source_url=raw_input("Enter the Source xnat url: ")
source_project_name=raw_input("Enter the Source xnat project id: ")
print 'Enter credentials for source xnat, {}'.format(source_url)
source_interface = pyxnat.Interface(source_url)
source_project = source_interface.select.project(source_project_name)
if source_project.exists():
# gather target xnat details
target_url=raw_input("Enter the Target xnat url: ")
target_project_name=raw_input("Enter the Target xnat project id: ")
print 'Enter credentials for target xnat, {}'.format(target_url)
target_interface = pyxnat.Interface(target_url)
target_project = target_interface.select.project(target_project_name)
if target_project.exists():
time_start_script=time.time()
for subject_id in source_interface.select.project(source_project_name).subjects().get():
time_start_subject=time.time()
subject = source_project.subject(subject_id)
target_subject=target_interface.select.project(target_project_name).subject(subject.label())
if not(target_subject.exists()):
vals={}
for attr in subject_attrs:
if subject.attrs.get(attr):
vals[attr]=subject.attrs.get(attr)
print 'creating subject: {}'.format(subject.label())
target_subject.insert(**vals)
else:
print 'using subject: {}'.format(subject.label(),subject.label())
for experiment_id in subject.experiments().get():
time_start_experiment=time.time()
experiment = subject.experiment(experiment_id)
target_experiment=target_subject.experiment(experiment.label())
if not(target_experiment.exists()):
vals={}
ignored=[]
for attr in experiment_attrs:
try:
if experiment.attrs.get(attr):
vals[attr]=experiment.attrs.get(attr)
except:
ignored.append(attr)
print '\tcreating experiment: {}{}'.format(experiment.label(),summarise(ignored))
target_experiment.insert(**vals)
else:
print '\tusing experiment: {}'.format(experiment.label())
target_experiment_files=target_experiment.resources().get()
if not target_experiment.scans().get():
time_start_resources_download=time.time()
print '\tdownloading experiment files'
filepatharchive = experiment.scans().download(filepathscratch)
archive=zipfile.ZipFile(filepatharchive)
archive.extractall(filepathscratch)
print '\tdownloaded and unzipped {} in {} seconds'.format(convert_bytes(os.stat(filepatharchive).st_size),time.time()-time_start_resources_download)
for scan_id in experiment.scans().get():
scan = experiment.scan(scan_id)
target_scan=target_experiment.scan(scan.label())
if not(target_scan.exists()):
vals={}
ignored=[]
for attr in scan_attrs:
try:
if scan.attrs.get(attr):
vals[attr]=scan.attrs.get(attr)
except:
ignored.append(attr)
print '\t\tcreating scan: {}{}'.format(scan.label(),summarise(ignored))
target_scan.insert(**vals)
else:
print '\t\tfound target scan: {}'.format(target_scan.label())
# zip and push files
scan_files=target_scan.resources().get()
if not scan_files:
resources = '{}/{}/scans/{}-{}/resources'.format(filepathscratch, experiment.label(),scan.label(),scan.attrs.get('type')).replace(" ", "_")
# each subdir of resources is a catalog that must be zipped and uploaded separately
for catalog in os.listdir(resources):
if os.path.isdir(os.path.join(resources, catalog)):
time_start_catalog=time.time()
# zip up content of files directory
filenamezip='{}.zip'.format(catalog)
filepathzip=os.path.join(resources,filenamezip)
print '\t\t\tcreating and uploading zip: {}'.format(filenamezip)
with zipfile.ZipFile(filepathzip,'w') as zip:
for file in os.listdir(os.path.join(resources, catalog, 'files')):
zip.write(os.path.join(resources, catalog, 'files', file), file)
zip.close()
# and upload
with open(filepathzip, 'rb') as payload:
headers={'content-type':'application/zip','Cookie':target_interface._jsession}
url='{}/data/archive/projects/{}/subjects/{}/experiments/{}/scans/{}/resources/{}/files?extract=true&inbody=true'.format(target_url,target_project_name,subject.label(),experiment.label(),scan.label(),catalog)
if catalog=='SNAPSHOTS':
url=url+'&content=THUMBNAIL'
r = requests.post(url,data=payload,headers=headers)
print '\t\t\tuploaded {} in {} seconds'.format(convert_bytes(os.stat(filepathzip).st_size),time.time()-time_start_catalog)
# delete archive zip and folder
os.remove(filepatharchive)
shutil.rmtree('{}/{}'.format(filepathscratch, experiment.label()))
print '\tCopy experiment completed in {} seconds'.format(time.time()-time_start_subject)
else:
print '\tfound target scans, skipped scan and resource copy'
print 'Copy subject completed in {} seconds'.format(time.time()-time_start_subject)
print '\nCopy project completed in {} seconds'.format(time.time()-time_start_script)
else:
print 'cannot attach to target project, {} on {}'.format(target_project_name, target_url)
else:
print 'cannot attach to source project, {} on {}'.format(source_project_name, source_url)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment