Skip to content

Instantly share code, notes, and snippets.

@DSuveges
Last active March 4, 2022 23:24
Show Gist options
  • Save DSuveges/8db02b45f52eac027392ec51d5fd96ae to your computer and use it in GitHub Desktop.
Save DSuveges/8db02b45f52eac027392ec51d5fd96ae to your computer and use it in GitHub Desktop.
This script reads the OT variant list and joins with the Gnomad dataset. This is a precursor of the new variant index generation.
from pyspark.sql.functions import col, array, struct, concat, lit
from pyspark.sql.types import StructField
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
import hail as hl
# Hail session needs to be initialized BEFORE spark initialization:
hl.init()

# Spark configuration: GCS requester-pays buckets are billed to this project.
conf = (
    SparkConf()
    .set('spark.hadoop.fs.gs.requester.pays.mode', 'AUTO')
    .set('spark.hadoop.fs.gs.requester.pays.project.id', 'open-targets-eu-dev')
)

# Establish the Spark connection (local mode, all available cores):
spark = (
    SparkSession.builder
    .master('local[*]')
    .config(conf=conf)
    .getOrCreate()
)
# Inclusion list: parquet with all variants tracked by Open Targets Genetics.
INCLUDED_PARQUET = 'gs://genetics-portal-dev-analysis/dsuveges/all_genetics_variants.2022-03-04'

# Read the inclusion list and prefix chromosome names with 'chr' so they
# match GRCh38/gnomAD contig naming; cache for the repeated use below.
included_df = (
    spark.read.parquet(INCLUDED_PARQUET)
    .withColumn('chrom', concat(lit('chr'), col('chrom')))
    .persist()
)

included_df.show(5)
# +-----+---------+---+---+
# |chrom| pos|ref|alt|
# +-----+---------+---+---+
# | 17| 39919884| T| G|
# | 5|111094790| C| G|
# | 4|151863785| C| T|
# | 1| 22076864| G| A|
# | 9|136353630| A| G|
# +-----+---------+---+---+
# only showing top 5 rows
# Convert the Spark dataframe into a Hail table. Hail loci require int32
# positions, so the int64 column coming from Spark is down-cast first.
ot_variants = hl.Table.from_spark(included_df)
ot_variants = ot_variants.annotate(pos=hl.int32(ot_variants.pos))

# Build the canonical Hail representation: a GRCh38 locus object plus an
# [ref, alt] allele array.
ot_variants = ot_variants.annotate(
    locus=hl.locus(ot_variants.chrom, ot_variants.pos, reference_genome='GRCh38'),
    alleles=hl.array([ot_variants.ref, ot_variants.alt])
)

# Key by (locus, alleles) — the same key the gnomAD sites table uses — and
# drop the now-redundant flat columns:
ot_variants = (
    ot_variants
    .key_by('locus', 'alleles')
    .drop('chrom', 'pos', 'alt', 'ref')
)

ot_variants.show(4)
# +---------------+------------+
# | locus | alleles |
# +---------------+------------+
# | locus<GRCh38> | array<str> |
# +---------------+------------+
# | chr1:13550 | ["G","A"] |
# | chr1:14677 | ["G","A"] |
# | chr1:47159 | ["T","C"] |
# | chr1:54490 | ["G","A"] |
# +---------------+------------+
# gnomAD v3.1.1 genomes sites Hail table (public bucket):
GNOMAD_FILE = 'gs://gcp-public-data--gnomad/release/3.1.1/ht/genomes/gnomad.genomes.v3.1.1.sites.ht'

# Load gnomAD annotations:
gnomad_table = hl.read_table(GNOMAD_FILE)

# Right join: keep every OT variant, attaching gnomAD annotation where the
# (locus, alleles) key matches; unmatched OT variants get missing fields.
selected_variants = gnomad_table.join(ot_variants, how='right')

##
## At this point the data is saved to streamline downstream steps:
##
# overwrite=True so re-runs don't abort: hail's Table.write raises if the
# destination already exists. This matches the Spark write.mode('overwrite')
# used for the final parquet export below.
selected_variants.write(
    'gs://genetics-portal-dev-analysis/dsuveges/gnomad_ot_joined_variants.ht',
    overwrite=True
)
# Processing and lifting over the selected variants:
# Re-read the checkpointed join from disk so downstream steps start from the
# materialized table rather than the in-memory join plan.
selected_variants = hl.read_table('gs://genetics-portal-dev-analysis/dsuveges/gnomad_ot_joined_variants.ht')
# GRCh38 to 37 chainfile (Hail-hosted), used by the liftover step below:
CHAIN_FILE = 'gs://hail-common/references/grch38_to_grch37.over.chain.gz'
# Parameters:
# Number of partitions for the final parquet output:
OUT_PARTITIONS = 256
# Population of interest: gnomAD ancestry codes whose allele frequencies
# are extracted into the `af` struct below.
POPULATIONS = {
    'afr',  # African-American
    'amr',  # American Admixed/Latino
    'ami',  # Amish ancestry
    'asj',  # Ashkenazi Jewish
    'eas',  # East Asian
    'fin',  # Finnish
    'nfe',  # Non-Finnish European
    'mid',  # Middle Eastern
    'sas',  # South Asian
    'oth'  # Other
}
def af_to_maf(af):
    """Return the minor allele frequency for an allele frequency *af*.

    Frequencies above 0.5 are folded onto the complementary allele, so the
    result is always <= 0.5.
    """
    return hl.if_else(af > 0.5, 1 - af, af)
# Output files:
out_parquet = 'gs://genetics-portal-dev-analysis/dsuveges/variant-annotation.2022-03-04'
# Extracting AF indices of populations: freq_index_dict is a global mapping
# from population/subset label to the position in the per-variant `freq` array.
population_indices = selected_variants.globals.freq_index_dict.collect()[0]
# NOTE(review): assumes keys of the form '<pop>-adj' (high-quality genotypes
# only) — confirm against the release's freq_index_dict before reuse.
population_indices = {pop: population_indices[f'{pop}-adj'] for pop in POPULATIONS}
# Adding population allele frequency and minor allele frequency:
# NOTE(review): despite the comment above, only the alternate-allele
# frequency (AF) is annotated here — af_to_maf is defined but never called,
# so no MAF field is produced. Confirm whether MAF was intended.
selected_variants = (
    selected_variants
    .annotate(
        # Generate struct for alt. allele frequency in selected populations:
        af = hl.struct(**{pop: selected_variants.freq[index].AF for pop, index in population_indices.items()}),
    )
)
# Register the GRCh38 -> GRCh37 liftover chain on the reference genomes:
hl.get_reference('GRCh38').add_liftover(CHAIN_FILE, hl.get_reference('GRCh37'))

# Annotate each variant with its lifted-over GRCh37 locus (missing when the
# locus has no mapping in the chain file):
selected_variants = selected_variants.annotate(
    locus_GRCh37=hl.liftover(selected_variants.locus, 'GRCh37')
)
# Flatten build-specific coordinates and alleles into plain columns, tidy the
# CADD struct, and keep the GRCh38 locus under a build-explicit name. A single
# annotate call suffices: `cadd` is replaced in place, the rest are appended
# in order.
selected_variants = selected_variants.annotate(
    # Contig names without the 'chr' prefix, per build:
    chrom_b38=selected_variants.locus.contig.replace('chr', ''),
    pos_b38=selected_variants.locus.position,
    chrom_b37=selected_variants.locus_GRCh37.contig.replace('chr', ''),
    pos_b37=selected_variants.locus_GRCh37.position,
    # Reference and alternate alleles from the allele array:
    ref=selected_variants.alleles[0],
    alt=selected_variants.alleles[1],
    allele_type=selected_variants.allele_info.allele_type,
    # Rename CADD's raw_score -> raw and drop the duplicate flag:
    cadd=selected_variants.cadd.rename({'raw_score': 'raw'}).drop('has_duplicate'),
    # GRCh38 locus under an explicit column name:
    locus_GRCh38=selected_variants.locus
)
# Drop all global annotations:
selected_variants = selected_variants.select_globals()

# VEP fields that are not needed downstream:
VEP_DROP_FIELDS = (
    'assembly_name',
    'allele_string',
    'ancestral',
    'context',
    'end',
    'id',
    'input',
    'intergenic_consequences',
    'seq_region_name',
    'start',
    'strand',
    'variant_class',
)
selected_variants = selected_variants.annotate(
    vep=selected_variants.vep.drop(*VEP_DROP_FIELDS)
)
# Final column order of the exported table:
col_order = [
    'locus_GRCh38', 'chrom_b38', 'pos_b38',
    'chrom_b37', 'pos_b37',
    'ref', 'alt', 'allele_type', 'vep', 'rsid', 'af', 'cadd'
]

# Select/reorder the columns, convert back to Spark (keeping nested structs),
# then repartition and export as parquet:
variant_df = selected_variants.select(*col_order).to_spark(flatten=False)
(
    variant_df
    .coalesce(OUT_PARTITIONS)
    .write
    .mode('overwrite')
    .parquet(out_parquet)
)
# Reading variants and process
# Sanity checks: read the exported parquet back and inspect a few rows
# and counts.
variant_annotations = (
    spark.read.parquet(out_parquet)
    .persist()
)
variant_annotations.show(3)
# +-----------------+---------+-----------------+---------+---------+---------+---------+----+---+-----------+--------------------+------------+--------------------+------------------+
# | locus| alleles| locus_GRCh38|chrom_b38| pos_b38|chrom_b37| pos_b37| ref|alt|allele_type| vep| rsid| af| cadd|
# +-----------------+---------+-----------------+---------+---------+---------+---------+----+---+-----------+--------------------+------------+--------------------+------------------+
# |{chr2, 102431697}| [G, A]|{chr2, 102431697}| 2|102431697| 2|103048157| G| A| snv|{intron_variant, ...|[rs56331791]|{0.21676300578034...| {1.366, 0.004911}|
# |{chr2, 102431747}| [C, A]|{chr2, 102431747}| 2|102431747| 2|103048207| C| A| snv|{intron_variant, ...| [rs6755905]|{0.52362584378013...| {1.302, -0.0054}|
# |{chr2, 102431792}|[CTAT, C]|{chr2, 102431792}| 2|102431792| 2|103048252|CTAT| C| del|{intron_variant, ...|[rs34168805]|{0.52213666987487...|{0.324, -0.291829}|
# +-----------------+---------+-----------------+---------+---------+---------+---------+----+---+-----------+--------------------+------------+--------------------+------------------+
# only showing top 3 rows
# Variant count: 5_388_925
variant_annotations.count()
# How many of the variants could not be mapped: 0
# NOTE(review): a null `vep` indicates the variant had no matching gnomAD row
# after the right join (vep comes from gnomAD), not a liftover failure —
# confirm which "mapping" this zero count is meant to verify.
(
    variant_annotations
    .filter(col('vep').isNull())
    .count()
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment