Skip to content

Instantly share code, notes, and snippets.

@Jong-Sig
Created June 16, 2024 20:15
Show Gist options
  • Select an option

  • Save Jong-Sig/aa72a49af917a8d5db2adacb60a86c33 to your computer and use it in GitHub Desktop.

Select an option

Save Jong-Sig/aa72a49af917a8d5db2adacb60a86c33 to your computer and use it in GitHub Desktop.
Python script to parse clinical trial data
"""
# Clinical Trial XML Parser
This repository contains a script to parse XML files from ClinicalTrials.gov and clean the resulting dataset.
Note: This code was originally written in 2020 in Python 2.x, so it may be less efficient than more recent versions.
## Features
- Parse XML files from ClinicalTrials.gov
- Clean and organize the parsed data
- Flexible to adjust and parse additional information
## Requirements
- Python 2.x or 3.x
- pandas
- lxml
- tqdm
## Setup
### 1. Clone the Repository
```sh
git clone https://github.com/yourusername/clinicaltrialparsing.git
cd clinicaltrialparsing
```
### 2. Create and Activate a Virtual Environment
#### e.g., Conda:
```sh
conda create -n test_env python=3.x
conda activate test_env
```
### 3. Install Dependencies
#### e.g., Using pip:
```sh
pip install pandas lxml tqdm
```
## Usage
### 1. Download XML Files
Download XML files from ClinicalTrials.gov.
### 2. Update File Paths
Change the file paths in the xml_parse.py script to point to your downloaded XML files.
### 3. Execute the script to parse and clean the data:
(Note: change the source and destination paths first!)
```sh
python xml_parse.py
```
## Customization
If you need to parse additional information from the XML files, you can adjust the code in xml_parse.py as required.
"""
import numpy as np
import pandas as pd
import sys
import os
# NOTE: xml.etree.cElementTree was deprecated in 3.3 and removed in Python 3.9;
# plain ElementTree uses the same C accelerator automatically.
import xml.etree.ElementTree as ET
import time
import csv
import itertools
from tqdm import tqdm
class clinicaltrials():
    """
    Parse ClinicalTrials.gov raw XML dumps into one tab-separated table.

    Workflow:
    1. ``get_file_path`` indexes every XML file under the data directory.
    2. ``parse_*`` methods flatten one XML tree into row values.
    3. ``make_dataframe`` writes the header and one row per trial.
    """
    def __init__(self):
        # Root of the raw download (renamed from misspelled ``orginal_path``;
        # the attribute was only used inside this initializer).
        self.original_path = '../clinicaltrials.gov_raw'
        # Directory that holds the per-trial XML files.
        self.data_path = os.path.join(self.original_path, 'AllPublicXML')
        # Tab-separated output table.
        self.result_path = os.path.join(self.original_path, 'clinical_trial.csv.txt')
        # newline='' is required by the csv module for writer targets;
        # without it each row gains an extra blank line on Windows.
        self.result_file = open(self.result_path, 'w+', encoding='utf-8', newline='')
        self.csv_output = csv.writer(self.result_file, delimiter='\t')
        # Filled by get_file_path(): paths of all XML files to parse.
        self.file_path = []
def get_file_path(self):
    """Collect the path of every XML file under ``self.data_path``.

    Skips the source archive, stray ``.txt`` files and hidden entries.
    Appends full paths to ``self.file_path`` (one entry per XML file).
    """
    for entry in os.listdir(self.data_path):
        # The original used bitwise ``&`` / ``~`` on booleans (``~True == -2``),
        # which only worked by coincidence; use boolean operators instead.
        if entry != 'AllPublicXML.zip' and not entry.endswith('.txt') and not entry.startswith('.'):
            subdir = os.path.join(self.data_path, entry)
            for filename in os.listdir(subdir):
                self.file_path.append(os.path.join(subdir, filename))
def parse_sponsors(self, base_root, args):
    """Extract sponsor information from a ``<sponsors>`` element.

    args selects the return value:
    - 'lead_sponsor'  -> dict with 'agency' and 'agency_class' of the first sponsor
    - 'collaborators' -> dict mapping agency_class -> list of agency names
    - anything else   -> number of collaborators ('' when there are none)

    When no sponsor is listed, the trial ``<source>`` is used as lead agency.
    """
    sponsors = base_root.find('sponsors')
    lead_sponsor = {'agency': '', 'agency_class': ''}
    collaborators = {}
    collaborators_no = ''
    if sponsors is not None and len(sponsors) >= 1:
        # First child is the lead sponsor; its first two sub-elements are
        # expected to be <agency> and <agency_class> (keyed by actual tag).
        for field in list(sponsors[0])[:2]:
            lead_sponsor[field.tag] = field.text
        if len(sponsors) > 1:
            collaborators_no = len(sponsors) - 1
            # One malformed collaborator no longer aborts the rest
            # (the original bare except dropped all remaining entries).
            for collab in sponsors[1:]:
                try:
                    agency = collab[0].text
                    agency_class = collab[1].text
                except IndexError:
                    continue
                collaborators.setdefault(agency_class, []).append(agency)
    else:
        # No sponsor block: fall back to the data source as lead agency.
        source = base_root.find('source')
        if source is not None:
            lead_sponsor['agency'] = source.text
    if args == 'lead_sponsor':
        return lead_sponsor
    elif args == 'collaborators':
        return collaborators
    return collaborators_no
def parse_date(self, base_root, args):
    """Return the study start or completion date string.

    args is 'start_date' or 'completion_date'; the 'primary_*' tag is
    preferred, then the plain tag. Returns '' when neither is present
    (and None for an unrecognized args value, as before).
    """
    if args == 'start_date':
        tags = ('primary_start_date', 'start_date')
    elif args == 'completion_date':
        tags = ('primary_completion_date', 'completion_date')
    else:
        return None
    for tag in tags:
        element = base_root.find(tag)
        if element is not None and element.text is not None:
            return element.text
    return ''
def parse_submission(self, base_root, args):
    """Return the first or last submission date string ('' if absent).

    Tries progressively weaker tags: submitted -> submitted_qc -> posted.
    Fix: the original looked up 'last_update_posted type' (stray space),
    a tag name that can never match; the real tag is 'last_update_posted'.
    """
    if args == 'first_submission_date':
        tags = ('study_first_submitted', 'study_first_submitted_qc', 'study_first_posted')
    elif args == 'last_submission_date':
        tags = ('last_update_submitted', 'last_update_submitted_qc', 'last_update_posted')
    else:
        return None
    for tag in tags:
        element = base_root.find(tag)
        if element is not None and element.text is not None:
            return element.text
    return ''
def parse_study(self, base_root, args):
    """Return the study type or phase string.

    Fix: 'study_type' was looked up unguarded and crashed when the tag was
    missing, while 'phases' already fell back to '' — now both are guarded.
    """
    if args == 'study_type':
        element = base_root.find('study_type')
        return element.text if element is not None else ''
    elif args == 'phases':
        element = base_root.find('phase')
        return element.text if element is not None else ''
def parse_status(self, base_root, args):
    """Return the overall status or the reason behind it.

    args='status'        -> text of <overall_status> ('' if missing)
    args='status_reason' -> <last_known_status> for unknown statuses,
                            <why_stopped> for terminated ones, else ''.
    Carriage returns are stripped from the reason, as in the original.
    """
    element = base_root.find('overall_status')
    status = element.text if element is not None and element.text is not None else ''
    lowered = status.lower()
    if 'unknown' in lowered:
        reason_el = base_root.find('last_known_status')
    elif 'terminated' in lowered:
        reason_el = base_root.find('why_stopped')
    else:
        reason_el = None
    status_reason = ''
    if reason_el is not None and reason_el.text is not None:
        status_reason = reason_el.text.replace('\r', '').strip()
    if args == 'status':
        return status
    elif args == 'status_reason':
        return status_reason
def parse_regulation(self, base_root):
    """Return FDA-oversight flags from <oversight_info> as a dict.

    Keys default to '' and are overwritten by whatever sub-elements exist.
    The original wrapped the loop in a bare try/except, but iterating
    ``findall`` results cannot raise, so the guard was dead code.
    """
    regulations = {'has_dmc': '',
                   'is_fda_regulated_drug': '',
                   'is_fda_regulated_device': ''}
    for oversight in base_root.findall('oversight_info'):
        for field in oversight:
            regulations[field.tag] = field.text
    return regulations
def parse_locations(self, base_root, args):
    """Extract trial-site information.

    args selects the return value:
    - 'facilities_no'             -> number of <location> elements
    - 'facilities'                -> dict country -> list of facility names
    - 'countries'                 -> dict 'country_N' -> country name
    - 'facilities_by_countries_no'-> dict country -> count of named facilities

    Fix: the original wrapped the whole facility loop in one bare except,
    so a single malformed <location> silently dropped all later sites;
    malformed entries are now skipped individually.
    """
    locations = base_root.findall('location')
    facilities_no = len(locations)
    facilities = {}
    for location in locations:
        try:
            facility = location.find('facility')
            name = facility.find('name').text
            country = facility.find('address').find('country').text
        except AttributeError:
            # Missing facility/name/address on this site: skip it only.
            continue
        facilities.setdefault(country, []).append(name)
    countries = {}
    country_list = base_root.find('location_countries')
    if country_list is not None:
        for j, country in enumerate(country_list, start=1):
            countries[f'country_{j}'] = country.text
    # Count only non-empty facility names per country.
    facilities_by_countries_no = {country: len([name for name in names if name])
                                  for country, names in facilities.items()}
    if args == 'facilities_no':
        return facilities_no
    elif args == 'facilities':
        return facilities
    elif args == 'countries':
        return countries
    elif args == 'facilities_by_countries_no':
        return facilities_by_countries_no
def parse_knowledge(self, base_root, args):
    """Extract official, enrollment, eligibility, condition, intervention,
    responsible-party and reference data; ``args`` selects the section.

    Returns a dict, list, string or count depending on ``args`` (see the
    branches below); defaults are empty values when the tags are absent.
    """
    if args == 'overall_official':
        official = {'last_name': '', 'role': '', 'affiliation': ''}
        overall = base_root.find('overall_official')
        if overall is not None:
            for field in overall:
                official[field.tag] = field.text
        return official
    if args == 'enrollment':
        # 'enrollment' holds the count; the 'type' attribute says whether
        # it is anticipated or actual.
        enrollment = {'enrollment': '', 'type': ''}
        element = base_root.find('enrollment')
        if element is not None:
            enrollment[element.tag] = element.text
            enrollment.update(element.attrib)
        return enrollment
    if args == 'eligibility':
        eligibility = {'gender': '',
                       'minimum_age': '',
                       'maximum_age': '',
                       'healthy_volunteers': ''}
        element = base_root.find('eligibility')
        if element is not None:
            # Original repeated four identical elif branches; a membership
            # test against the default keys is equivalent.
            for field in element:
                if field.tag in eligibility:
                    eligibility[field.tag] = field.text
        return eligibility
    if args == 'disease':
        return [condition.text for condition in base_root.findall('condition')]
    if args == 'intervention':
        intervention = {}
        for child in base_root.findall('intervention'):
            try:
                itype = child.find('intervention_type').text
                iname = child.find('intervention_name').text
            except AttributeError:
                continue
            intervention.setdefault(itype, []).append(iname)
        return intervention
    if args == 'responsible_party':
        responsible_party = ''
        # Keeps the text of the LAST sub-element seen, as before.
        for party in base_root.findall('responsible_party'):
            for child in party:
                responsible_party = child.text
        return responsible_party
    if args == 'references_no':
        return len(base_root.findall('reference'))
    if args == 'references':
        # Only PMIDs are returned (the citation dict the original built
        # was never used); capped at 50 to fit a single output cell.
        references_pmid = {}
        for i, reference in enumerate(base_root.findall('reference'), start=1):
            pmid = reference.find('PMID')
            if pmid is not None:
                references_pmid[f'reference_{i}_pmid'] = pmid.text
        return dict(itertools.islice(references_pmid.items(), 50))
    return None
def parse_xml_file(self):
    """Parse every XML file in ``self.file_path`` and write one row per trial.

    Performance fixes versus the original:
    - each parse_* section is now called once per file and reused (the
      original re-parsed the same section for every single field, e.g.
      parse_knowledge 12 times per file);
    - the per-file ``time.sleep(0.1)`` is removed — throttling local file
      parsing only added hours of idle time over the full dump.
    """
    def clean(element):
        # <brief_summary>/<detailed_description> wrap their text in a
        # <textblock> child. Collapse all runs of whitespace to single
        # spaces (the original's replace(' ', '') deleted EVERY space,
        # which destroyed the text — assumed to be a bug).
        if element is None or len(element) == 0 or element[0].text is None:
            return ''
        return ' '.join(element[0].text.split())

    for path in tqdm(self.file_path):
        root = ET.parse(path).getroot()
        # Hoisted: parsed once, indexed several times below.
        lead_sponsor = self.parse_sponsors(root, 'lead_sponsor')
        official = self.parse_knowledge(root, 'overall_official')
        enrollment = self.parse_knowledge(root, 'enrollment')
        eligibility = self.parse_knowledge(root, 'eligibility')
        regulations = self.parse_regulation(root)
        # Column order must match the header in make_dataframe().
        datachunk = (root.find('id_info').find('nct_id').text,
                     root.find('required_header').find('url').text,
                     root.find('brief_title').text,
                     lead_sponsor['agency'],
                     lead_sponsor['agency_class'],
                     self.parse_sponsors(root, 'collaborators_no'),
                     self.parse_sponsors(root, 'collaborators'),
                     root.find('source').text,
                     self.parse_study(root, 'study_type'),
                     self.parse_study(root, 'phases'),
                     self.parse_date(root, 'start_date'),
                     self.parse_date(root, 'completion_date'),
                     self.parse_submission(root, 'first_submission_date'),
                     self.parse_submission(root, 'last_submission_date'),
                     self.parse_status(root, 'status'),
                     self.parse_status(root, 'status_reason'),
                     self.parse_locations(root, 'facilities_no'),
                     self.parse_locations(root, 'facilities'),
                     self.parse_locations(root, 'countries'),
                     self.parse_locations(root, 'facilities_by_countries_no'),
                     official['last_name'],
                     official['role'],
                     official['affiliation'],
                     enrollment['enrollment'],
                     enrollment['type'],
                     eligibility['minimum_age'],
                     eligibility['maximum_age'],
                     eligibility['gender'],
                     eligibility['healthy_volunteers'],
                     self.parse_knowledge(root, 'disease'),
                     self.parse_knowledge(root, 'intervention'),
                     self.parse_knowledge(root, 'responsible_party'),
                     regulations['has_dmc'],
                     regulations['is_fda_regulated_drug'],
                     regulations['is_fda_regulated_device'],
                     clean(root.find('brief_summary')),
                     clean(root.find('detailed_description')),
                     self.parse_knowledge(root, 'references_no'),
                     self.parse_knowledge(root, 'references'))
        self.csv_output.writerow(datachunk)
    return
def make_dataframe(self):
    """Write the header row, parse all XML files into rows, close the file.

    The column names here must stay aligned with the tuple built in
    ``parse_xml_file``.
    """
    columns = ('nct_id', 'url_link', 'trial_title',
               'lead_sponsor', 'lead_sponsor_class',
               'collaborators_no', 'collaborators',
               'source', 'study_type', 'phases',
               'start_date', 'completion_date',
               'first_submission_date', 'last_submission_date',
               'status', 'status_reason',
               'facilities_no', 'facilities',
               'facilities_countries', 'facilities_by_countries_no',
               'overall_official_last_name', 'overall_official_role',
               'overall_official_affiliation',
               'enrollment_no', 'enrollment_type',
               'eligibility_minimum_age', 'eligibility_maximum_age',
               'eligibility_gender', 'eligibility_healthy_volunteer',
               'disease', 'intervention', 'responsible_party',
               'has_dmc', 'is_fda_regulated_drug', 'is_fda_regulated_device',
               'brief_sum', 'detail_desc',
               'references_no', 'references_pmid')
    self.csv_output.writerow(columns)
    # Dump all data rows beneath the header, then release the handle.
    self.parse_xml_file()
    self.result_file.close()
def unwrap_function():
    """Build the parser, index all XML files, and write the output table."""
    parser = clinicaltrials()
    parser.get_file_path()
    parser.make_dataframe()
if __name__ == '__main__':
    # ``time`` is already imported at module level; the duplicate local
    # ``import time`` the original had here was redundant and is removed.
    start_time = time.time()
    print('start collecting US clinical trial data')
    unwrap_function()
    print('fin:', time.time() - start_time)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment