Skip to content

Instantly share code, notes, and snippets.

@d0choa
Created June 29, 2023 21:14
Show Gist options
  • Save d0choa/f9e043ae40be60ebac15172f8e4a932d to your computer and use it in GitHub Desktop.
Save d0choa/f9e043ae40be60ebac15172f8e4a932d to your computer and use it in GitHub Desktop.
Experiment to implement distance-based clumping
"""Prototype of distance based clumping."""
import pyspark.sql.functions as f
from pyspark.sql import Column, SparkSession, Window
spark = SparkSession.builder.getOrCreate()

# Toy input: 14 rows, all belonging to study "s1" on chromosome "chr1".
# Only position, negLogPValue and isSemiIndex vary, so the constant prefix is
# attached while building the rows.
data = [
    ("s1", "chr1", position, neg_log_p_value, is_semi_index)
    for position, neg_log_p_value, is_semi_index in [
        (3, 2.0, False),
        (4, 3.0, False),
        (5, 4.0, True),
        (6, 2.0, False),
        (7, 3.0, False),
        (8, 4.0, False),
        (9, 4.5, False),
        (10, 6.0, True),
        (11, 5.0, False),
        (12, 3.0, False),
        (14, 2.0, True),
        (16, 2.5, False),
        (18, 3.0, True),
        (20, 1.5, False),
    ]
]

df = spark.createDataFrame(
    data,
    schema=["studyId", "chromosome", "position", "negLogPValue", "isSemiIndex"],
)
df = df.persist()

# Distance, in position units, covered on each side of a row by the window.
window_length = 3
def window_based_clump_rank(
    chromosome: Column,
    position_col: Column,
    neglogpvalue_col: Column,
    window_length: int,
) -> Column:
    """Flag rows that carry the highest value within a distance window.

    For every row, a window is built over the rows sharing the same
    ``chromosome`` value whose ``position_col`` lies within ``window_length``
    of the row's own position (both bounds inclusive). The row is flagged when
    its ``neglogpvalue_col`` equals the maximum found in that window, i.e. no
    more significant SNP sits within range.

    Args:
        chromosome: Column the window is partitioned by.
        position_col: Column the range window is ordered by.
        neglogpvalue_col: Negative log p-value column; each row's value is
            compared against the maximum of its window.
        window_length: Distance covered on each side of the current row.

    Returns:
        Boolean column that is true when the row's value equals the window
        maximum. Rows tied for the maximum are all flagged.
    """
    # NOTE(review): the window is partitioned by chromosome only, so rows from
    # different studies on the same chromosome share windows -- confirm that
    # callers pass single-study data or fold the study into the partition.
    neighbourhood = (
        Window.partitionBy(chromosome)
        .orderBy(position_col)
        .rangeBetween(-window_length, window_length)
    )
    # A windowed max gives the same value as collecting the window into a
    # list, sorting it descending and taking the first element, but without
    # building and sorting an array per row, and without indexing into an
    # empty array when the window holds no non-null values.
    return neglogpvalue_col == f.max(neglogpvalue_col).over(neighbourhood)
# Add the window flag as column "test" and print up to 100 rows without
# truncating cell contents.
df.withColumn(
    "test",
    window_based_clump_rank(
        chromosome=f.col("chromosome"),
        position_col=f.col("position"),
        neglogpvalue_col=f.col("negLogPValue"),
        window_length=window_length,
    ),
).show(n=100, truncate=False)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment