Skip to content

Instantly share code, notes, and snippets.

@squirrelo
Last active August 29, 2015 14:26
Show Gist options
  • Save squirrelo/8d3ab0ae5a4cdced4103 to your computer and use it in GitHub Desktop.
Save squirrelo/8d3ab0ae5a4cdced4103 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
from datetime import datetime
import re
import binascii
import os
from copy import deepcopy
from amgut.lib.data_access.sql_connection import SQLConnectionHandler
from amgut.connections import ag_data
db_conn = SQLConnectionHandler()
def old_surveys():
    """Fetch every row of ag_animal_survey as dicts keyed by column name

    Returns
    -------
    generator of dict
        One dict per row, mapping each column name to that row's value.
        All rows are fetched inside the cursor's ``with`` block; only the
        construction of the dicts is deferred until iteration.
    """
    query = """select * from ag_animal_survey"""
    with db_conn.get_postgres_cursor() as cursor:
        cursor.execute(query)
        # cursor.description holds one entry per column; item 0 is its name
        names = [col[0] for col in cursor.description]
        fetched = cursor.fetchall()
    return (dict(zip(names, record)) for record in fetched)
def get_multiples(ag_login_id, pname, prefix):
    """Gets multiples answers from the ag_survey_multiples table

    Parameters
    ----------
    ag_login_id : str
        the login the answers are stored under
    pname : str
        the name of the participant that provided the answer
    prefix : str
        the prefix of the question (e.g., "diabetes_medications" would
        retrieve "diabetes_medications_1", "diabetes_medications_2", etc.)

    Returns
    -------
    dict
        Maps each matching item_name to its item_value. Items whose value
        is NULL are left out.
    """
    # Values are passed as query parameters instead of being formatted into
    # the SQL text, so names containing quotes (e.g. "O'Malley") neither
    # break the statement nor get interpreted as SQL.
    sql = """select item_name, item_value from ag_survey_multiples
             where item_name like %s
             and ag_login_id = %s
             and participant_name = %s"""
    with db_conn.get_postgres_cursor() as cur:
        # The trailing % is the LIKE wildcard that makes this a prefix match
        cur.execute(sql, (prefix + '%', str(ag_login_id), pname))
        rows = cur.fetchall()
    return {iname: ival for iname, ival in rows if ival is not None}
def direct_transfer(old, new, old_id, new_id):
    """Copy one answer from the old survey into the new responses

    Parameters
    ----------
    old : dict
        old survey row, keyed by column name
    new : dict
        new survey responses, keyed by question id; modified in place
    old_id : str
        key of the value to read from ``old``
    new_id : int
        key to store the value under in ``new``

    Returns
    -------
    dict
        The same ``new`` object. When the old value is None nothing is
        written; otherwise the value is stored after ``capitalize()``.
    """
    value = old[old_id]
    if value is None:
        return new
    new[new_id] = value.capitalize()
    return new
# Matches the numbered "pet_<n>" item names among the 'pet'-prefixed multiples
other_pets_regex = re.compile(r'pet_([0-9]+)')


def other_pets(old, new, ag_login_id):
    """Fill question 144 with the participant's other pets

    Parameters
    ----------
    old : dict
        old survey row, keyed by column name
    new : dict
        new survey responses, keyed by question id; modified in place
    ag_login_id : str
        login the multiples answers are stored under

    Returns
    -------
    dict
        The same ``new`` object with question 144 set to the pet values
        joined by '; ' (an empty string when there are none).
    """
    # Look the answers up under the name as stored in the old survey.
    # new[127] has been through capitalize(), which lower-cases everything
    # after the first character, so it may no longer match the database.
    pname = old.get('participant_name')
    if pname is None:
        pname = new[127]
    pets = get_multiples(ag_login_id, pname, 'pet')
    pets = [v for k, v in pets.items() if other_pets_regex.search(k)]
    new[144] = '; '.join(pets)
    return new
def humans(old, new, ag_login_id):
    """Fill question 145 with the humans recorded for the participant

    Parameters
    ----------
    old : dict
        old survey row, keyed by column name
    new : dict
        new survey responses, keyed by question id; modified in place
    ag_login_id : str
        login the multiples answers are stored under

    Returns
    -------
    dict
        The same ``new`` object with question 145 set to '; '-joined
        "<sex>: <age>" entries, one per item whose name ends in 'sex'.
    """
    # Look the answers up under the name as stored in the old survey.
    # new[127] has been through capitalize(), which lower-cases everything
    # after the first character, so it may no longer match the database.
    pname = old.get('participant_name')
    if pname is None:
        pname = new[127]
    answers = get_multiples(ag_login_id, pname, 'human')
    described = []
    for item_name, sex in answers.items():
        # Map proper human sex and age together
        if not item_name.endswith('sex'):
            continue
        # The second '_'-separated field ties a sex item to its age item
        h_id = item_name.split('_')[1]
        age = answers.get('human_%s_age' % h_id, 'Unspecified')
        described.append('%s: %s' % (sex, age))
    new[145] = '; '.join(described)
    return new
def animal_type(old, new):
    """Transfer the old 'type' answer to question 128

    Parameters
    ----------
    old : dict
        old survey row, keyed by column name
    new : dict
        new survey responses, keyed by question id; modified in place

    Returns
    -------
    dict
        The same ``new`` object. "Small mammal" and "Large mammal" are
        rewritten to "Small Mammal" and "Large Mammal"; any other non-None
        value goes through direct_transfer; None leaves ``new`` untouched.
    """
    kind = old['type']
    if kind is None:
        return new
    if kind == "Small mammal":
        new[128] = "Small Mammal"
    elif kind == "Large mammal":
        new[128] = "Large Mammal"
    else:
        # Write into the dict that was passed in, not a module-level global
        new = direct_transfer(old, new, 'type', 128)
    return new
#ARP Yay, outside time!
def outside_time(old, new):
    """Transfer the old 'outside_time' answer to question 139

    Parameters
    ----------
    old : dict
        old survey row, keyed by column name
    new : dict
        new survey responses, keyed by question id; modified in place

    Returns
    -------
    dict
        The same ``new`` object. "1" becomes 'Less than 2' and "2" becomes
        '2-4'; any other non-None value goes through direct_transfer; None
        leaves ``new`` untouched.
    """
    raw = old['outside_time']
    if raw is None:
        return new
    # Strip once so both coded values tolerate surrounding whitespace
    coded = raw.strip()
    if coded == "1":
        new[139] = 'Less than 2'
    elif coded == "2":
        new[139] = '2-4'
    else:
        new = direct_transfer(old, new, 'outside_time', 139)
    return new
def food_source(old, new):
    """Append a label to question 135 for each food source that was answered

    Parameters
    ----------
    old : dict
        old survey row, keyed by column name
    new : dict
        new survey responses, keyed by question id; new[135] must be a list

    Returns
    -------
    dict
        The same ``new`` object, with one label appended to new[135] for
        every food source column whose old value is not None.
    """
    sources = (('food_source_store', 'Pet store food'),
               ('food_source_human', 'Human food'),
               ('food_source_wild', 'Wild food'))
    for column, label in sources:
        if old[column] is None:
            continue
        new[135].append(label)
    return new
def special_food(old, new):
    """Append a label to question 137 for each special food that was answered

    Parameters
    ----------
    old : dict
        old survey row, keyed by column name
    new : dict
        new survey responses, keyed by question id; new[137] must be a list

    Returns
    -------
    dict
        The same ``new`` object, with 'Organic' and/or 'Grain free'
        appended to new[137] when the matching old value is not None.
    """
    eats_organic = old['organic_food'] is not None
    if eats_organic:
        new[137].append('Organic')
    eats_grain_free = old['grain_free_food'] is not None
    if eats_grain_free:
        new[137].append('Grain free')
    return new
# Each of the transfer functions above takes the old survey responses and
# the new survey responses, updates the new survey responses in place based
# on the old survey responses, and returns that same (modified) object
if __name__ == '__main__':
    # Only this migration driver needs json, so it is imported locally
    import json

    # Set up defaults for new questions
    # If it's a single or multiple, then default is "Unspecified"
    # If it's a string or text, then the default is ''
    type_sql = """SELECT survey_question_id, survey_response_type
                  FROM survey_question_response_type
                  JOIN group_questions USING (survey_question_id)
                  JOIN surveys USING (survey_group)
                  WHERE survey_id = 2"""
    # maps question IDs to response types
    qids_response_types = {row[0]: row[1]
                           for row in db_conn.execute_fetchall(type_sql)}

    # maps response types to default values. Using lists so that the insert
    # later can be agnostic of the type of data being inserted
    default_values = {'SINGLE': 'Unspecified',
                      'MULTIPLE': ['Unspecified'],
                      'STRING': '',
                      'TEXT': ''}

    # Insert template; the table name is filled in per response type. It does
    # not change between surveys, so it is built once outside the loop.
    insert_sql = """insert into {}
                    (survey_id, survey_question_id, response)
                    VALUES
                    (%s, %s, %s)"""

    total_surveys = 0
    transfers = 0
    for row in old_surveys():
        total_surveys += 1
        ag_login_id = row.pop('ag_login_id')
        pname = row['participant_name']
        transfers += 1

        # record the new responses, which begin as default values based on
        # response type (deep-copied so list defaults are not shared)
        new_responses = {
            qid: deepcopy(default_values[response_type])
            for qid, response_type in qids_response_types.items()}

        new_responses = direct_transfer(row, new_responses,
                                        'participant_name', 127)
        new_responses = animal_type(row, new_responses)
        new_responses = direct_transfer(row, new_responses, 'origin', 129)
        new_responses = direct_transfer(row, new_responses, 'age', 130)
        new_responses = direct_transfer(row, new_responses, 'gender', 131)
        new_responses = direct_transfer(row, new_responses, 'setting', 132)
        new_responses = direct_transfer(row, new_responses, 'weight', 133)
        new_responses = direct_transfer(row, new_responses, 'diet', 134)
        new_responses = food_source(row, new_responses)
        new_responses = direct_transfer(row, new_responses, 'food_type', 136)
        new_responses = special_food(row, new_responses)
        new_responses = direct_transfer(row, new_responses,
                                        'living_status', 138)
        new_responses = outside_time(row, new_responses)
        new_responses = direct_transfer(row, new_responses, 'toilet', 140)
        new_responses = direct_transfer(row, new_responses, 'coprophage', 141)
        new_responses = direct_transfer(row, new_responses, 'comments', 142)
        new_responses = other_pets(row, new_responses, ag_login_id)
        new_responses = humans(row, new_responses, ag_login_id)

        # 8 random bytes, hex-encoded, serve as the new survey's ID
        survey_id = binascii.hexlify(os.urandom(8))
        ag_login_info = ag_data.get_login_info(ag_login_id)[0]

        # add pet survey consent and create survey ID for them
        consent_details = {
            'login_id': ag_login_id,
            'participant_name': pname,
            'participant_email': ag_login_info['email'],
            'assent_obtainer': 'ANIMAL_SURVEY',
            'parent_1_name': 'ANIMAL_SURVEY',
            'parent_2_name': 'ANIMAL_SURVEY',
            'survey_id': survey_id,
            'is_juvenile': True,
            'deceased_parent': False,
            'obtainer_name': 'ANIMAL_SURVEY',
            'age_range': 'ANIMAL_SURVEY'
        }

        with db_conn.get_postgres_cursor() as cur:
            cur.execute("""
                INSERT INTO ag_consent
                (ag_login_id, participant_name, is_juvenile,
                 parent_1_name, parent_2_name, deceased_parent,
                 participant_email, assent_obtainer, age_range,
                 date_signed)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, NOW())""",
                        (consent_details['login_id'],
                         consent_details['participant_name'],
                         consent_details['is_juvenile'],
                         consent_details['parent_1_name'],
                         consent_details['parent_2_name'],
                         consent_details['deceased_parent'],
                         consent_details['participant_email'],
                         # read the key that matches the column; it holds
                         # the same value as 'obtainer_name'
                         consent_details['assent_obtainer'],
                         consent_details['age_range']))

            cur.execute("""
                INSERT INTO ag_login_surveys
                (ag_login_id, survey_id, participant_name)
                VALUES (%s, %s, %s)""",
                        (consent_details['login_id'],
                         consent_details['survey_id'],
                         consent_details['participant_name']))

            for qid, response_type in qids_response_types.items():
                if response_type == 'SINGLE':
                    query = insert_sql.format('survey_answers')
                    cur.execute(query, (survey_id, qid, new_responses[qid]))
                elif response_type == 'MULTIPLE':
                    query = insert_sql.format('survey_answers')
                    answers = new_responses[qid]
                    # If we have a response other than the default response,
                    # then pop the default value
                    if len(answers) > 1 and 'Unspecified' in answers:
                        answers.remove('Unspecified')
                    # Record all remaining responses to the database
                    for resp in answers:
                        cur.execute(query, (survey_id, qid, resp))
                elif response_type in ('STRING', 'TEXT'):
                    # Free text is stored as a one-element JSON list. Let
                    # json do the encoding so quotes and backslashes in the
                    # answer are escaped and the stored value stays valid.
                    response = json.dumps([new_responses[qid]])
                    query = insert_sql.format('survey_answers_other')
                    cur.execute(query, (survey_id, qid, response))
                else:
                    raise ValueError("Unrecognized response type: %s" %
                                     response_type)

    # Single-string form prints the same text under Python 2 and Python 3
    print("transferred %d out of %d attempts" % (transfers, total_surveys))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment