Last active
September 12, 2023 15:23
-
-
Save DSuveges/a406a4fbaa672d73f41b952e81a2658a to your computer and use it in GitHub Desktop.
A short snippet to generate VEP compatible data based on OT Genetics Locus2Gene scores
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""This is a script to generate VEP input based on OT Genetics Locus2Gene scores.""" | |
"""Example values: | |
l2g = gs://open-targets-genetics-releases/22.09/l2g | |
finemap_credset = gs://open-targets-genetics-releases/22.09/v2d_credset/ | |
ldexpanded_credset = gs://open-targets-genetics-releases/22.09/v2d/ | |
targets = gs://open-targets-data-releases/23.06/output/etl/parquet/targets/ | |
L2G=gs://open-targets-genetics-releases/22.09/l2g | |
FINEMAP=gs://open-targets-genetics-releases/22.09/v2d_credset/ | |
LD=gs://open-targets-genetics-releases/22.09/v2d/ | |
TARGETS=gs://open-targets-data-releases/23.06/output/etl/parquet/targets/ | |
OUTPUT=gs://ot-team/dsuveges/VEP_test/ | |
python cicaful.py \ | |
--l2g ${L2G} \ | |
--targets ${TARGETS} \ | |
--fm_credset ${FINEMAP} \ | |
--ld_credset ${LD} \ | |
--output ${OUTPUT} | |
""" | |
import argparse | |
import logging | |
from typing import TYPE_CHECKING | |
from pyspark.sql import Column, DataFrame, SparkSession | |
from pyspark.sql import functions as f | |
from pyspark.sql import types as t | |
from pyspark.sql.window import Window | |
if TYPE_CHECKING: | |
from argparse import ArgumentParser | |
def generate_variant_id(
    chromosome: Column,
    position: Column,
    reference_allele: Column,
    alternate_allele: Column,
) -> Column:
    """Build an underscore-delimited variant identifier: chrom_pos_ref_alt."""
    id_parts = (chromosome, position, reference_allele, alternate_allele)
    return f.concat_ws("_", *id_parts)
def parse_command_line_parameters() -> argparse.ArgumentParser:
    """Build the command line argument parser for this script.

    Returns:
        argparse.ArgumentParser: parser accepting --l2g, --targets,
        --fm_credset and --ld_credset (all required) plus --output (optional).
    """
    parser = argparse.ArgumentParser(
        description="This is a script to generate VEP data based on OpenTargets Genetics locus2gene scores."
    )
    parser.add_argument("--l2g", help="Locus 2 gene parquet.", type=str, required=True)
    parser.add_argument(
        "--targets", help="Target index parquet", type=str, required=True
    )
    parser.add_argument(
        "--fm_credset", help="Fine-mapped credible sets", type=str, required=True
    )
    parser.add_argument(
        "--ld_credset", help="LD expanded credible sets.", type=str, required=True
    )
    # Output is optional; when omitted the caller receives None.
    parser.add_argument("--output", help="Output tsv file", type=str, required=False)
    return parser
def process_finemapped_credset(finemapped_credset: DataFrame) -> DataFrame:
    """Collect tag AND lead variants of the fine-mapped credible sets.

    Keeps GWAS associations whose tag variant is in the 99% credible set, and
    returns one row per (study, lead, credible-set variant) where the
    credible-set variant is either a tag variant or the lead variant itself.

    Args:
        finemapped_credset: raw fine-mapped credible set DataFrame.

    Returns:
        DataFrame with columns studyId, leadVariantId, chromosome, position,
        referenceAllele, alternateAllele (deduplicated).
    """
    # Apply the GWAS + 99% credible set filter once; the original duplicated
    # this filter (and the whole select) in both union branches.
    significant = finemapped_credset.filter(
        (f.col("type") == "gwas") & (f.col("is99_credset") == True)
    )

    def rows_for(prefix: str) -> DataFrame:
        # One row per credible-set variant, coordinates taken from the
        # `<prefix>_*` columns ("tag" or "lead").
        return significant.select(
            f.col("study_id").alias("studyId"),
            generate_variant_id(
                f.col("lead_chrom"),
                f.col("lead_pos"),
                f.col("lead_ref"),
                f.col("lead_alt"),
            ).alias("leadVariantId"),
            f.col(f"{prefix}_chrom").alias("chromosome"),
            f.col(f"{prefix}_pos").alias("position"),
            f.col(f"{prefix}_ref").alias("referenceAllele"),
            f.col(f"{prefix}_alt").alias("alternateAllele"),
        )

    # Tag variants plus the lead variants themselves. The trailing .distinct()
    # makes the original's pre-union .distinct() redundant, so it is dropped.
    return rows_for("tag").unionByName(rows_for("lead")).distinct()
def process_ld_expanded_credset(ld_credset: DataFrame) -> DataFrame:
    """Collect tag AND lead variants of the LD-expanded credible sets.

    Returns one row per (study, lead, credible-set variant) where the
    credible-set variant is either any tag variant of the input, or the lead
    variant itself for associations inside the PICS 95% credible set.

    Args:
        ld_credset: raw LD-expanded credible set (v2d) DataFrame.

    Returns:
        DataFrame with columns studyId, leadVariantId, chromosome, position,
        referenceAllele, alternateAllele (deduplicated).
    """
    # NOTE(review): the original comment said "Dropping non-GWAS studies" but
    # no such filter was applied to the tag branch — behavior preserved as-is.

    def rows_for(source: DataFrame, prefix: str) -> DataFrame:
        # One row per credible-set variant, coordinates taken from the
        # `<prefix>_*` columns ("tag" or "lead").
        return source.select(
            f.col("study_id").alias("studyId"),
            generate_variant_id(
                f.col("lead_chrom"),
                f.col("lead_pos"),
                f.col("lead_ref"),
                f.col("lead_alt"),
            ).alias("leadVariantId"),
            f.col(f"{prefix}_chrom").alias("chromosome"),
            f.col(f"{prefix}_pos").alias("position"),
            f.col(f"{prefix}_ref").alias("referenceAllele"),
            f.col(f"{prefix}_alt").alias("alternateAllele"),
        )

    # All tag variants, plus lead variants restricted to the PICS 95% credset:
    tag_rows = rows_for(ld_credset, "tag")
    lead_rows = rows_for(
        ld_credset.filter(f.col("pics_95perc_credset") == True), "lead"
    )
    return tag_rows.unionByName(lead_rows).distinct()
def generate_vep_evidence(
    l2g_data: DataFrame,
    finemapped_data: DataFrame,
    ld_expanded_data: DataFrame,
    targets_data: DataFrame,
) -> DataFrame:
    """Main and importable function to generate VEP evidence.

    Combines significant locus2gene predictions with credible-set tag
    variants and gene annotation, keeping for each gene/variant pair only
    rows carrying the best L2G score across studies.
    """
    # Gene annotation: one row per gene id with its symbol and name.
    gene_annotation = targets_data.select(
        f.col("id").alias("geneId"),
        f.col("approvedSymbol").alias("geneSymbol"),
        f.col("approvedName").alias("geneName"),
    ).distinct()

    # L2G predictions: keep scores >= 0.5 and attach gene metadata.
    scored_leads = l2g_data.select(
        f.col("study_id").alias("studyId"),
        generate_variant_id(
            f.col("chrom"), f.col("pos"), f.col("ref"), f.col("alt")
        ).alias("leadVariantId"),
        f.col("gene_id").alias("geneId"),
        f.col("y_proba_full_model").alias("l2g"),
    )
    scored_leads = (
        scored_leads.filter(f.col("l2g") >= 0.5)
        .distinct()
        .join(gene_annotation, on="geneId", how="left")
    )

    # Union of LD-expanded and fine-mapped credible sets.
    all_credsets = (
        process_ld_expanded_credset(ld_expanded_data)
        .unionByName(process_finemapped_credset(finemapped_data))
        .distinct()
    )

    # Attach credible-set variants to the scored leads; leads without any
    # tag (null position after the left join) are dropped.
    tagged = scored_leads.join(
        all_credsets, on=["studyId", "leadVariantId"], how="left"
    ).filter(f.col("position").isNotNull())
    tagged = (
        # Study-level columns are no longer needed downstream.
        tagged.drop("leadVariantId", "studyId")
        .withColumn(
            "variantId",
            generate_variant_id(
                f.col("chromosome"),
                f.col("position"),
                f.col("referenceAllele"),
                f.col("alternateAllele"),
            ),
        )
        .distinct()
    )

    # For each gene/variant pair keep only rows with the maximum L2G score
    # (regardless of which study produced it).
    per_pair = Window.partitionBy("geneId", "variantId")
    best_scored = tagged.withColumn(
        "maxL2g", f.max(f.col("l2g")).over(per_pair)
    ).filter(f.col("l2g") == f.col("maxL2g"))

    return (
        best_scored.select(
            "chromosome",
            "position",
            "referenceAllele",
            "alternateAllele",
            "geneId",
            "geneSymbol",
            "geneName",
            "l2g",
        )
        .distinct()
        .persist()
    )
def main(
    l2g_parquet: str,
    finemapped_credset: str,
    ld_expanded_credset: str,
    targets: str,
    output_file: str,
) -> None:
    """Read inputs, build the VEP evidence dataset and write it as one TSV."""
    spark = SparkSession.builder.appName("VEP_evidence").getOrCreate()

    # Load every input dataset and run the integrator in one go.
    evidence = generate_vep_evidence(
        spark.read.parquet(l2g_parquet),
        spark.read.parquet(finemapped_credset),
        spark.read.parquet(ld_expanded_credset),
        spark.read.parquet(targets),
    )

    # Log dataset-level statistics before writing.
    logging.info(f"Number of VEP evidence: {evidence.count()}")
    gene_count = evidence.select("geneId").distinct().count()
    logging.info(f"Number of genes in VEP evidence: {gene_count}")
    variant_count = (
        evidence.select(
            "chromosome", "position", "referenceAllele", "alternateAllele"
        )
        .distinct()
        .count()
    )
    logging.info(f"Number of variants in VEP evidence: {variant_count}")

    # Write a single, coordinate-sorted, tab-separated file.
    logging.info(f"Saving data to: {output_file}")
    (
        evidence.repartition(1)
        .orderBy(f.col("chromosome"), f.col("position"))
        .write.mode("overwrite")
        .csv(output_file, sep="\t", header=True)
    )
    logging.info("Done.")
if __name__ == "__main__":
    # Configure root logging; without a file handler, records go to stderr.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(module)s - %(funcName)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
    # Parse parameters.
    args = parse_command_line_parameters().parse_args()
    # Call main — note args.output may be None since --output is optional.
    main(args.l2g, args.fm_credset, args.ld_credset, args.targets, args.output)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment