Skip to content

Instantly share code, notes, and snippets.

@ireneisdoomed
Last active November 22, 2023 14:31
Show Gist options
  • Save ireneisdoomed/acb02331fc4866eece41aaea2fd9f7d3 to your computer and use it in GitHub Desktop.
Save ireneisdoomed/acb02331fc4866eece41aaea2fd9f7d3 to your computer and use it in GitHub Desktop.
PharmGKB - 3128
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import tempfile
import os
# Single Spark session shared by every step of the script.
spark = (
    SparkSession.builder
    .appName("PGKB")
    .getOrCreate()
)

# Input tables: the evidence JSON plus the molecule and disease parquet
# datasets, all read from paths relative to the working directory.
data = spark.read.json("cttv012-2023-10-12_pgkb.json.gz")
drugs = spark.read.parquet("molecule/")
# Only the disease identifier and name are kept, under new column names.
diseases = (
    spark.read.parquet("diseases/")
    .selectExpr("id as diseaseId", "name as drugIndicationName")
)
def get_chebi_chembl_lut(drugs):
    """Build a lookup table from ChEBI-style ids to the drug table's own ids.

    Args:
        drugs: DataFrame with an ``id`` column and a ``crossReferences``
            column. The code explodes ``crossReferences`` into ``key`` /
            ``value`` rows and then explodes ``value`` again, so it is
            presumably a map of source name -> list of ids (confirm against
            the molecule schema).

    Returns:
        A de-duplicated DataFrame with two columns: ``drugId`` (the original
        ``id``) and ``drugFromSourceId`` (``"CHEBI_"`` + cross-reference id).
    """
    # One row per (drug, cross-reference source).
    xrefs_by_source = drugs.select("id", f.explode("crossReferences"))
    chebi_only = xrefs_by_source.filter(f.col("key") == "chEBI")

    # One row per individual ChEBI id; temporarily stored under "drugId".
    per_chebi_id = chebi_only.withColumn("drugId", f.explode(f.col("value")))

    # From here on "drugId" is re-purposed to hold the drug table's own id.
    own_id = f.col("id").alias("drugId")
    prefixed_chebi = f.concat(f.lit("CHEBI_"), f.col("drugId")).alias(
        "drugFromSourceId"
    )
    return per_chebi_id.select(own_id, prefixed_chebi).distinct()
def get_drug_target_lut(drugs):
    """Build a lookup table from drug id to the drug's linked targets.

    Drugs whose ``linkedTargets`` is null, or whose ``linkedTargets.rows``
    has no elements, are left out.

    Args:
        drugs: DataFrame with ``id`` and ``linkedTargets`` (a struct with a
            ``rows`` array) columns.

    Returns:
        DataFrame with columns ``drugId`` and ``drugTargetIds``.
    """
    has_linked_targets = f.col("linkedTargets").isNotNull()
    has_target_rows = f.size("linkedTargets.rows") >= 1
    drugs_with_targets = drugs.filter(has_linked_targets & has_target_rows)
    return drugs_with_targets.select(
        f.col("id").alias("drugId"),
        f.col("linkedTargets.rows").alias("drugTargetIds"),
    )
def get_high_confidence_data(data, evidence_levels=("1A", "1B", "2A", "2B")):
    """Keep only the rows whose ``evidenceLevel`` is one of the accepted levels.

    Args:
        data: DataFrame with an ``evidenceLevel`` column.
        evidence_levels: Iterable of accepted ``evidenceLevel`` values.
            Defaults to the four levels this script has always used.

    Returns:
        The filtered DataFrame.
    """
    # Pass a real list: ``Column.isin`` only unpacks a single list/set argument.
    return data.filter(f.col("evidenceLevel").isin(list(evidence_levels)))
def flag_is_direct_target(variant_target, drug_targets):
    """Return a boolean column telling whether the target is among the drug's targets.

    Args:
        variant_target: Column holding the target id to look for.
        drug_targets: Column holding the array of the drug's target ids.

    Returns:
        A Column that is True when ``drug_targets`` contains
        ``variant_target`` and False in every other case.
    """
    is_listed_target = f.array_contains(drug_targets, variant_target)
    # Anything that is not a definite match falls through to ``otherwise``.
    return f.when(is_listed_target, True).otherwise(False)
def write_evidence_strings(evidence, output_file):
    """Export the table to a compressed JSON file containing the evidence strings.

    The table is coalesced to one partition and written to a scratch
    directory; the single resulting ``.json.gz`` part file is then moved to
    ``output_file``.

    Args:
        evidence: Spark DataFrame to export.
        output_file: Destination path for the gzip-compressed JSON file.

    Raises:
        AssertionError: If the write did not produce exactly one
            ``.json.gz`` file.
    """
    # Local import so the block does not depend on a new file-level import.
    import shutil

    with tempfile.TemporaryDirectory() as tmp_dir_name:
        (
            evidence.coalesce(1)
            .write.format("json")
            .mode("overwrite")
            .option("compression", "org.apache.hadoop.io.compress.GzipCodec")
            .save(tmp_dir_name)
        )
        # ``name`` (not ``f``) so the pyspark.sql.functions alias is not shadowed.
        json_chunks = [
            name for name in os.listdir(tmp_dir_name) if name.endswith(".json.gz")
        ]
        # Raised explicitly (same exception type as before) so the check
        # is not stripped when Python runs with -O.
        if len(json_chunks) != 1:
            raise AssertionError(
                f"Expected one JSON file, but found {len(json_chunks)}."
            )
        # shutil.move also works when the temp directory and the destination
        # are on different filesystems, where os.rename raises OSError.
        shutil.move(os.path.join(tmp_dir_name, json_chunks[0]), output_file)
def main(data, drugs, diseases, output_file="cttv012-2023-10-23_pgkb.json.gz"):
    """Enrich the evidence with drug ids and target flags, then export it.

    Steps:
      1. Rename the incoming ``drugId`` column to ``drugFromSourceId`` and
         left-join the ChEBI lookup to obtain the mapped ``drugId``.
      2. Left-join the drug -> targets lookup and derive ``isDirectTarget``
         from ``targetFromSourceId``.
      3. De-duplicate, keep the high-confidence rows and write them out.

    Args:
        data: Evidence DataFrame with ``drugId``, ``targetFromSourceId`` and
            ``evidenceLevel`` columns.
        drugs: Molecule DataFrame used to build both lookup tables.
        diseases: Currently unused; kept so existing callers keep working.
        output_file: Destination of the compressed JSON export.
    """
    xrefs = get_chebi_chembl_lut(drugs)
    drug_target_lut = get_drug_target_lut(drugs)

    # No persist(): the result feeds a single write action, so caching it
    # would only hold executor storage that is never released.
    data_enriched = (
        data
        # The incoming "drugId" holds the source id matched against the lookup.
        .withColumnRenamed("drugId", "drugFromSourceId")
        # Left joins keep evidence whose drug has no mapping or no targets.
        .join(xrefs, on="drugFromSourceId", how="left")
        .join(drug_target_lut, on="drugId", how="left")
        .withColumn(
            "isDirectTarget",
            flag_is_direct_target(
                f.col("targetFromSourceId"), f.col("drugTargetIds")
            ),
        )
        .drop("drugTargetIds")
        .distinct()
    )

    hc = get_high_confidence_data(data_enriched)
    write_evidence_strings(hc, output_file)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment