Skip to content

Instantly share code, notes, and snippets.

@ireneisdoomed
Last active January 31, 2023 10:38
Show Gist options
  • Save ireneisdoomed/5a452eda3cd7e58b1d0c987f5239fdbe to your computer and use it in GitHub Desktop.
Save ireneisdoomed/5a452eda3cd7e58b1d0c987f5239fdbe to your computer and use it in GitHub Desktop.
Effect sizes from colocated QTLs are derived to inform the mechanism of action of a Genetics Portal association
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as f
spark = SparkSession.builder.getOrCreate()
# 1. Read data
coloc = (
spark.read.parquet("gs://genetics-portal-dev-data/22.09.0/outputs/v2d_coloc")
.filter(f.col("right_type") != "gwas")
.select(
f.concat_ws("_", "left_chrom", "left_pos", "left_ref", "left_alt").alias(
"left_locus_id"
),
f.concat_ws("_", "right_chrom", "right_pos", "right_ref", "right_alt").alias(
"right_locus_id"
),
f.col("left_study").alias("left_study_id"),
f.col("right_study").alias("right_study_id"),
"right_gene_id",
"coloc_h4",
"left_var_right_study_beta",
)
# Get highest coloc result - aggregation to not take into account the tissue
.withColumn(
"rank",
f.row_number().over(
Window.partitionBy(
"left_locus_id", "left_study_id", "right_gene_id"
).orderBy(f.abs(f.col("left_var_right_study_beta")).desc())
),
)
.filter(f.col("rank") == 1)
.withColumnRenamed("coloc_h4", "max_coloc_h4")
.persist()
)
evd = (
spark.read.parquet(
"gs://open-targets-data-releases/22.11/output/etl/parquet/evidence/sourceId=ot_genetics_portal"
)
.selectExpr(
"studyId",
"variantId as locusId",
"targetId",
"diseaseId",
"oddsRatio",
"beta",
"score",
)
.withColumn(
"effect",
f.when((f.col("beta") > 0) | (f.col("oddsRatio") > 1), "risk")
.when((f.col("beta") < 0) | (f.col("oddsRatio") < 1), "protective")
.otherwise("unknown"), # 23% of evidence has unknown direction of effect
)
.persist()
)
chembl = (
spark.read.parquet("gs://ot-team/irene/chemblEvidences")
.withColumn("maximumClinicalTrialPhase", f.explode("maximumClinicalTrialPhase"))
.withColumnRenamed("homogenized", "drugMechanism")
.groupBy("diseaseId", "targetId", "drugMechanism")
.agg({"maximumClinicalTrialPhase": "max"})
.withColumn("effect", f.lit("protective"))
.filter(f.col("drugMechanism") != "noEvaluable")
.persist()
)
# 2. By looking at the effect size of the left locus in the right study, what do the directions of effect say?
sqtl_other_qtl_correlation = (
coloc.withColumn(
"neg_sqtl",
f.when(
(
(f.col("left_var_right_study_beta") < 0)
& (f.col("right_study_id") == "GTEx-sQTL")
),
1,
),
)
.withColumn(
"pos_sqtl",
f.when(
(
(f.col("left_var_right_study_beta") > 0)
& (f.col("right_study_id") == "GTEx-sQTL")
),
1,
),
)
.withColumn(
"neg_oqtl",
f.when(
(
(f.col("left_var_right_study_beta") < 0)
& (f.col("right_study_id") != "GTEx-sQTL")
),
1,
),
)
.withColumn(
"pos_oqtl",
f.when(
(
(f.col("left_var_right_study_beta") > 0)
& (f.col("right_study_id") != "GTEx-sQTL")
),
1,
),
)
.groupBy("left_locus_id", "left_study_id", "right_gene_id")
.agg(
f.sum(f.col("neg_sqtl")).alias("n_neg_sqtl"),
f.sum(f.col("pos_sqtl")).alias("n_pos_sqtl"),
f.sum(f.col("neg_oqtl")).alias("n_neg_oqtl"),
f.sum(f.col("pos_oqtl")).alias("n_pos_oqtl"),
)
.persist()
)
sqtl_oqtl_direction_discordance = sqtl_other_qtl_correlation.filter(
(
(f.col("n_neg_sqtl") > 0)
& (
f.col("n_neg_sqtl") & (f.col("n_pos_oqtl") > 0)
| (f.col("n_pos_sqtl") > 0) & (f.col("n_neg_oqtl") > 0)
)
)
) # 14% are discordant - not perfect because some source of discordance is coming from the QTL itself (tissue is aggregated)
########## FIRST APPROACH (largest beta) ##########
coloc_dof_top = (
coloc.withColumn(
"rank",
f.row_number().over(
Window.partitionBy(
"left_locus_id", "left_study_id", "right_gene_id"
).orderBy(f.abs(f.col("left_var_right_study_beta")).desc())
),
)
.filter(f.col("rank") == 1)
.drop("rank")
.withColumn(
# the overall direction of effect is the row with the largest effect size
"colocMechanism",
f.when(f.col("left_var_right_study_beta") > 0, "gof")
.when(f.col("left_var_right_study_beta") < 0, "lof")
.otherwise("unknown"),
)
.persist()
)
######### SECOND APPROACH (Democratically count each direction of effect) ##########
coloc_dof_count = (
coloc.withColumn("neg", f.when(f.col("left_var_right_study_beta") < 0, 1))
.withColumn("pos", f.when(f.col("left_var_right_study_beta") > 0, 1))
.groupBy("left_locus_id", "left_study_id", "right_gene_id")
.agg(
f.sum(f.col("neg")).alias("n_protect_coloc"),
f.sum(f.col("pos")).alias("n_risk_coloc"),
)
# fill with 0 when there are no evidence supporting one direction
.withColumn("n_protect_coloc", f.coalesce(f.col("n_protect_coloc"), f.lit(0)))
.withColumn("n_risk_coloc", f.coalesce(f.col("n_risk_coloc"), f.lit(0)))
)
# Let's analyse them together
coloc_dof = coloc_dof_top.join(
coloc_dof_count, on=["left_locus_id", "left_study_id", "right_gene_id"]
).persist()
(
# compare top/count methods
coloc_dof.withColumn(
"colocMechanismCount",
f.when(f.col("n_risk_coloc") > f.col("n_protect_coloc"), "lof")
.when(f.col("n_risk_coloc") < f.col("n_protect_coloc"), "gof")
.when(f.col("n_risk_coloc") == f.col("n_protect_coloc"), "equal")
.otherwise("unknown"),
)
.filter(f.col("colocMechanismCount") != "equal")
.withColumn(
"concordance",
f.when(f.col("colocMechanism") == f.col("colocMechanismCount"), True).otherwise(
False
),
)
.groupBy("concordance")
.count()
.show()
)
## See how evd direction corresponds to coloc-derived direction
evd_coloc = evd.join(
coloc_dof.selectExpr(
"left_study_id as studyId",
"left_locus_id as locusId",
"right_gene_id as targetId",
"left_var_right_study_beta",
"colocMechanism",
),
on=["studyId", "locusId", "targetId"],
how="left",
).persist()
### 28% of evidence has a coloc-derived direction of effect
### Among these, ~50% are GoF/LoF
evd_chembl = evd_coloc.filter(
(f.col("colocMechanism").isNotNull()) & (f.col("colocMechanism") != "unknown")
).join(chembl, on=["diseaseId", "targetId", "effect"])
effect_comparison = (
evd_chembl.groupBy(
"colocMechanism", "drugMechanism", "max(maximumClinicalTrialPhase)"
)
.count()
.orderBy(f.col("colocMechanism"), f.col("max(maximumClinicalTrialPhase)"))
)
# Plot comparison of colocMechanism and drugMechanism by maximumClinicalTrialPhase
df = effect_comparison.toPandas()
df_pivot = df.pivot_table(index=['drugMechanism', 'maximumClinicalTrialPhase'], columns='colocMechanism', values='count', aggfunc='sum')
df_pivot.plot.bar(stacked=True, figsize=(10,6))
plt.title('Comparison of colocMechanism and drugMechanism by maximumClinicalTrialPhase')
plt.xlabel('Maximum clinical trial phase and drugMechanism')
plt.xticks(rotation=60)
plt.ylabel('Count')
plt.legend(['GoF (from coloc)', 'LoF (from coloc)'])
plt.show()
######### (OBSOLETE) SECOND APPROACH (Democratically count each direction of effect) ##########
# TODO: build this approach into the previous one to compare the two
coloc_dof_count = (
coloc.withColumn("neg", f.when(f.col("left_var_right_study_beta") < 0, 1))
.withColumn("pos", f.when(f.col("left_var_right_study_beta") > 0, 1))
.groupBy("left_locus_id", "left_study_id", "right_gene_id")
.agg(
f.sum(f.col("neg")).alias("n_protect_coloc"),
f.sum(f.col("pos")).alias("n_risk_coloc"),
)
)
evd_coloc_count = evd.join(
coloc_dof_top.selectExpr(
"left_study_id as studyId",
"left_locus_id as locusId",
"right_gene_id as targetId",
"n_protect_coloc",
"n_risk_coloc",
),
on=["studyId", "locusId", "targetId"],
how="left",
).persist()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment