Skip to content

Instantly share code, notes, and snippets.

@d0choa
Last active June 4, 2021 17:50
Show Gist options
  • Save d0choa/d909865e9a12a49d23ab9ae3810b80af to your computer and use it in GitHub Desktop.
Save d0choa/d909865e9a12a49d23ab9ae3810b80af to your computer and use it in GitHub Desktop.
Suggest UKBB trait mappings that could potentially be updated. Dependency: using external mapping tool
from pyspark.sql import SparkSession
from pyspark.sql import Window as W
from pyspark.sql import functions as F
# genetics portal studies
studiesPath = "/Users/ochoa/Datasets/study-index/"
failedPath = "/Users/ochoa/Datasets/failedEvidence"
evidenceFailedPath = "/Users/ochoa/Datasets/evidenceFailed"
diseasePath = "/Users/ochoa/Datasets/diseases"
# establish spark connection
spark = (
SparkSession.builder
.master('local[*]')
.getOrCreate()
)
studies = spark.read.json(studiesPath)
failed = spark.read.parquet(evidenceFailedPath)
disease = spark.read.parquet(diseasePath)
invalidInPlatform = (
failed
.filter((~F.col("resolvedDisease")) &
(F.col("datasourceId") == "ot_genetics_portal"))
.select(F.col("diseaseFromSource").alias("traitName"))
.withColumn("invalidInPlatform", F.lit(True))
.distinct()
)
# mk = (
# spark.read.json("/Users/ochoa/Downloads/studiesToMapAutoMapped.json")
# .persist()
# )
mk = (
spark.read.json("/Users/ochoa/Downloads/traits_mapped_epmc_model.json")
.persist()
)
# Studies in genetics portal
out = (
studies
.select("study_id",
F.col("trait_reported").alias("traitName"),
F.explode("trait_efos").alias("currentEfo"))
# studies that are not from gwas catalog
.filter(~F.col("study_id").rlike("^GCST"))
# add studies that have been invalid in platform
.join(invalidInPlatform, on="traitName", how="left")
# add Miguel mappings
.join(mk.select("traitName", "efoId", "efoName", "score").distinct(),
on="traitName",
how="left")
# current disease labels
.join(disease.select(F.col("id").alias("currentEfo"),
F.col("name").alias("currentName")),
on="currentEfo",
how="left")
# boolean for cases in which there is at least one confirmed
# mapping with mk
.withColumn("validatedMapping",
F.when(F.col("currentEfo") == F.col("efoId"), F.lit(True))
.otherwise(False))
.withColumn("validatedTrait",
F.sum(F.when(F.col("validatedMapping"), F.lit(1)))
.over(W.partitionBy('study_id')))
.withColumn("validatedTrait",
F.when(F.col("validatedTrait").isNotNull(), F.lit(True))
.otherwise(F.lit(False)))
)
(
out
.filter(~F.col("validatedTrait"))
.filter(F.col("efoId").isNotNull())
.filter(F.col("score") > 0.8)
.filter(~F.col("currentEfo").rlike("^CHEBI"))
.sort(F.col("score").desc())
.select("study_id", "traitName", "currentEfo", "currentName",
F.col("efoId").alias("candidateId"),
F.col("efoName").alias("candidateName"),
"score")
# .show(truncate = False)
.coalesce(1)
.write
.format('com.databricks.spark.csv')
.save('/Users/ochoa/Datasets/ukbb_suggested_updates.csv',
header='true')
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment