Last active
June 4, 2021 17:50
-
-
Save d0choa/d909865e9a12a49d23ab9ae3810b80af to your computer and use it in GitHub Desktop.
Suggest UKBB trait mappings that could potentially be updated. Dependency: the output of an external trait-mapping tool.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession
from pyspark.sql import Window as W
from pyspark.sql import functions as F

# Input dataset locations (local copies).
studiesPath = "/Users/ochoa/Datasets/study-index/"           # genetics portal study index (read as JSON)
# NOTE(review): failedPath is not read anywhere in this script;
# evidenceFailedPath is the failed-evidence input actually loaded below.
failedPath = "/Users/ochoa/Datasets/failedEvidence"
evidenceFailedPath = "/Users/ochoa/Datasets/evidenceFailed"  # failed evidence (read as parquet)
diseasePath = "/Users/ochoa/Datasets/diseases"               # disease index (read as parquet)

# Local Spark session with one worker thread per available core.
spark = SparkSession.builder.master('local[*]').getOrCreate()

studies = spark.read.json(studiesPath)
failed = spark.read.parquet(evidenceFailedPath)
disease = spark.read.parquet(diseasePath)
# Trait names (diseaseFromSource) of ot_genetics_portal evidence that failed
# because the disease was not resolved. Each distinct name is flagged with
# invalidInPlatform = True so it can be left-joined onto studies by traitName.
# NOTE(review): rows whose resolvedDisease is null are excluded too, because
# negating null yields null and filter only keeps rows where the predicate is true.
unresolvedGeneticsEvidence = (
    ~F.col("resolvedDisease")
    & (F.col("datasourceId") == "ot_genetics_portal")
)
invalidInPlatform = (
    failed
    .where(unresolvedGeneticsEvidence)
    .select(
        F.col("diseaseFromSource").alias("traitName"),
        F.lit(True).alias("invalidInPlatform"),
    )
    .distinct()
)
# Candidate trait -> EFO mappings produced by the external mapping tool.
# The study query joins it on traitName and uses efoId, efoName and score.
# An earlier variant of this script read
# /Users/ochoa/Downloads/studiesToMapAutoMapped.json as the mapping input instead.
mk = spark.read.json(
    "/Users/ochoa/Downloads/traits_mapped_epmc_model.json"
).persist()
# Studies in the genetics portal that do not come from the GWAS Catalog
# (study_id not starting with "GCST"). There is one row per study and current
# EFO (from exploding trait_efos), repeated for each mk candidate of the trait.
# Added columns:
#   invalidInPlatform - True if the trait name is in invalidInPlatform, else null
#   efoId/efoName/score - candidate mapping from mk (null when the trait has none)
#   currentName       - disease-index label of the current EFO
#   validatedMapping  - current EFO equals the candidate efoId (False when no candidate)
#   validatedTrait    - some row of the same study has validatedMapping = True
# NOTE(review): invalidInPlatform is carried along but not selected for the CSV export.
byStudy = W.partitionBy('study_id')

mkCandidates = mk.select("traitName", "efoId", "efoName", "score").distinct()
currentLabels = disease.select(
    F.col("id").alias("currentEfo"),
    F.col("name").alias("currentName"),
)

# Null comparison (no candidate) counts as "not validated".
mappingMatches = F.coalesce(F.col("currentEfo") == F.col("efoId"), F.lit(False))
# The sum is null only when no row in the study had a matching mapping.
anyMatchInStudy = (
    F.sum(F.when(F.col("validatedMapping"), F.lit(1)))
    .over(byStudy)
    .isNotNull()
)

out = (
    studies
    .filter(~F.col("study_id").rlike("^GCST"))
    .select(
        "study_id",
        F.col("trait_reported").alias("traitName"),
        F.explode("trait_efos").alias("currentEfo"),
    )
    .join(invalidInPlatform, on="traitName", how="left")
    .join(mkCandidates, on="traitName", how="left")
    .join(currentLabels, on="currentEfo", how="left")
    .withColumn("validatedMapping", mappingMatches)
    .withColumn("validatedTrait", anyMatchInStudy)
)
# Export suggested mapping updates. Keep studies with no validated mapping,
# candidates from mk scoring above the threshold, and current EFOs that are
# not CHEBI terms. Rows are ordered best score first.
# Spark writes a directory of part files at this path (a single part file
# here, because of repartition(1)), not one plain CSV file. With the default
# save mode, the write fails if the path already exists.
minCandidateScore = 0.8
suggestedUpdatesPath = '/Users/ochoa/Datasets/ukbb_suggested_updates.csv'
(
    out
    .filter(~F.col("validatedTrait"))
    .filter(F.col("efoId").isNotNull())
    .filter(F.col("score") > minCandidateScore)
    .filter(~F.col("currentEfo").rlike("^CHEBI"))
    .select(
        "study_id",
        "traitName",
        "currentEfo",
        "currentName",
        F.col("efoId").alias("candidateId"),
        F.col("efoName").alias("candidateName"),
        "score",
    )
    # Gather into one partition first and then sort inside it. Spark does not
    # document that coalesce(1) preserves a preceding global sort. Also, a
    # shuffle-free coalesce(1) would run the upstream window stage in one task.
    .repartition(1)
    # Secondary keys make the order of rows with equal scores deterministic.
    .sortWithinPartitions(
        F.col("score").desc(), "study_id", "traitName", "candidateId"
    )
    .write
    # Built-in CSV source; 'com.databricks.spark.csv' is a legacy alias for it.
    .csv(suggestedUpdatesPath, header=True)
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment