Diagnostic script to find and explain missing top loci from the V2D dataset
import pyspark.sql.functions as F
from pyspark import SparkConf
from pyspark.sql import SparkSession
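# GCS requester-pays configuration (bucket access is billed to the project set below)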
sparkConf = SparkConf()
sparkConf = sparkConf.set('spark.hadoop.fs.gs.requester.pays.mode', 'AUTO')
sparkConf = sparkConf.set('spark.hadoop.fs.gs.requester.pays.project.id', 'open-targets-eu-dev')
# Establish Spark connection
spark = (
    SparkSession.builder
    .config(conf=sparkConf)
    .master('local[*]')
    .getOrCreate()
)
# Paths
toplociPath = "gs://genetics-portal-dev-staging/v2d/220210/toploci_betas_fixed.parquet"
v2dPath = "gs://genetics-portal-dev-data/22.02.4/outputs/v2d/"
variantPath = "gs://genetics-portal-dev-data/22.02.4/outputs/lut/variant-index"
fmPath = "gs://genetics-portal-dev-staging/v2d/220210/finemapping.parquet"
ldPath = "gs://genetics-portal-dev-staging/v2d/220210/ld.parquet"
studyPath = "gs://genetics-portal-dev-staging/v2d/220210/studies.parquet"
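# Load input datasets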
toploci = spark.read.parquet(toplociPath)
v2d = spark.read.json(v2dPath)
variant = spark.read.json(variantPath)
fmLoci = spark.read.parquet(fmPath)
ldLoci = spark.read.parquet(ldPath)
study = spark.read.parquet(studyPath)
# Join finemapping and ld-expansion
ldFmJoinCols = [
    "study_id",
    "lead_chrom",
    "lead_pos",
    "lead_ref",
    "lead_alt",
    "tag_chrom",
    "tag_pos",
    "tag_ref",
    "tag_alt"
]
ldAndFm = (
    ldLoci
    .join(
        fmLoci,
        on=ldFmJoinCols,
        how="full_outer"
    )
)
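# Diagnostic table: top loci absent from the V2D dataset, annotated with
# boolean flags pointing at the most likely reason each locus was dropped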
out = (
    toploci
    .withColumn("toplociId", F.monotonically_increasing_id())
    .withColumnRenamed("chrom", "lead_chrom")
    .withColumnRenamed("pos", "lead_pos")
    .withColumnRenamed("ref", "lead_ref")
    .withColumnRenamed("alt", "lead_alt")
    # Keep only top loci NOT present in the V2D dataset
    .join(
        v2d,
        on=["study_id", "lead_chrom", "lead_pos", "lead_ref", "lead_alt"],
        how="left_anti"
    )
    # Flag top loci found in the LD-expansion or fine-mapping results
    .join(
        ldAndFm
        .withColumn("isInLdOrFm", F.lit(True)),
        on=["study_id", "lead_chrom", "lead_pos", "lead_ref", "lead_alt"],
        how="left"
    )
    .fillna(False, ["isInLdOrFm"])
    # Flag whether the lead variant is present in the variant index
    .join(
        variant
        .select(
            F.col("chr_id").alias("lead_chrom"),
            F.col("position").alias("lead_pos"),
            F.col("ref_allele").alias("lead_ref"),
            F.col("alt_allele").alias("lead_alt")
        )
        .withColumn("isLeadInVariantIndex", F.lit(True)),
        how="left",
        on=["lead_chrom", "lead_pos", "lead_ref", "lead_alt"]
    )
    .fillna(False, ["isLeadInVariantIndex"])
    .join(
        variant
        .select(
            F.col("chr_id").alias("tag_chrom"),
            F.col("position").alias("tag_pos"),
            F.col("ref_allele").alias("tag_ref"),
            F.col("alt_allele").alias("tag_alt")
        )
        .withColumn("isTagInVariantIndex", F.lit(True)),
        how="left",
        on=["tag_chrom", "tag_pos", "tag_ref", "tag_alt"]
    )
    .fillna(False, ["isTagInVariantIndex"])
    .join(
        study
        .select("study_id", "has_sumstats")
        .withColumn("isInStudyTable", F.lit(True)),
        on="study_id",
        how="left"
    )
    .fillna(False, ["isInStudyTable"])
.withColumn("isInFINNGEN_CHD_STUDY",
F.when(
F.col("study_id") == "FINNGEN_R5_I9_HEARTFAIL_AND_CHD",
True
).otherwise(
False
)
)
.persist()
)
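# Ad-hoc checks (left commented out):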
# out.groupBy(F.col("isLeadInVariantIndex")).count().show()
# out.groupBy(F.col("isInLdOrFm")).count().show()
# out.groupBy(F.col("isInStudyTable")).count().show()
# out.filter(F.col("isLeadInVariantIndex") & F.col("isTagInVariantIndex") & F.col("isInLdOrFm") & F.col("isInStudyTable")).count()
# out.filter(F.col("study_id") == "GCST90018757").show(vertical = True)
stats = (
    out
    .groupBy(
        "isLeadInVariantIndex",
        "isInLdOrFm",
        "isTagInVariantIndex",
        "isInStudyTable",
        "isInFINNGEN_CHD_STUDY",
        "has_sumstats"
    )
    .agg(F.countDistinct("toplociId"))
)
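# Optionally, to inspect the resulting breakdown in an interactive session, e.g.:
# stats.show(truncate=False)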