Last active
December 13, 2022 17:22
-
-
Save fdefalco/35b7844626a9d8808d99bf5990e46ed2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# TODO
#  - potentially create a version that needs only SQLite
#  - create_source_to_standard_vocab_map.sql: needs performance work
#  - include additional measurement units in the vocabulary shard
#  - abstract creation of concepts.csv so Synthea concepts come from the
#    concept-extraction jar at execution time
#  - add scan report (WhiteRabbit) to the loop

# Disable RStudio's connection observer hooks and fancy quoting; plain ASCII
# quotes are required for the sQuote()-based SQL built later in this script.
options(connectionObserver = NULL)
options(useFancyQuotes = FALSE)

# **************************************
# INSTRUCTIONS FOR USE
# **************************************
# Step 1: create a separate environment.R file that sets every environment
#         variable listed in the next section, and run it first.
# Step 2: run the remainder of this script to generate your own synthetic
#         data network.
# Running on different days (or adjusting the date key manually) generates
# updates to your data network.
# **************************************

# **************************************
# ENVIRONMENT VARIABLES
# **************************************
# Used for creating the vocabulary shard.
syntheaConceptsFile <- Sys.getenv("omop_vocabulary_concept_file_location") # e.g. "D:/OHDSI/synthea/output/concepts.csv"
aresDataRoot <- Sys.getenv("ares_data_root")                               # e.g. "D:/OHDSI/Ares/public/data"
dbms <- Sys.getenv("dbms")                                                 # e.g. "postgresql"
server <- Sys.getenv("server")                                             # e.g. "localhost/ohdsi"
user <- Sys.getenv("user")                                                 # e.g. "postgres"
password <- Sys.getenv("password")                                         # e.g. "password"
pathToDriver <- Sys.getenv("path_to_driver")
syntheaExecutable <- Sys.getenv("synthea_executable")                      # e.g. "D:/OHDSI/synthea releases/2.7/synthea-with-dependencies.jar"
syntheaOutputDirectory <- Sys.getenv("synthea_output_directory")           # e.g. "D:/OHDSI/SyntheaData/"
vocabDatabaseSchema <- Sys.getenv("vocab_database_schema")

# Generate a limited concept listing from Synthea.
# Seems broken; generating instead using the GitHub distribution and
# ./gradlew concepts
# syntheaExtractConceptsCommand <- paste0("java -jar \"", syntheaExecutable, "\" concepts")
# system(syntheaExtractConceptsCommand)

# Target OMOP CDM and Synthea versions for this run.
cdmVersion <- "5.4"
syntheaVersion <- "2.7.0"
createVocabularyShard <- function(connectionDetails,
                                  syntheaConceptsFile,
                                  cdmDatabaseSchema,
                                  vocabDatabaseSchema) {
  # Build a minimal vocabulary "shard" in the CDM schema by copying only the
  # concepts that Synthea-generated data can reference out of a full OMOP
  # vocabulary schema.
  #
  # Args:
  #   connectionDetails:   result of DatabaseConnector::createConnectionDetails().
  #   syntheaConceptsFile: path to the headerless concepts CSV extracted from
  #                        Synthea (columns: vocabulary, concept_code, name,
  #                        and a fourth column — presumably source module;
  #                        TODO confirm).
  #   cdmDatabaseSchema:   target schema receiving the shard.
  #   vocabDatabaseSchema: schema containing the full OMOP vocabulary.
  #
  # Side effects: inserts rows into the concept, concept_relationship and
  # vocabulary tables of cdmDatabaseSchema.
  connection <- DatabaseConnector::connect(connectionDetails)
  # Release the connection even if any SQL statement below fails
  # (the original leaked the connection on error).
  on.exit(DatabaseConnector::disconnect(connection), add = TRUE)

  # Load the source codes Synthea can emit (file has no header row).
  syntheaConcepts <- read.csv(syntheaConceptsFile, header = FALSE)
  names(syntheaConcepts) <- c("vocabulary",
                              "concept_code",
                              "source_concept_name",
                              "source_module_maybe")

  # Render schema parameters into a SQL template and execute it.
  runSql <- function(sql, ...) {
    DatabaseConnector::executeSql(
      connection,
      SqlRender::render(
        sql,
        cdmDatabaseSchema = cdmDatabaseSchema,
        vocabDatabaseSchema = vocabDatabaseSchema,
        ...
      )
    )
  }

  # Copy concepts of one OMOP vocabulary, restricted to the codes present in
  # the Synthea extract. sourceVocabulary is the label used in the Synthea
  # file; omopVocabularyId is the matching OMOP vocabulary_id.
  insertConceptsByCode <- function(sourceVocabulary, omopVocabularyId) {
    codes <- syntheaConcepts$concept_code[syntheaConcepts$vocabulary == sourceVocabulary]
    if (length(codes) == 0) {
      # Nothing to copy; an empty "in ()" clause would be invalid SQL.
      return(invisible(NULL))
    }
    # useFancyQuotes is disabled globally, so sQuote yields plain 'x' quoting.
    runSql(
      "insert into @cdmDatabaseSchema.concept select * from @vocabDatabaseSchema.concept where vocabulary_id = '@omopVocabularyId' and concept_code in (@conceptCodes)",
      omopVocabularyId = omopVocabularyId,
      conceptCodes = sQuote(codes)
    )
  }

  # Copy an entire (small) vocabulary wholesale.
  insertWholeVocabulary <- function(omopVocabularyId) {
    runSql(
      "insert into @cdmDatabaseSchema.concept select * from @vocabDatabaseSchema.concept where vocabulary_id = '@omopVocabularyId'",
      omopVocabularyId = omopVocabularyId
    )
  }

  insertConceptsByCode("SNOMED-CT", "SNOMED")

  # Include the standard visit concepts (Inpatient / Outpatient / ER).
  runSql("insert into @cdmDatabaseSchema.concept select * from @vocabDatabaseSchema.concept where concept_id in (9201,9202,9203)")

  # Include all measurement units and ethnicity concepts.
  insertWholeVocabulary("UCUM")
  insertWholeVocabulary("Ethnicity")

  insertConceptsByCode("LOINC", "LOINC")
  insertConceptsByCode("RxNorm", "RxNorm")

  # NOTE(review): CVX (immunization) concepts are intentionally not copied;
  # the original kept only a commented-out stub keyed on
  # 'http://hl7.org/fhir/sid/cvx'. Enable if the vocabulary includes CVX.

  # Copy every relationship that touches a concept now present in the shard.
  runSql(
    "insert into @cdmDatabaseSchema.concept_relationship
    select distinct *
    from @vocabDatabaseSchema.concept_relationship
    where concept_id_1 IN (select concept_id from @cdmDatabaseSchema.concept)
    or concept_id_2 IN (select concept_id from @cdmDatabaseSchema.concept)"
  )

  # Record provenance: derive the 'None' vocabulary row from the source,
  # appending '*' to the version so shards are distinguishable from full
  # vocabularies.
  runSql(
    "insert into @cdmDatabaseSchema.vocabulary
    select 'None' vocabulary_id,
    concat('Vocabulary Shard derived from ', vocabulary_version) vocabulary_name,
    'Vocabulary Shard' vocabulary_reference,
    concat(vocabulary_version,'*') vocabulary_version,
    0 vocabulary_concept_id
    from @vocabDatabaseSchema.vocabulary
    where vocabulary_id = 'None'"
  )
}
# Set up run parameters.
# testKey is a timestamp (YYYYMMDD_HHMM) used to version every schema created
# in this run; re-running later produces a new "release" of the network.
testKey <- format(Sys.time(), "%Y%m%d_%H%M")
# End configuration.

# Begin data operations.
# https://gist.github.com/fdefalco/35b7844626a9d8808d99bf5990e46ed2

# Configure the database connection from the environment variables read above.
connectionDetails <- DatabaseConnector::createConnectionDetails(
  dbms = dbms,
  server = server,
  user = user,
  password = password,
  pathToDriver = pathToDriver
)
# Number of simulated persons generated per source.
populationSize <- 100

# One entry per simulated data source; each state becomes its own CDM
# instance. The abbreviation doubles as the source name.
stateByAbbreviation <- c(
  NJ = "New Jersey",
  PA = "Pennsylvania",
  NY = "New York",
  DE = "Delaware",
  MA = "Massachusetts"
)
simulatedSources <- unname(Map(
  function(abbreviation, state) {
    list(
      abbreviation = abbreviation,
      sourceName = abbreviation,
      geographySpecification = state,
      population = populationSize
    )
  },
  names(stateByAbbreviation),
  stateByAbbreviation
))
# For each simulated source: generate native data with Synthea, ETL it into
# an OMOP CDM, build the vocabulary shard, then characterize the result
# (Achilles, DataQualityDashboard, temporal characterization) into Ares.
for (simulatedSource in simulatedSources) {
  writeLines(paste("Processing", simulatedSource$geographySpecification))

  # ---- 1. Run Synthea to simulate the native (CSV) data ----
  syntheaSimulateCommand <- paste0(
    "java -jar \"", syntheaExecutable, "\" ",
    "-p ", simulatedSource$population, " ",
    "--exporter.fhir.transaction_bundle false ",
    "--exporter.csv.export true ",
    "--exporter.practitioner.fhir.export false ",
    "--exporter.hospital.fhir.export false ",
    "--exporter.fhir.export false ",
    "--exporter.years_of_history 20 ",
    "--exporter.baseDirectory ", syntheaOutputDirectory, simulatedSource$abbreviation, " ",
    "\"", simulatedSource$geographySpecification, "\""
  )
  system(syntheaSimulateCommand)

  # ---- 2. Derive per-source schema names, versioned by the run timestamp ----
  cdmSourceName <- paste0("cdm_", tolower(simulatedSource$abbreviation), "_")
  nativeSourceName <- paste0("native_", tolower(simulatedSource$abbreviation), "_")
  cdmDatabaseSchema <- paste0(cdmSourceName, testKey)
  resultsDatabaseSchema <- paste0(cdmSourceName, testKey)
  nativeSchema <- paste0(nativeSourceName, testKey)
  syntheaFileLoc <- paste0(syntheaOutputDirectory, simulatedSource$abbreviation, "/csv")

  # ---- 3. (Re)create the CDM and native schemas ----
  # paste() joins with single spaces (the original's extra spaces were harmless
  # but sloppy in the generated DDL).
  connection <- DatabaseConnector::connect(connectionDetails)
  DatabaseConnector::executeSql(connection, paste("drop schema if exists", cdmDatabaseSchema, "cascade"))
  DatabaseConnector::executeSql(connection, paste("drop schema if exists", nativeSchema, "cascade"))
  DatabaseConnector::executeSql(connection, paste("create schema", cdmDatabaseSchema))
  DatabaseConnector::executeSql(connection, paste("create schema", nativeSchema))
  DatabaseConnector::disconnect(connection)

  # ---- 4. Create CDM tables and load the simulated native data ----
  # Use the CommonDataModel package (via ETLSyntheaBuilder) to create the
  # CDM tables.
  ETLSyntheaBuilder::CreateCDMTables(
    connectionDetails = connectionDetails,
    cdmSchema = cdmDatabaseSchema,
    cdmVersion = cdmVersion
  )
  # Create and load the simulated Synthea data - this is our native data.
  ETLSyntheaBuilder::CreateSyntheaTables(
    connectionDetails = connectionDetails,
    syntheaSchema = nativeSchema,
    syntheaVersion = syntheaVersion
  )
  ETLSyntheaBuilder::LoadSyntheaTables(
    connectionDetails = connectionDetails,
    syntheaSchema = nativeSchema,
    syntheaFileLoc = syntheaFileLoc
  )

  # ---- 5. Install the limited vocabulary and run the ETL ----
  createVocabularyShard(connectionDetails, syntheaConceptsFile, cdmDatabaseSchema, vocabDatabaseSchema)
  ETLSyntheaBuilder::LoadEventTables(
    connectionDetails = connectionDetails,
    cdmSchema = cdmDatabaseSchema,
    syntheaSchema = nativeSchema,
    cdmVersion = cdmVersion,
    cdmSourceName = simulatedSource$sourceName,
    cdmSourceAbbreviation = simulatedSource$abbreviation
  )

  # ---- 6. Characterize the CDM into the Ares data folder ----
  releaseKey <- AresIndexer::getSourceReleaseKey(connectionDetails, cdmDatabaseSchema)
  datasourceReleaseOutputFolder <- file.path(aresDataRoot, releaseKey)

  # Run Achilles (smallCellCount = 0 keeps all counts for synthetic data).
  Achilles::achilles(
    cdmVersion = cdmVersion,
    connectionDetails = connectionDetails,
    cdmDatabaseSchema = cdmDatabaseSchema,
    resultsDatabaseSchema = cdmDatabaseSchema,
    smallCellCount = 0
  )

  # Run the Data Quality Dashboard checks.
  dqResults <- DataQualityDashboard::executeDqChecks(
    connectionDetails = connectionDetails,
    cdmDatabaseSchema = cdmDatabaseSchema,
    resultsDatabaseSchema = resultsDatabaseSchema,
    vocabDatabaseSchema = cdmDatabaseSchema,
    cdmVersion = cdmVersion,
    cdmSourceName = cdmSourceName,
    outputFile = "dq-result.json",
    outputFolder = datasourceReleaseOutputFolder
  )

  # Export the Achilles results for Ares.
  Achilles::exportAO(
    connectionDetails = connectionDetails,
    cdmDatabaseSchema = cdmDatabaseSchema,
    resultsDatabaseSchema = resultsDatabaseSchema,
    vocabDatabaseSchema = vocabDatabaseSchema,
    outputPath = aresDataRoot
  )

  # ---- 7. Temporal characterization ----
  outputFile <- file.path(datasourceReleaseOutputFolder, "temporal-characterization.csv")
  # BUG FIX: the original call ended with a trailing comma, which R treats as
  # an empty (missing) argument and errors at evaluation time.
  Achilles::getTemporalData(
    connectionDetails = connectionDetails,
    cdmDatabaseSchema = cdmDatabaseSchema,
    resultsDatabaseSchema = resultsDatabaseSchema
  )
  Achilles::performTemporalCharacterization(
    connectionDetails = connectionDetails,
    cdmDatabaseSchema = cdmDatabaseSchema,
    resultsDatabaseSchema = resultsDatabaseSchema,
    outputFile = outputFile
  )

  # Augment concept files with the temporal characterization data.
  AresIndexer::augmentConceptFiles(releaseFolder = file.path(aresDataRoot, releaseKey))
}
# Build network-level indexes across all simulated sources so Ares can
# present them as one data network.
# Idiom fix: spell out FALSE rather than the reassignable shorthand F.
sourceFolders <- list.dirs(aresDataRoot, recursive = FALSE)
AresIndexer::buildNetworkIndex(sourceFolders = sourceFolders, outputFolder = aresDataRoot)
AresIndexer::buildDataQualityIndex(sourceFolders = sourceFolders, outputFolder = aresDataRoot)
AresIndexer::buildNetworkUnmappedSourceCodeIndex(sourceFolders = sourceFolders, outputFolder = aresDataRoot)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment