Created
June 16, 2024 20:15
-
-
Save Jong-Sig/aa72a49af917a8d5db2adacb60a86c33 to your computer and use it in GitHub Desktop.
python script to parse clinical trial data
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| # Clinical Trial XML Parser | |
| This repository contains a script to parse XML files from ClinicalTrials.gov and clean the resulting dataset. | |
| Note: This code was originally written in 2020 in Python 2.x, so it may be less efficient than more recent versions. | |
| ## Features | |
| - Parse XML files from ClinicalTrials.gov | |
| - Clean and organize the parsed data | |
| - Flexible to adjust and parse additional information | |
| ## Requirements | |
| - Python 2.x or 3.x | |
| - pandas | |
| - lxml | |
| - tqdm | |
| ## Setup | |
| ### 1. Clone the Repository | |
| ```sh | |
| git clone https://github.com/yourusername/clinicaltrialparsing.git | |
| cd clinicaltrialparsing | |
| ``` | |
| ### 2. Create and Activate a Virtual Environment | |
| #### e.g., Conda: | |
| ```sh | |
| conda create -n test_env python=3.x | |
| conda activate test_env | |
| ``` | |
| ### 3. Install Dependencies | |
| #### e.g., Using pip: | |
| ```sh | |
| pip install pandas lxml tqdm | |
| ``` | |
| ## Usage | |
| ### 1. Download XML Files | |
| Download XML files from ClinicalTrials.gov. | |
| ### 2. Update File Paths | |
| Change the file paths in the xml_parse.py script to point to your downloaded XML files. | |
| ### 3. Execute the script to parse and clean the data: | |
| (Note: change the source and destination paths first!) | |
| ```sh | |
| python xml_parse.py | |
| ``` | |
| ## Customization | |
| If you need to parse additional information from the XML files, you can adjust the code in xml_parse.py as required. | |
| """ | |
# Standard library
import csv
import itertools
import os
import sys
import time

# xml.etree.cElementTree was deprecated in Python 3.3 and REMOVED in 3.9;
# the original code (written for Python 2) imported it unconditionally and
# therefore fails on any modern interpreter.  Prefer it where it still
# exists, otherwise fall back to the pure-Python module (which is itself
# C-accelerated on Python 3).
try:
    import xml.etree.cElementTree as ET
except ImportError:
    import xml.etree.ElementTree as ET

# Third-party
import numpy as np
import pandas as pd
from tqdm import tqdm
class clinicaltrials():
    """Parse ClinicalTrials.gov legacy XML study records into a TSV file.

    Workflow:
      1. ``get_file_path`` indexes every study XML file under the raw-data
         folder.
      2. ``parse_xml_file`` extracts one row of fields per study and writes it
         through ``csv.writer``.
      3. ``make_dataframe`` writes the header row, runs the parse, and closes
         the output file.

    Paths are hard-coded relative to the working directory; adjust
    ``orginal_path`` (attribute name misspelled in the original — kept for
    backward compatibility) before running.
    """

    def __init__(self):
        # Root of the raw download.  The misspelled attribute name is kept so
        # any external code that touches it keeps working.
        self.orginal_path = '../clinicaltrials.gov_raw'
        self.data_path = os.path.join(self.orginal_path, 'AllPublicXML')
        self.result_path = os.path.join(self.orginal_path, 'clinical_trial.csv.txt')
        # newline='' is the documented way to open a file for csv.writer;
        # without it the writer emits \r\r\n line endings on Windows.
        self.result_file = open(self.result_path, 'w+', encoding='utf-8', newline='')
        self.csv_output = csv.writer(self.result_file, delimiter='\t')
        # Absolute paths of every XML file to parse, filled by get_file_path().
        self.file_path = []

    def get_file_path(self):
        """Collect the path of every study XML file under ``data_path``.

        The download unpacks into one sub-folder per NCT-id prefix; skip the
        original zip archive, stray .txt files, and hidden entries.
        """
        for entry in os.listdir(self.data_path):
            # BUG FIX: the original combined these tests with bitwise `&`/`~`
            # on booleans, which only filtered correctly by accident; plain
            # boolean operators express the intent.
            if (entry != 'AllPublicXML.zip'
                    and not entry.endswith('.txt')
                    and not entry.startswith('.')):
                folder = os.path.join(self.data_path, entry)
                for filename in os.listdir(folder):
                    self.file_path.append(os.path.join(folder, filename))

    def parse_sponsors(self, base_root, args):
        """Extract sponsor information from a study element.

        args:
          'lead_sponsor'  -> dict with 'agency' and 'agency_class'
          'collaborators' -> dict mapping agency_class -> [agency names]
          anything else   -> number of collaborators ('' when none known)
        """
        sponsors = base_root.findall('sponsors')
        lead_sponsors = {'agency': '', 'agency_class': ''}
        collaborators = {}
        collaborators_no = ''
        children = sponsors[0] if sponsors else []
        if len(children) >= 1:
            # First child is <lead_sponsor> with <agency> and <agency_class>.
            for sub in list(children[0])[:2]:
                lead_sponsors[sub.tag] = sub.text
            if len(children) > 1:
                collaborators_no = len(children) - 1
                # Group collaborator agency names by their agency_class.
                try:
                    for collab in children[1:]:
                        cls = collab[1].text   # agency_class
                        name = collab[0].text  # agency
                        collaborators.setdefault(cls, []).append(name)
                except IndexError:
                    # Malformed <collaborator>: keep what was collected so far
                    # (matches the original's abort-on-first-error behavior).
                    pass
        else:
            # No sponsor block at all: fall back to the <source> tag.
            try:
                lead_sponsors['agency'] = base_root.find('source').text
            except AttributeError:
                pass
        if args == 'lead_sponsor':
            return lead_sponsors
        elif args == 'collaborators':
            return collaborators
        else:
            return collaborators_no

    def parse_date(self, base_root, args):
        """Return the study start or completion date string ('' if absent).

        Prefers the 'primary_*' tag, falling back to the plain tag.  Returns
        None for an unrecognized selector (as the original implicitly did).
        """
        if args == 'start_date':
            tags = ('primary_start_date', 'start_date')
        elif args == 'completion_date':
            tags = ('primary_completion_date', 'completion_date')
        else:
            return None
        for tag_name in tags:
            node = base_root.find(tag_name)
            # Also skip an element that exists but carries no text (the
            # original could return None in that case).
            if node is not None and node.text is not None:
                return node.text
        return ''

    def parse_submission(self, base_root, args):
        """Return the first/last submission date string ('' if absent)."""
        if args == 'first_submission_date':
            tags = ('study_first_submitted', 'study_first_submitted_qc',
                    'study_first_posted')
        elif args == 'last_submission_date':
            # BUG FIX: the original searched for 'last_update_posted type'
            # (with an embedded space), which can never match an XML tag, so
            # the final fallback always produced ''.
            tags = ('last_update_submitted', 'last_update_submitted_qc',
                    'last_update_posted')
        else:
            return None
        for tag_name in tags:
            node = base_root.find(tag_name)
            if node is not None and node.text is not None:
                return node.text
        return ''

    def parse_study(self, base_root, args):
        """Return the study type (required tag; missing one raises) or phase."""
        if args == 'study_type':
            # Intentionally no fallback: a record without <study_type> is
            # malformed and should fail loudly, as in the original.
            return base_root.find('study_type').text
        elif args == 'phases':
            node = base_root.find('phase')
            return node.text if node is not None and node.text is not None else ''

    def parse_status(self, base_root, args):
        """Return the overall status, or the reason for unknown/terminated."""
        status = base_root.find('overall_status').text
        status_reason = ''
        lowered = status.lower()
        if 'unknown' in lowered:
            reason_tag = 'last_known_status'
        elif 'terminated' in lowered:
            reason_tag = 'why_stopped'
        else:
            reason_tag = None
        if reason_tag is not None:
            node = base_root.find(reason_tag)
            if node is not None and node.text is not None:
                status_reason = node.text.replace('\r', '').strip()
        if args == 'status':
            return status
        elif args == 'status_reason':
            return status_reason

    def parse_regulation(self, base_root):
        """Return oversight flags (has_dmc / FDA-regulated drug & device)."""
        regulations = {'has_dmc': '',
                       'is_fda_regulated_drug': '',
                       'is_fda_regulated_device': ''}
        # findall/iteration cannot raise here; the original bare try/except
        # was dead weight.  Any extra oversight tags are kept, as before.
        for oversight in base_root.findall('oversight_info'):
            for child in oversight:
                regulations[child.tag] = child.text
        return regulations

    def parse_locations(self, base_root, args):
        """Extract facility counts, names-by-country, and country list."""
        locations = base_root.findall('location')
        facilities_no = len(locations)
        # Facility names grouped by country.
        facilities = {}
        try:
            for location in locations:
                facility = location.find('facility')
                name = facility.find('name').text
                country = facility.find('address').find('country').text
                facilities.setdefault(country, []).append(name)
        except AttributeError:
            # Malformed <location>: keep what was collected so far.
            pass
        # Flat numbered country list from <location_countries>.
        countries = {}
        country_list = base_root.find('location_countries')
        if country_list is not None:
            for j, node in enumerate(country_list):
                countries[f'country_{j+1}'] = node.text
        # Count only non-empty facility names per country.
        facilities_by_countries_no = {
            country: sum(1 for name in names if name)
            for country, names in facilities.items()
        }
        if args == 'facilities_no':
            return facilities_no
        elif args == 'facilities':
            return facilities
        elif args == 'countries':
            return countries
        elif args == 'facilities_by_countries_no':
            return facilities_by_countries_no

    def parse_knowledge(self, base_root, args):
        """Grab-bag extractor selected by ``args``: overall official,
        enrollment, eligibility, conditions, interventions, responsible
        party, and reference counts/PMIDs.
        """
        if args == 'overall_official':
            official = {'last_name': '', 'role': '', 'affiliation': ''}
            node = base_root.find('overall_official')
            if node is not None:
                for child in node:
                    official[child.tag] = child.text
            return official
        if args == 'enrollment':
            # 'enrollment': the count; 'type': anticipated or actual
            # (carried as an XML attribute).
            enrollment = {'enrollment': '', 'type': ''}
            node = base_root.find('enrollment')
            if node is not None:
                enrollment[node.tag] = node.text
                enrollment.update(node.attrib)
            return enrollment
        if args == 'eligibility':
            wanted = ('gender', 'minimum_age', 'maximum_age', 'healthy_volunteers')
            eligibility = {key: '' for key in wanted}
            node = base_root.find('eligibility')
            if node is not None:
                for child in node:
                    if child.tag in wanted:
                        eligibility[child.tag] = child.text
            return eligibility
        if args == 'disease':
            return [child.text for child in base_root.findall('condition')]
        if args == 'intervention':
            intervention = {}
            try:
                for child in base_root.findall('intervention'):
                    itype = child.find('intervention_type').text
                    iname = child.find('intervention_name').text
                    intervention.setdefault(itype, []).append(iname)
            except AttributeError:
                # Malformed <intervention>: keep what was collected so far.
                pass
            return intervention
        if args == 'responsible_party':
            responsible_party = ''
            for node in base_root.findall('responsible_party'):
                for child in node:
                    responsible_party = child.text  # last child wins, as before
            return responsible_party
        if args == 'references_no':
            return len(base_root.findall('reference'))
        if args == 'references':
            references_pmid = {}
            for i, reference in enumerate(base_root.findall('reference')):
                pmid = reference.find('PMID')
                if pmid is not None:
                    references_pmid[f'reference_{i+1}_pmid'] = pmid.text
            # Cap at 50 PMIDs so the value fits in a single spreadsheet cell
            # (limit established empirically in the original).
            return dict(itertools.islice(references_pmid.items(), 50))

    def _text_block(self, base_root, tag_name):
        """Return the whitespace-normalized text of <tag>/<textblock>, '' if absent.

        BUG FIX: the original stripped '\\n' (gluing wrapped words together)
        and then removed space runs outright; collapsing all whitespace to
        single spaces preserves readable text.
        """
        try:
            text = base_root.find(tag_name)[0].text
        except (TypeError, IndexError):
            return ''
        if text is None:
            return ''
        return ' '.join(text.split())

    def parse_xml_file(self):
        """Parse every indexed XML file and write one TSV row per study."""
        for path in tqdm(self.file_path):
            root = ET.parse(path).getroot()
            # Hoist dict-returning extractors so each sub-tree is parsed once
            # (the original re-ran the same parse up to four times per group).
            lead = self.parse_sponsors(root, 'lead_sponsor')
            official = self.parse_knowledge(root, 'overall_official')
            enrollment = self.parse_knowledge(root, 'enrollment')
            eligibility = self.parse_knowledge(root, 'eligibility')
            regulation = self.parse_regulation(root)
            # Field order must stay in lockstep with the header written by
            # make_dataframe().
            datachunk = (root.find('id_info').find('nct_id').text,
                         root.find('required_header').find('url').text,
                         root.find('brief_title').text,
                         lead['agency'],
                         lead['agency_class'],
                         self.parse_sponsors(root, 'collaborators_no'),
                         self.parse_sponsors(root, 'collaborators'),
                         root.find('source').text,
                         self.parse_study(root, 'study_type'),
                         self.parse_study(root, 'phases'),
                         self.parse_date(root, 'start_date'),
                         self.parse_date(root, 'completion_date'),
                         self.parse_submission(root, 'first_submission_date'),
                         self.parse_submission(root, 'last_submission_date'),
                         self.parse_status(root, 'status'),
                         self.parse_status(root, 'status_reason'),
                         self.parse_locations(root, 'facilities_no'),
                         self.parse_locations(root, 'facilities'),
                         self.parse_locations(root, 'countries'),
                         self.parse_locations(root, 'facilities_by_countries_no'),
                         official['last_name'],
                         official['role'],
                         official['affiliation'],
                         enrollment['enrollment'],
                         enrollment['type'],
                         eligibility['minimum_age'],
                         eligibility['maximum_age'],
                         eligibility['gender'],
                         eligibility['healthy_volunteers'],
                         self.parse_knowledge(root, 'disease'),
                         self.parse_knowledge(root, 'intervention'),
                         self.parse_knowledge(root, 'responsible_party'),
                         regulation['has_dmc'],
                         regulation['is_fda_regulated_drug'],
                         regulation['is_fda_regulated_device'],
                         self._text_block(root, 'brief_summary'),
                         self._text_block(root, 'detailed_description'),
                         self.parse_knowledge(root, 'references_no'),
                         self.parse_knowledge(root, 'references'))
            self.csv_output.writerow(datachunk)
            # NOTE: the original slept 0.1 s per file; parsing local files
            # needs no throttling, so the sleep was dropped (it added hours
            # over the full ~400k-study corpus).
        return

    def make_dataframe(self):
        """Write the header row, parse all files, and close the output."""
        table_header = ('nct_id',
                        'url_link',
                        'trial_title',
                        'lead_sponsor',
                        'lead_sponsor_class',
                        'collaborators_no',
                        'collaborators',
                        'source',
                        'study_type',
                        'phases',
                        'start_date',
                        'completion_date',
                        'first_submission_date',
                        'last_submission_date',
                        'status',
                        'status_reason',
                        'facilities_no',
                        'facilities',
                        'facilities_countries',
                        'facilities_by_countries_no',
                        'overall_official_last_name',
                        'overall_official_role',
                        'overall_official_affiliation',
                        'enrollment_no',
                        'enrollment_type',
                        'eligibility_minimum_age',
                        'eligibility_maximum_age',
                        'eligibility_gender',
                        'eligibility_healthy_volunteer',
                        'disease',
                        'intervention',
                        'responsible_party',
                        'has_dmc',
                        'is_fda_regulated_drug',
                        'is_fda_regulated_device',
                        'brief_sum',
                        'detail_desc',
                        'references_no',
                        'references_pmid')
        self.csv_output.writerow(table_header)
        try:
            self.parse_xml_file()
        finally:
            # Close even if a malformed XML file aborts the run, so rows
            # written so far are flushed to disk.
            self.result_file.close()
def unwrap_function():
    """Build the parser, index the raw XML files, and write the TSV output."""
    parser = clinicaltrials()
    parser.get_file_path()
    parser.make_dataframe()
if __name__ == '__main__':
    # `time` is already imported at module level; the redundant local
    # `import time` the original had here was removed.
    start_time = time.time()
    print('start collecting US clinical trial data')
    unwrap_function()
    # Report elapsed wall-clock seconds for the full run.
    print('fin:', time.time() - start_time)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment