Skip to content

Instantly share code, notes, and snippets.

@dgfitch
Last active March 27, 2023 22:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dgfitch/3a700f7f43ddf83e62c58c964275767a to your computer and use it in GitHub Desktop.
Save dgfitch/3a700f7f43ddf83e62c58c964275767a to your computer and use it in GitHub Desktop.
Widen and pivot NIH Toolbox uploaded data into usable format
#!/usr/bin/env python3
"""
Takes NIH Toolbox data and transforms it from long (one row per score or
response) to wide (one row per participant, or 'PIN' as labeled in the NIH
Toolbox output).
Output from this will be fed to scorify, and the output from that will be fed
to ndaify.py
I know this is kind of bonkers.
Input is all files in <input_dir>, "*Assessment Data.csv" for item-level
responses, and "*Assessment Scores.csv" for T-Scores.
Produces three files in the output directory: scores.csv, responses.csv, and
combined.csv. All will have one row per participant.
Participants are filtered to PINs in the 1000-1999 range, to exclude test data.
Duplicate rows are excluded.
scores.csv contains T-Scores and assessment dates
responses.csv contains individual responses and response times
combined.csv is those two files joined on PIN.
combined.csv is very very big (more than 1500 columns at the time of writing)
so it may be easier to work with scores.csv when you don't need item-level data.
Usage: widen_nihtoolbox.py <input_dir> <output_dir>
"""
from pathlib import Path
import sys
import pandas as pd
import logging
# Log lines carry only the message text -- no level or logger-name prefix.
log_format = '%(message)s'
logging.basicConfig(format=log_format)

# Hold the root logger at WARNING so debug chatter from the surrounding
# session (the original note here mentions IPython) is not shown...
logging.root.setLevel(logging.WARNING)

# ...while this module's own logger passes everything from DEBUG upward.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
def prepeare_df(file_list):
    """
    Combine, deduplicate, and filter PIN to be in the 1000-1999 range

    file_list is an iterable of CSV paths (or anything else pd.read_csv
    accepts). Every column is read as a string.

    Returns a new dataframe holding only the rows whose PIN is exactly four
    digits starting with '1'. Rows with a missing PIN are dropped.

    Raises ValueError if file_list is empty.
    """
    dfs = [pd.read_csv(f, dtype=str) for f in file_list]
    if not dfs:
        # pd.concat would raise ValueError here anyway; say what went wrong
        raise ValueError('No input files to combine')
    # Rows that don't differ at all are safe to drop
    df = pd.concat(dfs).drop_duplicates()
    # na=False: a blank PIN is read as NaN, which would otherwise put NaN
    # into the mask, and pandas refuses to index with a mask containing NaN
    keep = df['PIN'].str.match(r'^1\d{3}$', na=False)
    return df[keep].copy()
def prep_scores(score_df):
    """
    Prepare the scores dataframe -- return a dataframe with one row per
    participant and one column per instrument * score field.

    Columns are named '<Inst>|<field>' and sorted by name, where field is
    one of TScore_combined, SE, DateFinished, RawScore, Theta.

    Rows without a T-score in either T-score column are dropped.
    score_df itself is left unmodified.

    Raises ValueError (from pivot) if a PIN has more than one scored row for
    the same instrument.
    """
    # The questionnaires and behavioral stuff store their data in different
    # columns, combine them for ease of processing
    tscore_combined = score_df['TScore'].fillna(
        score_df['Fully-Corrected T-score'])

    # Work on a narrow copy instead of adding a column to score_df: the
    # caller's frame stays as passed in, and only the columns the pivot
    # needs get copied
    other_fields = ['SE', 'DateFinished', 'RawScore', 'Theta']
    working = score_df[['PIN', 'Inst'] + other_fields].assign(
        TScore_combined=tscore_combined)

    # Drop incomplete sessions and such -- if you don't have a t-score, we
    # don't want it
    score_filtered = working.dropna(subset=['TScore_combined'])

    # Now, widen this. This will give us one row per PIN, one column per
    # instrument for each of the score fields. It is a lot of columns
    score_pivoted = score_filtered.pivot(
        index='PIN',
        columns=['Inst'],
        values=['TScore_combined'] + other_fields,
    )

    # Squash the (field, Inst) multi-index into flat 'Inst|field' names and
    # sort so each instrument's columns sit together
    score_pivoted.columns = [
        f'{inst}|{field}' for field, inst in score_pivoted.columns]
    return score_pivoted[sorted(score_pivoted.columns)].copy()
def prep_raw(raw_df):
    """
    Prepare the raw items dataframe -- return a dataframe with one row
    per participant, one column per item * response * score * response time
    It is a hilariously wide dataframe.

    Columns are named '<Inst>|<ItemID>|<field>' where field is one of
    Response, Score, ResponseTime. raw_df itself is left unmodified.

    Raises ValueError (from pivot) if a PIN has more than one row for the
    same instrument/item pair.
    """
    value_fields = ['Response', 'Score', 'ResponseTime']

    # One key per item, e.g. 'SomeInst|SomeItem'
    inst_item = raw_df['Inst'].str.cat(raw_df['ItemID'], sep='|')

    # Work on a narrow copy instead of adding a column to raw_df: the
    # caller's frame stays as passed in, and only the columns the pivot
    # needs get copied
    working = raw_df[['PIN'] + value_fields].assign(InstItem=inst_item)

    raw_pivoted = working.pivot(
        index='PIN',
        columns=['InstItem'],
        values=value_fields,
    )
    # Squash the (field, InstItem) multi-index into flat 'InstItem|field'
    # names
    raw_pivoted.columns = [
        f'{item}|{field}' for field, item in raw_pivoted.columns]
    return raw_pivoted
def main(input_dir, output_dir):
    """
    Read the NIH Toolbox exports in input_dir and write scores.csv,
    responses.csv, and combined.csv into output_dir.

    input_dir and output_dir may be Path objects or plain strings.
    output_dir is created if it does not exist yet.

    combined.csv is scores joined with responses on PIN using DataFrame.join's
    default left join, so it has exactly the PINs found in scores.csv.
    """
    input_dir = Path(input_dir)
    output_dir = Path(output_dir)

    raw_files = input_dir.glob('*Assessment Data.csv')
    score_files = input_dir.glob('*Assessment Scores.csv')
    raw_df = prepeare_df(raw_files)
    score_df = prepeare_df(score_files)

    # to_csv does not create missing directories, so make sure the target
    # exists before the first write
    output_dir.mkdir(parents=True, exist_ok=True)

    score_pivoted = prep_scores(score_df)
    scores_path = output_dir / 'scores.csv'
    score_pivoted.to_csv(scores_path)
    logger.info('Wrote %s, shape: %s', scores_path, score_pivoted.shape)

    raw_pivoted = prep_raw(raw_df)
    responses_path = output_dir / 'responses.csv'
    raw_pivoted.to_csv(responses_path)
    logger.info('Wrote %s, shape: %s', responses_path, raw_pivoted.shape)

    combined = score_pivoted.join(raw_pivoted)
    combined_path = output_dir / 'combined.csv'
    combined.to_csv(combined_path)
    logger.info('Wrote %s, shape: %s', combined_path, combined.shape)
# Script entry point: expects exactly two arguments, <input_dir> and
# <output_dir>. Anything else prints the module docstring (which doubles as
# the usage text) to stderr and exits with status 1.
if __name__ == '__main__':
    cli_args = sys.argv[1:]
    if len(cli_args) != 2:
        sys.stderr.write(__doc__)
        sys.exit(1)
    input_dir, output_dir = (Path(arg) for arg in cli_args)
    main(input_dir, output_dir)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment