Last active
March 4, 2022 23:24
-
-
Save DSuveges/8db02b45f52eac027392ec51d5fd96ae to your computer and use it in GitHub Desktop.
This script reads the OT variant list and joins with the Gnomad dataset. This is a precursor of the new variant index genration.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql.functions import col, array, struct, concat, lit | |
from pyspark.sql.types import StructField | |
from pyspark import SparkConf | |
from pyspark.sql import SparkSession | |
from pyspark.context import SparkContext | |
import hail as hl | |
# Hail session needs to be initialized BEFORE spark initialization: | |
hl.init() | |
sparkConf = SparkConf() | |
sparkConf = sparkConf.set('spark.hadoop.fs.gs.requester.pays.mode', 'AUTO') | |
sparkConf = sparkConf.set('spark.hadoop.fs.gs.requester.pays.project.id', | |
'open-targets-eu-dev') | |
# establish spark connection | |
spark = ( | |
SparkSession.builder | |
.config(conf=sparkConf) | |
.master('local[*]') | |
.getOrCreate() | |
) | |
# Inclusion list parquet: | |
included_parquet = 'gs://genetics-portal-dev-analysis/dsuveges/all_genetics_variants.2022-03-04' | |
included_df = ( | |
spark.read.parquet(included_parquet) | |
.withColumn('chrom', concat(lit('chr'), col('chrom'))) | |
.persist() | |
) | |
included_df.show(5) | |
# +-----+---------+---+---+ | |
# |chrom| pos|ref|alt| | |
# +-----+---------+---+---+ | |
# | 17| 39919884| T| G| | |
# | 5|111094790| C| G| | |
# | 4|151863785| C| T| | |
# | 1| 22076864| G| A| | |
# | 9|136353630| A| G| | |
# +-----+---------+---+---+ | |
# only showing top 5 rows | |
# Converting data to hail table + setting integer type: | |
ot_variants = hl.Table.from_spark(included_df) | |
ot_variants = ( | |
# int64 is not accepted, needs to convert to int32: | |
ot_variants.annotate(pos = hl.int32(ot_variants.pos)) | |
) | |
# Adding locus and allels columns: | |
ot_variants = ( | |
ot_variants | |
.annotate( | |
# Creating locus object: | |
locus = hl.locus( | |
ot_variants.chrom, | |
ot_variants.pos, | |
reference_genome='GRCh38' | |
), | |
# Creating array of alleles: | |
alleles = hl.array([ot_variants.ref, ot_variants.alt]) | |
) | |
) | |
ot_variants = ( | |
ot_variants | |
# Indexing dataset by locus and alleles + dropping all other colums: | |
.key_by(ot_variants.locus, ot_variants.alleles) | |
.drop(*['chrom', 'pos', 'alt', 'ref']) | |
) | |
ot_variants.show(4) | |
# +---------------+------------+ | |
# | locus | alleles | | |
# +---------------+------------+ | |
# | locus<GRCh38> | array<str> | | |
# +---------------+------------+ | |
# | chr1:13550 | ["G","A"] | | |
# | chr1:14677 | ["G","A"] | | |
# | chr1:47159 | ["T","C"] | | |
# | chr1:54490 | ["G","A"] | | |
# +---------------+------------+ | |
# Gnomad hail table: | |
gnomad_file = 'gs://gcp-public-data--gnomad/release/3.1.1/ht/genomes/gnomad.genomes.v3.1.1.sites.ht' | |
# Load data | |
gnomad_table = hl.read_table(gnomad_file) | |
selected_variants = ( | |
gnomad_table | |
.join(ot_variants, how='right') | |
) | |
## | |
## At this point the data is saved to streamline downstream steps: | |
## | |
( | |
selected_variants | |
.write('gs://genetics-portal-dev-analysis/dsuveges/gnomad_ot_joined_variants.ht') | |
) | |
# Processing and lifting over the selected variants: | |
selected_variants = hl.read_table('gs://genetics-portal-dev-analysis/dsuveges/gnomad_ot_joined_variants.ht') | |
# GRCh38 to 37 chainfile: | |
CHAIN_FILE = 'gs://hail-common/references/grch38_to_grch37.over.chain.gz' | |
# Parameters: | |
OUT_PARTITIONS = 256 | |
# Population of interest: | |
POPULATIONS = { | |
'afr', # African-American | |
'amr', # American Admixed/Latino | |
'ami', # Amish ancestry | |
'asj', # Ashkenazi Jewish | |
'eas', # East Asian | |
'fin', # Finnish | |
'nfe', # Non-Finnish European | |
'mid', # Middle Eastern | |
'sas', # South Asian | |
'oth' # Other | |
} | |
def af_to_maf(af): | |
"""Converts AF to MAF. The resulting value is always <= 0.5.""" | |
return hl.if_else(af <= 0.5, af, 1 - af) | |
# Output files: | |
out_parquet = 'gs://genetics-portal-dev-analysis/dsuveges/variant-annotation.2022-03-04' | |
# Extracting AF indices of populations: | |
population_indices = selected_variants.globals.freq_index_dict.collect()[0] | |
population_indices = {pop: population_indices[f'{pop}-adj'] for pop in POPULATIONS} | |
# Adding population allele frequency and minor allele frequency: | |
selected_variants = ( | |
selected_variants | |
.annotate( | |
# Generate struct for alt. allele frequency in selected populations: | |
af = hl.struct(**{pop: selected_variants.freq[index].AF for pop, index in population_indices.items()}), | |
) | |
) | |
# Add chain file | |
grch37 = hl.get_reference('GRCh37') | |
grch38 = hl.get_reference('GRCh38') | |
grch38.add_liftover(CHAIN_FILE, grch37) | |
# Liftover | |
selected_variants = ( | |
selected_variants | |
.annotate(locus_GRCh37 = hl.liftover(selected_variants.locus, 'GRCh37')) | |
) | |
# Adding build-specific coordinates to the table: | |
selected_variants = selected_variants.annotate( | |
chrom_b38 = selected_variants.locus.contig.replace('chr', ''), | |
pos_b38 = selected_variants.locus.position, | |
chrom_b37 = selected_variants.locus_GRCh37.contig.replace('chr', ''), | |
pos_b37 = selected_variants.locus_GRCh37.position, | |
ref = selected_variants.alleles[0], | |
alt = selected_variants.alleles[1], | |
allele_type = selected_variants.allele_info.allele_type | |
) | |
# Updating table: | |
selected_variants = selected_variants.annotate( | |
# Updating CADD column: | |
cadd=selected_variants.cadd.rename({'raw_score': 'raw'}).drop('has_duplicate'), | |
# Adding locus as new column: | |
locus_GRCh38=selected_variants.locus | |
) | |
# Drop all global annotations: | |
selected_variants = selected_variants.select_globals() | |
# Drop unnecessary VEP fields | |
selected_variants = selected_variants.annotate( | |
vep=selected_variants.vep.drop( | |
'assembly_name', | |
'allele_string', | |
'ancestral', | |
'context', | |
'end', | |
'id', | |
'input', | |
'intergenic_consequences', | |
'seq_region_name', | |
'start', | |
'strand', | |
'variant_class' | |
) | |
) | |
# Sort columns | |
col_order = [ | |
'locus_GRCh38', 'chrom_b38', 'pos_b38', | |
'chrom_b37', 'pos_b37', | |
'ref', 'alt', 'allele_type', 'vep', 'rsid', 'af', 'cadd' | |
] | |
# Repartition and write parquet file | |
( | |
selected_variants | |
.select(*col_order) | |
.to_spark(flatten=False) | |
.coalesce(OUT_PARTITIONS) | |
.write.mode('overwrite').parquet(out_parquet) | |
) | |
# Reading variants and process | |
variant_annotations = ( | |
spark.read.parquet(out_parquet) | |
.persist() | |
) | |
variant_annotations.show(3) | |
# +-----------------+---------+-----------------+---------+---------+---------+---------+----+---+-----------+--------------------+------------+--------------------+------------------+ | |
# | locus| alleles| locus_GRCh38|chrom_b38| pos_b38|chrom_b37| pos_b37| ref|alt|allele_type| vep| rsid| af| cadd| | |
# +-----------------+---------+-----------------+---------+---------+---------+---------+----+---+-----------+--------------------+------------+--------------------+------------------+ | |
# |{chr2, 102431697}| [G, A]|{chr2, 102431697}| 2|102431697| 2|103048157| G| A| snv|{intron_variant, ...|[rs56331791]|{0.21676300578034...| {1.366, 0.004911}| | |
# |{chr2, 102431747}| [C, A]|{chr2, 102431747}| 2|102431747| 2|103048207| C| A| snv|{intron_variant, ...| [rs6755905]|{0.52362584378013...| {1.302, -0.0054}| | |
# |{chr2, 102431792}|[CTAT, C]|{chr2, 102431792}| 2|102431792| 2|103048252|CTAT| C| del|{intron_variant, ...|[rs34168805]|{0.52213666987487...|{0.324, -0.291829}| | |
# +-----------------+---------+-----------------+---------+---------+---------+---------+----+---+-----------+--------------------+------------+--------------------+------------------+ | |
# only showing top 3 rows | |
# Variant count: 5_388_925 | |
variant_annotations.count() | |
# How many of the variants could not be mapped: 0 | |
( | |
variant_annotations | |
.filter(col('vep').isNull()) | |
.count() | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment