# TODO
# Potentially create a version with just SQLite
# create_source_to_standard_vocab_map.sql - need to improve performance
# include additional measurement units in the vocabulary shard
# abstract creation of concepts.csv to create synthea concepts from the concept extraction jar at execution
# add scan report to loop (WhiteRabbit)
options(connectionObserver = NULL) # disable the RStudio connections pane observer
options(useFancyQuotes = FALSE)    # sQuote must emit plain ASCII quotes for use in SQL
# **************************************
# INSTRUCTIONS FOR USE
# **************************************
# The first step is to create a separate environment.R file that you can run to set all the
# required environment variables, which are defined in the next section (a sketch of such a
# file follows below).
# Once these are set, you can run the remaining script to generate your own synthetic data network.
# Running on different days (or setting the date key manually) will generate updates to your data network.
# **************************************
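# A minimal sketch of an environment.R file. The paths and credentials below are
# illustrative placeholders (only the variable names are required by this script);
# substitute values for your own environment:
#
# Sys.setenv(
#   omop_vocabulary_concept_file_location = "D:/OHDSI/synthea/output/concepts.csv",
#   ares_data_root = "D:/OHDSI/Ares/public/data",
#   dbms = "postgresql",
#   server = "localhost/ohdsi",
#   user = "postgres",
#   password = "password",
#   path_to_driver = "D:/OHDSI/drivers",        # folder containing the JDBC driver
#   synthea_executable = "D:/OHDSI/synthea releases/2.7/synthea-with-dependencies.jar",
#   synthea_output_directory = "D:/OHDSI/SyntheaData/",
#   vocab_database_schema = "vocabulary"        # schema holding a full OMOP vocabulary
# )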
# **************************************
# ENVIRONMENT VARIABLES
# **************************************
# used for creating vocabulary shard
syntheaConceptsFile <- Sys.getenv("omop_vocabulary_concept_file_location") #"D:/OHDSI/synthea/output/concepts.csv"
aresDataRoot <- Sys.getenv("ares_data_root") # "D:/OHDSI/Ares/public/data"
dbms <- Sys.getenv("dbms") # "postgresql"
server <- Sys.getenv("server") # "localhost/ohdsi"
user <- Sys.getenv("user") # "postgres"
password <- Sys.getenv("password") # "password"
pathToDriver <- Sys.getenv("path_to_driver")
syntheaExecutable <- Sys.getenv("synthea_executable") # "D:/OHDSI/synthea releases/2.7/synthea-with-dependencies.jar"
syntheaOutputDirectory <- Sys.getenv("synthea_output_directory") # "D:/OHDSI/SyntheaData/"
vocabDatabaseSchema <- Sys.getenv("vocab_database_schema")
# generate limited concept listing from synthea
# the concepts subcommand of the release jar seems broken; generating instead using the
# GitHub distribution and ./gradlew concepts
# syntheaExtractConceptsCommand <- paste0("java -jar \"", syntheaExecutable, "\" concepts")
# system(syntheaExtractConceptsCommand)
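# A sketch of that manual step (assumes git and a Java toolchain are available; the clone
# location is arbitrary). The gradle task writes output/concepts.csv, which is what
# omop_vocabulary_concept_file_location should point at:
# system("git clone https://github.com/synthetichealth/synthea.git")
# system("cd synthea && ./gradlew concepts")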
cdmVersion <- "5.4"
syntheaVersion <- "2.7.0"
createVocabularyShard <- function(connectionDetails, syntheaConceptsFile, cdmDatabaseSchema, vocabDatabaseSchema) {
  connection <- DatabaseConnector::connect(connectionDetails)

  # load the source codes emitted by synthea
  syntheaConcepts <- read.csv(syntheaConceptsFile, header = FALSE)
  names(syntheaConcepts) <-
    c("vocabulary",
      "concept_code",
      "source_concept_name",
      "source_module_maybe")

  # copy the SNOMED concepts used by synthea into the shard
  snomedConcepts <- syntheaConcepts[syntheaConcepts$vocabulary == 'SNOMED-CT', ]
  snomedConceptCodes <- sQuote(snomedConcepts$concept_code)
  snomedSql <- SqlRender::render(
    "insert into @cdmDatabaseSchema.concept select * from @vocabDatabaseSchema.concept where vocabulary_id = 'SNOMED' and concept_code in (@conceptCodes)",
    conceptCodes = snomedConceptCodes,
    cdmDatabaseSchema = cdmDatabaseSchema,
    vocabDatabaseSchema = vocabDatabaseSchema
  )
  DatabaseConnector::executeSql(connection, snomedSql)

  # include visit concepts
  visitSql <- SqlRender::render(
    "insert into @cdmDatabaseSchema.concept select * from @vocabDatabaseSchema.concept where concept_id in (9201,9202,9203)",
    cdmDatabaseSchema = cdmDatabaseSchema,
    vocabDatabaseSchema = vocabDatabaseSchema
  )
  DatabaseConnector::executeSql(connection, visitSql)

  # include measurement units in the vocabulary
  ucumSql <- SqlRender::render(
    "insert into @cdmDatabaseSchema.concept select * from @vocabDatabaseSchema.concept where vocabulary_id = 'UCUM'",
    cdmDatabaseSchema = cdmDatabaseSchema,
    vocabDatabaseSchema = vocabDatabaseSchema
  )
  DatabaseConnector::executeSql(connection, ucumSql)

  # include ethnicity concepts
  ethnicitySql <- SqlRender::render(
    "insert into @cdmDatabaseSchema.concept select * from @vocabDatabaseSchema.concept where vocabulary_id = 'Ethnicity'",
    cdmDatabaseSchema = cdmDatabaseSchema,
    vocabDatabaseSchema = vocabDatabaseSchema
  )
  DatabaseConnector::executeSql(connection, ethnicitySql)

  # copy the LOINC concepts used by synthea into the shard
  loincConcepts <- syntheaConcepts[syntheaConcepts$vocabulary == 'LOINC', ]
  loincConceptCodes <- sQuote(loincConcepts$concept_code)
  loincSql <- SqlRender::render(
    "insert into @cdmDatabaseSchema.concept select * from @vocabDatabaseSchema.concept where vocabulary_id = 'LOINC' and concept_code in (@conceptCodes)",
    conceptCodes = loincConceptCodes,
    cdmDatabaseSchema = cdmDatabaseSchema,
    vocabDatabaseSchema = vocabDatabaseSchema
  )
  DatabaseConnector::executeSql(connection, loincSql)

  # copy the RxNorm concepts used by synthea into the shard
  rxnormConcepts <- syntheaConcepts[syntheaConcepts$vocabulary == 'RxNorm', ]
  rxnormConceptCodes <- sQuote(rxnormConcepts$concept_code)
  rxnormSql <- SqlRender::render(
    "insert into @cdmDatabaseSchema.concept select * from @vocabDatabaseSchema.concept where vocabulary_id = 'RxNorm' and concept_code in (@conceptCodes)",
    conceptCodes = rxnormConceptCodes,
    cdmDatabaseSchema = cdmDatabaseSchema,
    vocabDatabaseSchema = vocabDatabaseSchema
  )
  DatabaseConnector::executeSql(connection, rxnormSql)

  # if the vocabulary includes CVX, the immunization codes can be added the same way
  # cvxConcepts <- syntheaConcepts[syntheaConcepts$vocabulary == 'http://hl7.org/fhir/sid/cvx', ]
  # cvxConceptCodes <- sQuote(cvxConcepts$concept_code)
  # cvxSql <- SqlRender::render(
  #   "select * from cdm.concept where vocabulary_id = 'CVX' and concept_code in (@conceptCodes)",
  #   conceptCodes = cvxConceptCodes
  # )

  # bring along any concept relationships that reference a concept in the shard
  sqlConceptRelationship <- SqlRender::render(
    sql = "insert into @cdmDatabaseSchema.concept_relationship
           select distinct *
           from @vocabDatabaseSchema.concept_relationship
           where concept_id_1 IN (select concept_id from @cdmDatabaseSchema.concept)
           or concept_id_2 IN (select concept_id from @cdmDatabaseSchema.concept)",
    cdmDatabaseSchema = cdmDatabaseSchema,
    vocabDatabaseSchema = vocabDatabaseSchema
  )
  DatabaseConnector::executeSql(connection, sqlConceptRelationship)

  # record the vocabulary version the shard was derived from
  sqlVocabulary <- SqlRender::render(
    sql = "insert into @cdmDatabaseSchema.vocabulary
           select 'None' vocabulary_id,
           concat('Vocabulary Shard derived from ', vocabulary_version) vocabulary_name,
           'Vocabulary Shard' vocabulary_reference,
           concat(vocabulary_version,'*') vocabulary_version,
           0 vocabulary_concept_id
           from @vocabDatabaseSchema.vocabulary
           where vocabulary_id = 'None'",
    vocabDatabaseSchema = vocabDatabaseSchema,
    cdmDatabaseSchema = cdmDatabaseSchema
  )
  DatabaseConnector::executeSql(connection, sqlVocabulary)

  DatabaseConnector::disconnect(connection)
}
# setup run parameters
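# the test key stamps each run with the current date and time (e.g. "20221213_1722"),
# so re-running on a different day produces a new release of each data source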
testKey <- format(Sys.time(), "%Y%m%d_%H%M")
# end configuration
# begin data operations
# https://gist.github.com/fdefalco/35b7844626a9d8808d99bf5990e46ed2
# configure connection
connectionDetails <- DatabaseConnector::createConnectionDetails(
  dbms = dbms,
  server = server,
  user = user,
  password = password,
  pathToDriver = pathToDriver
)
populationSize <- 100
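# each entry below simulates one data source in the network; adding states or
# varying the population changes the shape of the simulated network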
simulatedSources <- list(
  list(abbreviation = "NJ", sourceName = "NJ", geographySpecification = "New Jersey", population = populationSize),
  list(abbreviation = "PA", sourceName = "PA", geographySpecification = "Pennsylvania", population = populationSize),
  list(abbreviation = "NY", sourceName = "NY", geographySpecification = "New York", population = populationSize),
  list(abbreviation = "DE", sourceName = "DE", geographySpecification = "Delaware", population = populationSize),
  list(abbreviation = "MA", sourceName = "MA", geographySpecification = "Massachusetts", population = populationSize)
)
for (simulatedSource in simulatedSources) {
  writeLines(paste("Processing", simulatedSource$geographySpecification))

  # run synthea to simulate the source population
  syntheaSimulateCommand <- paste0("java -jar \"", syntheaExecutable, "\" ",
    "-p ", simulatedSource$population, " ",
    "--exporter.fhir.transaction_bundle false ",
    "--exporter.csv.export true ",
    "--exporter.practitioner.fhir.export false ",
    "--exporter.hospital.fhir.export false ",
    "--exporter.fhir.export false ",
    "--exporter.years_of_history 20 ",
    "--exporter.baseDirectory ", syntheaOutputDirectory, simulatedSource$abbreviation, " ",
    "\"", simulatedSource$geographySpecification, "\"")
  system(syntheaSimulateCommand)

  # setup simulated source details
  cdmSourceName <- paste0("cdm_", tolower(simulatedSource$abbreviation), "_")
  nativeSourceName <- paste0("native_", tolower(simulatedSource$abbreviation), "_")
  cdmDatabaseSchema <- paste0(cdmSourceName, testKey)
  resultsDatabaseSchema <- paste0(cdmSourceName, testKey)
  nativeSchema <- paste0(nativeSourceName, testKey)
  syntheaFileLoc <- paste0(syntheaOutputDirectory, simulatedSource$abbreviation, "/csv")

  # create schemas ("drop schema ... cascade" is postgresql syntax)
  connection <- DatabaseConnector::connect(connectionDetails)
  DatabaseConnector::executeSql(connection, paste("drop schema if exists", cdmDatabaseSchema, "cascade"))
  DatabaseConnector::executeSql(connection, paste("drop schema if exists", nativeSchema, "cascade"))
  DatabaseConnector::executeSql(connection, paste("create schema", cdmDatabaseSchema))
  DatabaseConnector::executeSql(connection, paste("create schema", nativeSchema))
  DatabaseConnector::disconnect(connection)

  # use the CommonDataModel package to create the CDM tables
  ETLSyntheaBuilder::CreateCDMTables(
    connectionDetails = connectionDetails,
    cdmSchema = cdmDatabaseSchema,
    cdmVersion = cdmVersion
  )

  # create & load the simulated synthea data - this is our native data
  ETLSyntheaBuilder::CreateSyntheaTables(
    connectionDetails = connectionDetails,
    syntheaSchema = nativeSchema,
    syntheaVersion = syntheaVersion
  )
  ETLSyntheaBuilder::LoadSyntheaTables(
    connectionDetails = connectionDetails,
    syntheaSchema = nativeSchema,
    syntheaFileLoc = syntheaFileLoc
  )

  # install the limited vocabulary
  createVocabularyShard(connectionDetails, syntheaConceptsFile, cdmDatabaseSchema, vocabDatabaseSchema)

  # run our ETL to create the CDM
  ETLSyntheaBuilder::LoadEventTables(
    connectionDetails = connectionDetails,
    cdmSchema = cdmDatabaseSchema,
    syntheaSchema = nativeSchema,
    cdmVersion = cdmVersion,
    cdmSourceName = simulatedSource$sourceName,
    cdmSourceAbbreviation = simulatedSource$abbreviation
  )

  releaseKey <- AresIndexer::getSourceReleaseKey(connectionDetails, cdmDatabaseSchema)
  datasourceReleaseOutputFolder <- file.path(aresDataRoot, releaseKey)

  # run achilles to characterize the data source
  Achilles::achilles(
    cdmVersion = cdmVersion,
    connectionDetails = connectionDetails,
    cdmDatabaseSchema = cdmDatabaseSchema,
    resultsDatabaseSchema = cdmDatabaseSchema,
    smallCellCount = 0
  )

  # run data quality dashboard
  dqResults <- DataQualityDashboard::executeDqChecks(
    connectionDetails = connectionDetails,
    cdmDatabaseSchema = cdmDatabaseSchema,
    resultsDatabaseSchema = resultsDatabaseSchema,
    vocabDatabaseSchema = cdmDatabaseSchema,
    cdmVersion = cdmVersion,
    cdmSourceName = cdmSourceName,
    outputFile = "dq-result.json",
    outputFolder = datasourceReleaseOutputFolder
  )

  # export the results
  Achilles::exportAO(
    connectionDetails = connectionDetails,
    cdmDatabaseSchema = cdmDatabaseSchema,
    resultsDatabaseSchema = resultsDatabaseSchema,
    vocabDatabaseSchema = vocabDatabaseSchema,
    outputPath = aresDataRoot
  )

  # perform temporal characterization
  outputFile <- file.path(datasourceReleaseOutputFolder, "temporal-characterization.csv")
  Achilles::getTemporalData(
    connectionDetails = connectionDetails,
    cdmDatabaseSchema = cdmDatabaseSchema,
    resultsDatabaseSchema = resultsDatabaseSchema
  )
  Achilles::performTemporalCharacterization(
    connectionDetails = connectionDetails,
    cdmDatabaseSchema = cdmDatabaseSchema,
    resultsDatabaseSchema = resultsDatabaseSchema,
    outputFile = outputFile
  )

  # augment concept files with temporal characterization data
  AresIndexer::augmentConceptFiles(releaseFolder = file.path(aresDataRoot, releaseKey))
}
# build a network level index for all simulated sources
sourceFolders <- list.dirs(aresDataRoot, recursive = FALSE)
AresIndexer::buildNetworkIndex(sourceFolders = sourceFolders, outputFolder = aresDataRoot)
AresIndexer::buildDataQualityIndex(sourceFolders = sourceFolders, outputFolder = aresDataRoot)
AresIndexer::buildNetworkUnmappedSourceCodeIndex(sourceFolders = sourceFolders, outputFolder = aresDataRoot)