Skip to content

Instantly share code, notes, and snippets.

@DSuveges
Last active September 12, 2023 15:23
Show Gist options
  • Save DSuveges/a406a4fbaa672d73f41b952e81a2658a to your computer and use it in GitHub Desktop.
A short snippet to generate VEP compatible data based on OT Genetics Locus2Gene scores
"""This is a script to generate VEP input based on OT Genetics Locus2Gene scores."""
"""Example values:
l2g = gs://open-targets-genetics-releases/22.09/l2g
finemap_credset = gs://open-targets-genetics-releases/22.09/v2d_credset/
ldexpanded_credset = gs://open-targets-genetics-releases/22.09/v2d/
targets = gs://open-targets-data-releases/23.06/output/etl/parquet/targets/
L2G=gs://open-targets-genetics-releases/22.09/l2g
FINEMAP=gs://open-targets-genetics-releases/22.09/v2d_credset/
LD=gs://open-targets-genetics-releases/22.09/v2d/
TARGETS=gs://open-targets-data-releases/23.06/output/etl/parquet/targets/
OUTPUT=gs://ot-team/dsuveges/VEP_test/
python cicaful.py \
--l2g ${L2G} \
--targets ${TARGETS} \
--fm_credset ${FINEMAP} \
--ld_credset ${LD} \
--output ${OUTPUT}
"""
import argparse
import logging
from typing import TYPE_CHECKING
from pyspark.sql import Column, DataFrame, SparkSession
from pyspark.sql import functions as f
from pyspark.sql import types as t
from pyspark.sql.window import Window
if TYPE_CHECKING:
from argparse import ArgumentParser
def generate_variant_id(
    chromosome: Column,
    position: Column,
    reference_allele: Column,
    alternate_allele: Column,
) -> Column:
    """Build an underscore-delimited variant identifier (chrom_pos_ref_alt).

    Uses `concat_ws`, so any null component is skipped rather than
    nulling out the whole identifier.
    """
    components = (chromosome, position, reference_allele, alternate_allele)
    return f.concat_ws("_", *components)
def parse_command_line_parameters() -> argparse.ArgumentParser:
    """Build the command line argument parser for this script.

    Returns:
        argparse.ArgumentParser: parser with --l2g, --targets, --fm_credset,
            --ld_credset (all required) and --output (optional) arguments.
    """
    parser = argparse.ArgumentParser(
        description="This is a script to generate VEP data based on OpenTargets Genetics locus2gene scores."
    )
    parser.add_argument("--l2g", help="Locus 2 gene parquet.", type=str, required=True)
    parser.add_argument(
        "--targets", help="Target index parquet", type=str, required=True
    )
    parser.add_argument(
        "--fm_credset", help="Fine-mapped credible sets", type=str, required=True
    )
    parser.add_argument(
        "--ld_credset", help="LD expanded credible sets.", type=str, required=True
    )
    # NOTE(review): --output stays optional for backward compatibility, but the
    # writer in main() needs it — main() now raises a clear error when missing.
    parser.add_argument("--output", help="Output tsv file", type=str, required=False)
    return parser
def process_finemapped_credset(finemapped_credset: DataFrame) -> DataFrame:
    """Extract (study, lead, tag) rows from fine-mapped credible sets.

    Combines two projections of the same filtered data so that both the tag
    variants AND the lead variants themselves appear as "tag" rows — making
    sure every lead is represented even when it is its own only tag.

    Args:
        finemapped_credset: raw fine-mapping credible set DataFrame
            (expects study_id, type, is99_credset, lead_*/tag_* columns).

    Returns:
        DataFrame with columns: studyId, leadVariantId, chromosome, position,
        referenceAllele, alternateAllele (distinct rows).
    """
    # Dropping non-GWAS studies and tags outside the 99% credible set.
    # (The original duplicated this filter in both union branches.)
    significant = finemapped_credset.filter(
        (f.col("type") == "gwas") & (f.col("is99_credset") == True)  # noqa: E712 — Spark Column comparison
    )
    # Lead variant identifier, shared by both projections below:
    lead_variant_id = generate_variant_id(
        f.col("lead_chrom"),
        f.col("lead_pos"),
        f.col("lead_ref"),
        f.col("lead_alt"),
    ).alias("leadVariantId")
    # Projection 1: tag variants of each lead.
    tag_rows = significant.select(
        f.col("study_id").alias("studyId"),
        lead_variant_id,
        f.col("tag_chrom").alias("chromosome"),
        f.col("tag_pos").alias("position"),
        f.col("tag_ref").alias("referenceAllele"),
        f.col("tag_alt").alias("alternateAllele"),
    ).distinct()
    # Projection 2: each lead variant listed as its own tag.
    lead_rows = significant.select(
        f.col("study_id").alias("studyId"),
        lead_variant_id,
        f.col("lead_chrom").alias("chromosome"),
        f.col("lead_pos").alias("position"),
        f.col("lead_ref").alias("referenceAllele"),
        f.col("lead_alt").alias("alternateAllele"),
    )
    return tag_rows.unionByName(lead_rows).distinct()
def process_ld_expanded_credset(ld_credset: DataFrame) -> DataFrame:
    """Extract (study, lead, tag) rows from LD-expanded credible sets.

    All tag rows are kept unfiltered; lead variants are additionally added as
    their own tags, but only for leads in the PICS 95% credible set.
    (NOTE(review): unlike the fine-mapped counterpart, no GWAS-type filter is
    applied here — presumably the v2d input is already GWAS-only; confirm.)

    Args:
        ld_credset: raw LD-expanded credible set DataFrame
            (expects study_id, pics_95perc_credset, lead_*/tag_* columns).

    Returns:
        DataFrame with columns: studyId, leadVariantId, chromosome, position,
        referenceAllele, alternateAllele (distinct rows).
    """
    # Lead variant identifier, shared by both projections below:
    lead_variant_id = generate_variant_id(
        f.col("lead_chrom"),
        f.col("lead_pos"),
        f.col("lead_ref"),
        f.col("lead_alt"),
    ).alias("leadVariantId")
    # Projection 1: every tag variant of every lead (no credible-set filter).
    tag_rows = ld_credset.select(
        f.col("study_id").alias("studyId"),
        lead_variant_id,
        f.col("tag_chrom").alias("chromosome"),
        f.col("tag_pos").alias("position"),
        f.col("tag_ref").alias("referenceAllele"),
        f.col("tag_alt").alias("alternateAllele"),
    )
    # Projection 2: PICS 95% credible-set leads listed as their own tags.
    lead_rows = ld_credset.filter(
        f.col("pics_95perc_credset") == True  # noqa: E712 — Spark Column comparison
    ).select(
        f.col("study_id").alias("studyId"),
        lead_variant_id,
        f.col("lead_chrom").alias("chromosome"),
        f.col("lead_pos").alias("position"),
        f.col("lead_ref").alias("referenceAllele"),
        f.col("lead_alt").alias("alternateAllele"),
    )
    return tag_rows.unionByName(lead_rows).distinct()
def generate_vep_evidence(
    l2g_data: DataFrame,
    finemapped_data: DataFrame,
    ld_expanded_data: DataFrame,
    targets_data: DataFrame,
) -> DataFrame:
    """Main and importable function to generate VEP evidence.

    Joins significant (>= 0.5) locus2gene predictions with credible-set tag
    variants and keeps, per variant-gene pair, only the rows carrying the
    highest L2G score across studies.

    Args:
        l2g_data: raw locus2gene predictions.
        finemapped_data: raw fine-mapped credible sets.
        ld_expanded_data: raw LD-expanded credible sets.
        targets_data: target (gene) index.

    Returns:
        Persisted DataFrame with columns: chromosome, position,
        referenceAllele, alternateAllele, geneId, geneSymbol, geneName, l2g.
    """
    # Gene annotation extracted from the target index:
    gene_annotation = targets_data.select(
        f.col("id").alias("geneId"),
        f.col("approvedSymbol").alias("geneSymbol"),
        f.col("approvedName").alias("geneName"),
    ).distinct()

    # Significant locus2gene predictions, annotated with gene names:
    significant_l2g = (
        l2g_data.select(
            f.col("study_id").alias("studyId"),
            generate_variant_id(
                f.col("chrom"), f.col("pos"), f.col("ref"), f.col("alt")
            ).alias("leadVariantId"),
            f.col("gene_id").alias("geneId"),
            f.col("y_proba_full_model").alias("l2g"),
        )
        # Dropping non-significant locus to gene predictions:
        .filter(f.col("l2g") >= 0.5)
        .distinct()
        .join(gene_annotation, on="geneId", how="left")
    )

    # Union of both credible-set sources:
    credible_sets = (
        process_ld_expanded_credset(ld_expanded_data)
        .unionByName(process_finemapped_credset(finemapped_data))
        .distinct()
    )

    # Attach tag variants to predictions and build the tag variant identifier:
    vep_data = (
        significant_l2g.join(
            credible_sets, on=["studyId", "leadVariantId"], how="left"
        )
        # Dropping any top loci that has no tag (this is empty set):
        .filter(f.col("position").isNotNull())
        # Study-level resolution is no longer needed from here on:
        .drop("leadVariantId", "studyId")
        .withColumn(
            "variantId",
            generate_variant_id(
                f.col("chromosome"),
                f.col("position"),
                f.col("referenceAllele"),
                f.col("alternateAllele"),
            ),
        )
        .distinct()
    )

    # Keep only the top L2G score per variant-gene pair, regardless of study:
    per_pair = Window.partitionBy("geneId", "variantId")
    return (
        vep_data.withColumn("maxL2g", f.max(f.col("l2g")).over(per_pair))
        .filter(f.col("l2g") == f.col("maxL2g"))
        .select(
            "chromosome",
            "position",
            "referenceAllele",
            "alternateAllele",
            "geneId",
            "geneSymbol",
            "geneName",
            "l2g",
        )
        .distinct()
        .persist()
    )
def main(
    l2g_parquet: str,
    finemapped_credset: str,
    ld_expanded_credset: str,
    targets: str,
    output_file: str,
) -> None:
    """Read inputs, generate VEP evidence and write it as a TSV file.

    Args:
        l2g_parquet: path to locus2gene parquet dataset.
        finemapped_credset: path to fine-mapped credible set parquet.
        ld_expanded_credset: path to LD-expanded credible set parquet.
        targets: path to target index parquet.
        output_file: output folder for the TSV export.

    Raises:
        ValueError: if no output location was provided (--output is optional
            on the CLI, but the writer needs it).
    """
    # Fail early with a clear message instead of crashing inside the writer:
    if not output_file:
        raise ValueError("An output location (--output) must be provided.")
    # Initialize spark:
    spark = SparkSession.builder.appName("VEP_evidence").getOrCreate()
    # Read all input datasets.
    l2g_data = spark.read.parquet(l2g_parquet)
    finemapped_data = spark.read.parquet(finemapped_credset)
    ld_expanded_data = spark.read.parquet(ld_expanded_credset)
    targets_data = spark.read.parquet(targets)
    # Call VEP data integrator:
    output_data = generate_vep_evidence(
        l2g_data, finemapped_data, ld_expanded_data, targets_data
    )
    # Report some stuff (lazy %-style args so counts are only formatted
    # when the log level is enabled):
    logging.info("Number of VEP evidence: %s", output_data.count())
    logging.info(
        "Number of genes in VEP evidence: %s",
        output_data.select("geneId").distinct().count(),
    )
    logging.info(
        "Number of variants in VEP evidence: %s",
        output_data.select(
            "chromosome", "position", "referenceAllele", "alternateAllele"
        )
        .distinct()
        .count(),
    )
    logging.info("Saving data to: %s", output_file)
    (
        output_data.repartition(1)
        .orderBy(f.col("chromosome"), f.col("position"))
        .write.mode("overwrite")
        .csv(output_file, sep="\t", header=True)
    )
    logging.info("Done.")
if __name__ == "__main__":
    # If no logfile is specified, logs are written to stderr
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    # Parse parameters.
    args = parse_command_line_parameters().parse_args()
    # Call main — note the argument order: main() expects the fine-mapped and
    # LD-expanded credible set paths before the targets path.
    main(args.l2g, args.fm_credset, args.ld_credset, args.targets, args.output)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment