Last active
August 29, 2015 14:08
-
-
Save adamrp/7b9c98051f0e9e595c02 to your computer and use it in GitHub Desktop.
Port studies from old database to qiita
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from datetime import datetime | |
from os.path import join | |
import click | |
from data_access_connections import data_access_factory | |
from enums import ServerConfig, DataAccessType | |
from qiita_db.study import StudyPerson | |
# Sentinel menu entries appended after the real people in the interactive
# menus shown by get_study_config.
SHOW_FULL_STR = "Show full list of Study People"
NEW_STR = "Create new Study Person"

# Data-access object used to run the study query; it is built for the
# 'qiime' database using the server's configured data access type.
qda = data_access_factory(ServerConfig.data_access_type, 'qiime')
def null_blank_whitespace(old):
    """Normalize a raw text value, mapping "nothing there" to None.

    Parameters
    ----------
    old : str or None
        The raw value.

    Returns
    -------
    str or None
        The value lower-cased with surrounding whitespace removed, or None
        when the input is None, empty, or consists only of whitespace.
    """
    if old is None:
        return None
    cleaned = old.lower().strip()
    # A whitespace-only value is as empty as '' once stripped, so it must
    # become None too instead of leaking '' to the callers.
    return cleaned or None
def boolean(old):
    """Map a yes/no answer onto the flag strings 't' and 'f'.

    None is passed through unchanged. Only the exact values 'yes' and 'y'
    count as true; every other value yields 'f'.
    """
    if old is None:
        return None
    return 't' if old in ('yes', 'y') else 'f'
def funding(old):
    """Clean up a funding value.

    The value is normalized with null_blank_whitespace; the placeholder
    answers "no", "yes" and "$$" are then replaced by None. Anything else
    is returned in its normalized form.
    """
    cleaned = null_blank_whitespace(old)
    return None if cleaned in ("no", "yes", "$$") else cleaned
def most_recent_contact(old):
    """Validate a most-recent-contact date given as MM/DD/YYYY text.

    Parameters
    ----------
    old : str or None
        The raw date text.

    Returns
    -------
    str or None
        The date re-formatted as '%m/%d/%Y' (zero-padded), or None when the
        value is missing/blank or cannot be parsed in that format.
    """
    new = null_blank_whitespace(old)
    if new is None:
        return None
    try:
        # Parse the cleaned value, not the raw one: surrounding whitespace
        # in the raw text would otherwise make a valid date fail to parse.
        dtime = datetime.strptime(new, '%m/%d/%Y')
    except ValueError:
        return None
    return dtime.strftime('%m/%d/%Y')
def spatial_series(old):
    """Convert a raw spatial-series answer into 't', 'f' or None.

    The value is first normalized with null_blank_whitespace and the result
    is then interpreted by boolean.
    """
    normalized = null_blank_whitespace(old)
    return boolean(normalized)
def portal_type_id(old):
    """Translate a portal type name into its numeric id.

    'emp' maps to 3 and 'qiime' maps to 1; any other value gives None.
    """
    for portal_name, type_id in (('emp', 3), ('qiime', 1)):
        if old == portal_name:
            return type_id
    return None
@click.group()
def cli():
    """Port studies from the old database to Qiita."""
    # The group does no work itself; it only hosts the sub-commands that
    # are registered with @cli.command(). The docstring above is what
    # click displays as the description in `--help`.
def _menu(options, prompt="Please select an option."): | |
response = -1 | |
print '++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++' | |
for i, opt in enumerate(options): | |
print '%d. %s' % (i, opt) | |
print prompt | |
while response not in range(len(options)): | |
response = raw_input("(enter number between 0 and %d)" % | |
(len(options)-1)) | |
print '--------------------------------------------------------------' | |
try: | |
response = int(response) | |
except ValueError: | |
print "Please only enter numbers\n" | |
return response | |
def _format_sp_str(sp): | |
"""Generatees the 'name, email, affiliation' string for a study person""" | |
return '%s, %s, %s' % (sp.name, sp.email, sp.affiliation) | |
def _select_study_person(pi_name, study_people):
    """Return the StudyPerson to use as PI, asking the user when needed.

    Parameters
    ----------
    pi_name : str
        The PI name to look for.
    study_people : list
        The StudyPerson objects to choose from.

    Returns
    -------
    StudyPerson
        The single person whose name equals `pi_name`, the person picked
        from a menu, or a person newly created via StudyPerson.create.
    """
    matches = [p for p in study_people if p.name == pi_name]
    if len(matches) == 1:
        # If there is only one PI by this name, use it
        return matches[0]

    # Sentinel entries are always appended last, so they are recognized by
    # index rather than by comparing label text.
    candidates = matches
    selected_index = None
    show_full_list = True
    if matches:
        labels = [_format_sp_str(p) for p in matches]
        labels.append(SHOW_FULL_STR)
        selected_index = _menu(
            labels, "Multiple people named %s; please select "
            "the PI from the list below" % pi_name)
        show_full_list = selected_index == len(matches)
    else:
        print("-> No existing PIs found with name %s" % pi_name)

    create_new = False
    if show_full_list:
        candidates = study_people
        if study_people:
            labels = [_format_sp_str(p) for p in study_people]
            labels.append(NEW_STR)
            selected_index = _menu(labels)
            create_new = selected_index == len(study_people)
        else:
            print("-> There are no people in the database")
            create_new = True

    if not create_new:
        # Use the selected person
        return candidates[selected_index]

    print("Create a new entry in the database for %s" % pi_name)
    pi_name = raw_input("Enter a new name (if desired; default "
                        "will be %s)" % pi_name) or pi_name
    pi_email = raw_input("What is the email for %s?" % pi_name)
    pi_affiliation = raw_input("What is the affiliation for %s?" % pi_name)
    return StudyPerson.create(pi_name, pi_email, pi_affiliation)


def _write_study_config(row_dict, output_dir):
    """Write one study's values to '<study_id>.txt' inside `output_dir`.

    Every required field is written under a '[required]' header; optional
    fields are written under '[optional]' only when their value is not
    None.
    """
    required_fields = ('timeseries_type_id', 'mixs_compliant',
                       'portal_type_id', 'reprocess', 'study_alias',
                       'study_description', 'study_abstract',
                       'metadata_complete', 'efo_ids',
                       'principal_investigator')
    optional_fields = ('funding', 'most_recent_contact', 'spatial_series',
                       'number_samples_collected', 'number_samples_promised',
                       'vamps_id', 'study_id')

    outfile_fp = join(output_dir, str(row_dict['study_id']) + '.txt')
    with open(outfile_fp, 'w') as outfile:
        outfile.write('[required]\n')
        for field in required_fields:
            outfile.write('%s = %s\n' % (field, str(row_dict[field])))
        outfile.write('\n[optional]\n')
        for field in optional_fields:
            if row_dict[field] is not None:
                outfile.write('%s = %s\n' % (field, str(row_dict[field])))


@cli.command()
@click.argument('study_ids', nargs=-1, type=int, required=True)
@click.option('--output-dir', type=click.Path(exists=False, file_okay=False,
                                              writable=True), default='.')
def get_study_config(study_ids, output_dir):
    """Write a study configuration file for each of STUDY_IDS.

    Each study is read from the old database, its values are cleaned up
    and its principal investigator is matched to a StudyPerson (through
    interactive menus when there is not exactly one match). The result is
    written to <study_id>.txt inside --output-dir, which is created when
    it does not exist yet.
    """
    import os

    # click.Path(exists=False) accepts a directory that is not there yet;
    # create it now so the write cannot fail after the interactive prompts.
    if not os.path.isdir(output_dir):
        os.makedirs(output_dir)

    # study_ids are already converted to int by click (type=int), so
    # joining them into the statement cannot inject arbitrary SQL.
    SQL = """select 1 as timeseries_type_id, 't' as mixs_compliant,
    portal_type as portal_type_id, 'f' as reprocess, study_alias,
    study_description, study_abstract, 'f' as metadata_complete,
    funding, most_recent_contact, spatial_series,
    number_samples_collected, number_samples_promised, vamps_id,
    study_id, principal_investigator as principal_investigator_name,
    principal_investigator_contact
    from study
    where study_id in ({})""".format(
        ', '.join([str(x) for x in study_ids]))
    results = qda.dynamicMetadataSelect(SQL)
    cols = [x[0].lower() for x in results.description]

    for row in results:
        # Get all study people so that we can minimize user interaction
        # here. The list is re-read for every study; presumably so that a
        # person created for an earlier study is offered for later ones.
        study_people = list(StudyPerson.iter())
        row_dict = dict(zip(cols, row))
        print('********** %s **********' % (row_dict['study_id'],))
        print('((((((((( %s (((((((((' %
              (row_dict['principal_investigator_contact'],))

        # Clean up values
        row_dict['funding'] = funding(row_dict['funding'])
        row_dict['most_recent_contact'] = most_recent_contact(
            row_dict['most_recent_contact'])
        row_dict['spatial_series'] = spatial_series(
            row_dict['spatial_series'])
        row_dict['portal_type_id'] = portal_type_id(
            row_dict['portal_type_id'])
        # just inserting 1 here for now, as usual...
        row_dict['efo_ids'] = 1

        # Need to get PI information
        pi_name = row_dict['principal_investigator_name']
        if pi_name is not None:
            pi_name = pi_name.strip()
        print('***** %s *****' % (pi_name,))
        if not pi_name:
            # catches both blanks and Nulls
            pi_name = raw_input("What is the name of the PI?")

        pi = _select_study_person(pi_name, study_people)
        row_dict['principal_investigator'] = _format_sp_str(pi)

        # Write out the configuration file
        _write_study_config(row_dict, output_dir)
# Run the click command group only when this file is executed as a script.
if '__main__' == __name__:
    cli()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment