Last active
December 8, 2022 21:04
-
-
Save ireneisdoomed/d2ed844d557f5ee2f4552df6637e4590 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Input datasets: 22.09.0 Genetics Portal release plus the staging PICS LD set.
_CREDSET_PATH = "gs://genetics-portal-dev-data/22.09.0/outputs/v2d_credset"
_V2D_PATH = "gs://genetics-portal-dev-data/22.09.0/outputs/v2d"
_PICS_PATH = "gs://genetics-portal-dev-staging/v2d/220401/ld.parquet"

v2d_credset = spark.read.parquet(_CREDSET_PATH)
v2d = spark.read.parquet(_V2D_PATH)
pics = spark.read.parquet(_PICS_PATH)
def create_study_id_hash(
    df: DataFrame, study_col: str, phenotype_col: str, biofeature_col: str
) -> DataFrame:
    """Add a ``studyId`` column that uniquely identifies each study.

    GWAS rows keep their original study identifier, which is already
    unique. For every other study type (QTLs), the identifier becomes a
    64-bit hash of the study type, the source study id, the measured
    phenotype and the biofeature where it was observed.
    """
    is_qtl = f.col("type") != "gwas"
    hashed_id = f.xxhash64("type", study_col, phenotype_col, biofeature_col)
    return df.withColumn(
        "studyId", f.when(is_qtl, hashed_id).otherwise(f.col(study_col))
    )
# Fine-mapping credible sets, reshaped onto the newer camelCase schema.
# NOTE: keep the column order — it later fixes the credible-set struct layout.
v2d_credset_t = (
    v2d_credset
    # Give QTL studies a hashed, globally unique study identifier.
    .transform(
        lambda df: create_study_id_hash(df, "study_id", "phenotype_id", "bio_feature")
    )
    .select(
        # Variant ids are assembled as chrom_pos_ref_alt.
        f.concat_ws("_", "lead_chrom", "lead_pos", "lead_ref", "lead_alt").alias(
            "leadVariantId"
        ),
        "studyId",
        f.col("is95_credset").alias("is95CredibleSet"),
        f.col("is99_credset").alias("is99CredibleSet"),
        "logABF",
        f.col("postprob").alias("posteriorProbability"),
        f.concat_ws("_", "tag_chrom", "tag_pos", "tag_ref", "tag_alt").alias(
            "tagVariantId"
        ),
        f.col("tag_pval").alias("tagPValue"),
        f.col("tag_pval_cond").alias("tagPValueConditioned"),
        f.col("tag_beta").alias("tagBeta"),
        f.col("tag_se").alias("tagStandardError"),
        f.col("tag_beta_cond").alias("tagBetaConditioned"),
        f.col("tag_se_cond").alias("tagStandardErrorConditioned"),
        f.col("multisignal_method").alias("method"),
    )
)
# PICS (LD-expansion) credible sets, reshaped onto the newer camelCase schema.
pics_t = (
    pics.select(
        f.concat_ws("_", "lead_chrom", "lead_pos", "lead_ref", "lead_alt").alias(
            "leadVariantId"
        ),
        f.col("study_id").alias("studyId"),
        f.col("pics_95perc_credset").alias("is95CredibleSet"),
        f.col("pics_99perc_credset").alias("is99CredibleSet"),
        f.concat_ws("_", "tag_chrom", "tag_pos", "tag_ref", "tag_alt").alias(
            "tagVariantId"
        ),
        f.col("pics_postprob").alias("posteriorProbability"),
        f.col("overall_r2").alias("r2Overall"),
        # Every row of this dataset was produced by the PICS method.
        f.lit("pics").alias("method"),
    )
)
# Stack fine-mapping and PICS credible sets into one dataset, letting
# method-specific columns be null, and drop exact duplicate rows.
credsets = (
    v2d_credset_t
    .unionByName(pics_t, allowMissingColumns=True)
    .dropDuplicates()
)
# QTL associations: summary statistics are not available for these, so the
# numeric association fields are filled with zero placeholders.
qtl_assocs = (
    v2d_credset.where(f.col("type") != "gwas")
    .select(
        f.concat_ws("_", "lead_chrom", "lead_pos", "lead_ref", "lead_alt").alias(
            "variantId"
        ),
        f.col("study_id").alias("source"),
        f.col("phenotype_id").alias("traitFromSource"),
        # TODO: phenotype_id (ENSG…/ILMN_… ids) should eventually be mapped
        # to an EFO term to populate traitFromSourceMappedId.
        f.col("bio_feature").alias("biofeature"),
        # Only UBERON-coded biofeatures get an ontology id; others stay null.
        f.when(
            f.col("bio_feature").startswith("UBERON_"),
            f.regexp_extract("bio_feature", r"^(UBERON_\d+)", 1),
        ).alias("biofeatureId"),
        "type",
    )
    .transform(
        lambda df: create_study_id_hash(df, "source", "traitFromSource", "biofeature")
    )
    .select(
        "variantId",
        "studyId",
        *[
            f.lit(0.0).alias(name)
            for name in (
                "beta",
                "confidenceIntervalLower",
                "confidenceIntervalUpper",
                "pValue",
                "pValueMantissa",
            )
        ],
        f.lit(0).alias("pValueExponent"),
    )
)
# GWAS associations: V2D reshaped onto the newer associations schema.
gwas_associations = (
    v2d
    # A study reports either beta- or odds-ratio-based confidence intervals;
    # coalesce takes the first non-null, which is exactly what the original
    # chained when(isNotNull, …) expressions computed.
    .withColumn(
        "confidenceIntervalLower",
        f.coalesce(f.col("beta_ci_lower"), f.col("oddsr_ci_lower")),
    )
    .withColumn(
        "confidenceIntervalUpper",
        f.coalesce(f.col("beta_ci_upper"), f.col("oddsr_ci_upper")),
    )
    .select(
        # Variant ids are assembled as chrom_pos_ref_alt.
        f.concat_ws(
            "_",
            f.col("lead_chrom"),
            f.col("lead_pos"),
            f.col("lead_ref"),
            f.col("lead_alt"),
        ).alias("variantId"),
        f.col("study_id").alias("studyId"),
        "beta",
        f.col("odds_ratio").alias("oddsRatio"),
        "confidenceIntervalLower",
        "confidenceIntervalUpper",
        f.col("pval").alias("pValue"),
        f.col("pval_mantissa").alias("pValueMantissa"),
        f.col("pval_exponent").alias("pValueExponent"),
    )
)
# Bring GWAS and QTL associations together; columns missing on either side
# (e.g. oddsRatio for QTLs) become nulls, then exact duplicates are dropped.
associations = (
    gwas_associations
    .unionByName(qtl_assocs, allowMissingColumns=True)
    .dropDuplicates()
)
# Columns nested inside each credible-set struct: everything except the
# grouping keys and the method.
struct_cols = credsets.drop("leadVariantId", "studyId", "method").columns
credsets_t = (
    # Nest tag-level rows so each (leadVariantId, studyId) carries an array
    # of per-method credible sets.
    credsets.select(
        "leadVariantId", "studyId", "method", f.struct(struct_cols).alias("credibleSet")
    )
    # Two aggregation steps: first collect the tag rows per
    # variant/study/method, then collect the per-method sets per variant/study.
    .groupBy("leadVariantId", "studyId", "method")
    .agg(f.collect_set("credibleSet").alias("credibleSet"))
    .groupBy("leadVariantId", "studyId")
    .agg(
        f.collect_set(
            f.struct(f.col("method"), f.col("credibleSet"))
        ).alias("credibleSets")
    )
    # The original trailing .distinct() was removed: groupBy output is already
    # unique per key, so it was a no-op costing an extra shuffle. Likewise the
    # dead .alias() on the struct inside collect_set (field names come from the
    # struct's arguments; the column name comes from the outer alias).
)
# Attach the nested credible sets to each association; left join keeps
# associations that have no credible set (credibleSets stays null).
study_locus = associations.join(
    credsets_t.withColumnRenamed("leadVariantId", "variantId"),
    ["variantId", "studyId"],
    "left",
)
"""
Output location: gs://ot-team/irene/study_locus_v3
Output schema:
root
 |-- variantId: string (nullable = false)
 |-- studyId: string (nullable = true)
 |-- beta: double (nullable = true)
 |-- oddsRatio: double (nullable = true)
 |-- confidenceIntervalLower: double (nullable = true)
 |-- confidenceIntervalUpper: double (nullable = true)
 |-- pValue: double (nullable = true)
 |-- pValueMantissa: double (nullable = true)
 |-- pValueExponent: long (nullable = true)
 |-- credibleSets: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- method: string (nullable = true)
 |    |    |-- credibleSet: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- is95CredibleSet: boolean (nullable = true)
 |    |    |    |    |-- is99CredibleSet: boolean (nullable = true)
 |    |    |    |    |-- logABF: double (nullable = true)
 |    |    |    |    |-- posteriorProbability: double (nullable = true)
 |    |    |    |    |-- tagVariantId: string (nullable = false)
 |    |    |    |    |-- tagPValue: double (nullable = true)
 |    |    |    |    |-- tagPValueConditioned: double (nullable = true)
 |    |    |    |    |-- tagBeta: double (nullable = true)
 |    |    |    |    |-- tagStandardError: double (nullable = true)
 |    |    |    |    |-- tagBetaConditioned: double (nullable = true)
 |    |    |    |    |-- tagStandardErrorConditioned: double (nullable = true)
 |    |    |    |    |-- r2Overall: double (nullable = true)
"""
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment