GCS/New windowing.ipynb
{ | |
"cells": [ | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"start_time": "2023-07-03T12:13:02.745853Z", | |
"end_time": "2023-07-03T12:13:08.150852Z" | |
}, | |
"trusted": true, | |
"scrolled": false | |
}, | |
"id": "edc19b64", | |
"cell_type": "code", | |
"source": "from pyspark.sql import SparkSession, functions as f, types as t, DataFrame, Column\nfrom pyspark.sql.window import Window\n\nimport sys\nfrom typing import List, Callable\nimport pandas as pd\nfrom copy import deepcopy\n\nspark = SparkSession.builder.getOrCreate()\n\n# Input data:\nsummary_stats = f'gs://open-targets-gwas-summary-stats/studies'\npv_threshold = 5e-8\ndistance_threshold = 2.5e5\n", | |
"execution_count": 4, | |
"outputs": [ | |
{ | |
"output_type": "stream", | |
"text": "23/07/03 12:13:02 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor). This may indicate an error, since only one SparkContext should be running in this JVM (see SPARK-2243). The other SparkContext was created at:\norg.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)\njava.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)\njava.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)\njava.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)\njava.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)\npy4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)\npy4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\npy4j.Gateway.invoke(Gateway.java:238)\npy4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)\npy4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)\npy4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\npy4j.ClientServerConnection.run(ClientServerConnection.java:106)\njava.base/java.lang.Thread.run(Thread.java:829)\n23/07/03 12:13:02 INFO SparkEnv: Registering MapOutputTracker\n23/07/03 12:13:02 INFO SparkEnv: Registering BlockManagerMaster\n23/07/03 12:13:02 INFO SparkEnv: Registering BlockManagerMasterHeartbeat\n23/07/03 12:13:02 INFO SparkEnv: Registering OutputCommitCoordinator\n", | |
"name": "stderr" | |
} | |
] | |
}, | |
{ | |
"metadata": {}, | |
"id": "2374cfe5", | |
"cell_type": "markdown", | |
"source": "### Helper functions" | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"start_time": "2023-07-03T12:13:43.557369Z", | |
"end_time": "2023-07-03T12:13:43.564780Z" | |
}, | |
"trusted": true | |
}, | |
"id": "cbe02f4f", | |
"cell_type": "code", | |
"source": "from typing import List\n\ndef parse_pvalue(pv: Column) -> List[Column]:\n \"\"\"This function takes a p-value string and returns two columns mantissa (float), exponent (integer).\n\n Args:\n pv (Column): P-value as string\n\n Returns:\n Column: p-value mantissa (float)\n Column: p-value exponent (integer)\n\n Examples:\n >>> d = [(\"0.01\",),(\"4.2E-45\",),(\"43.2E5\",),(\"0\",),(\"1\",)]\n >>> spark.createDataFrame(d, ['pval']).select('pval',*parse_pvalue(f.col('pval'))).show()\n +-------+--------------+--------------+\n | pval|pValueExponent|pValueMantissa|\n +-------+--------------+--------------+\n | 0.01| -2| 1.0|\n |4.2E-45| -45| 4.2|\n | 43.2E5| 5| 43.2|\n | 0| -308| 2.225|\n | 1| 0| 1.0|\n +-------+--------------+--------------+\n <BLANKLINE> \n \"\"\"\n # Making sure there's a number in the string:\n pv = f.when(pv == \"0\", f.lit(sys.float_info.min).cast(t.StringType())).otherwise(pv)\n \n # Get exponent:\n exponent = f.when(\n f.upper(pv).contains('E'),\n f.split(f.upper(pv), 'E').getItem(1).cast(t.IntegerType())\n ).otherwise(\n f.log10(pv).cast(t.IntegerType())\n )\n\n # Get mantissa:\n mantissa = f.when(\n f.upper(pv).contains('E'),\n f.split(f.upper(pv), 'E').getItem(0).cast(t.FloatType())\n ).otherwise(\n pv / (10 ** exponent)\n )\n \n # Round value:\n mantissa = f.round(mantissa, 3)\n \n return [\n mantissa.alias('pValueMantissa'),\n exponent.alias('pValueExponent'), \n ]\n\ndef calculate_neglog_pvalue(\n p_value_mantissa: Column, p_value_exponent: Column\n) -> Column:\n \"\"\"Compute the negative log p-value.\n\n Args:\n p_value_mantissa (Column): P-value mantissa\n p_value_exponent (Column): P-value exponent\n\n Returns:\n Column: Negative log p-value\n\n Examples:\n >>> d = [(1, 1), (5, -2), (1, -1000)]\n >>> df = spark.createDataFrame(d).toDF(\"p_value_mantissa\", \"p_value_exponent\")\n >>> df.withColumn(\"neg_log_p\", calculate_neglog_pvalue(f.col(\"p_value_mantissa\"), f.col(\"p_value_exponent\"))).show()\n +----------------+----------------+------------------+\n |p_value_mantissa|p_value_exponent| neg_log_p|\n +----------------+----------------+------------------+\n | 1| 1| -1.0|\n | 5| -2|1.3010299956639813|\n | 1| -1000| 1000.0|\n +----------------+----------------+------------------+\n <BLANKLINE>\n \"\"\"\n return f.round(-1 * (f.log10(p_value_mantissa) + p_value_exponent), 3)\n", | |
"execution_count": 6, | |
"outputs": [] | |
}, | |
{ | |
"metadata": {}, | |
"id": "f988b33b", | |
"cell_type": "markdown", | |
"source": "## Groupby based process" | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"end_time": "2023-07-01T00:24:38.373176Z", | |
"start_time": "2023-06-30T23:14:10.872754Z" | |
}, | |
"trusted": false | |
}, | |
"id": "a06e4be0", | |
"cell_type": "code", | |
"source": "# Reading stummary statistics from two studies:\nsumstats = (\n spark.read.parquet(summary_stats, recursiveFileLookup=True)\n .filter(\n # Filter for snps with position available:\n f.col('position').isNotNull() & \n # Filter snps for GWAS significance:\n (f.col('pValue').cast(t.DoubleType()) <= pv_threshold)\n )\n .select(\n 'studyId', \n 'variantId', \n 'chromosome', \n 'position', \n 'rsId', \n 'pValue', \n *parse_pvalue(f.col('pValue')),\n calculate_neglog_pvalue(*parse_pvalue(f.col('pValue'))).alias('negLogPValue')\n )\n)\n\n# Inferring the output schema based on the input schema...\nreturn_schema = deepcopy(sumstats.schema)\nreturn_schema.add(t.StructField('isLead', t.BooleanType(), False))\n\ndef find_peak_w_window(window_size: int):\n def find_peak(_df: pd.DataFrame) -> pd.DataFrame:\n # sorting rows by p-value:\n _df['isLead'] = None\n for index in _df.sort_values(['negLogPValue'], ascending=False).index:\n\n # We have found a top snp!\n if _df.loc[index, 'isLead'] is None:\n # Setting false flag for ALL variants within the window.\n _df.loc[_df[abs(_df.position - _df.loc[index, 'position']) <= window_size].index,'isLead'] = False\n _df.loc[index, 'isLead'] = True\n\n return _df\n\n return find_peak\n\nx = (\n sumstats\n # Grooping by study/chromosome:\n .groupBy(['studyId', 'chromosome'])\n # For each study/chromosome chunk, peaks are identified:\n .applyInPandas(find_peak_w_window(distance_threshold), schema=return_schema)\n .persist()\n)\n\nx.filter(f.col('isLead')).show(1, False, True)\nx.write.mode('overwrite').parquet('gs://ot-team/dsuveges/new_sumstat_window')\n\n", | |
"execution_count": 8, | |
"outputs": [ | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": " \r" | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "-RECORD 0------------------------\n studyId | GCST004625 \n variantId | 7_2831404_C_CT \n chromosome | 7 \n position | 2831404 \n rsId | rs3996402 \n pValue | 3.459e-09 \n pValueMantissa | 3.459 \n pValueExponent | -9 \n negLogPValue | 8.461 \n isLead | true \nonly showing top 1 row\n\n" | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": " \r" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"end_time": "2023-07-01T06:32:45.755818Z", | |
"start_time": "2023-07-01T06:32:43.375516Z" | |
}, | |
"trusted": false | |
}, | |
"id": "5a3246de", | |
"cell_type": "code", | |
"source": "x.filter(f.col('isLead')).select('variantId').distinct().count()", | |
"execution_count": 11, | |
"outputs": [ | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": " \r" | |
}, | |
{ | |
"data": { | |
"text/plain": "22785" | |
}, | |
"execution_count": 11, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": false | |
}, | |
"id": "f105ce63", | |
"cell_type": "code", | |
"source": "2.1M significant -> 28k semiindices -> 22k unique snp", | |
"execution_count": null, | |
"outputs": [] | |
}, | |
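{
"metadata": {},
"cell_type": "markdown",
"source": "A toy, Spark-free sketch of the lead-flagging logic used above (hypothetical positions and p-values, not data from the pipeline): variants are visited from most to least significant, and a variant becomes a lead only if no previously accepted lead lies within the distance threshold."
},
{
"metadata": {},
"cell_type": "code",
"source": "# Toy illustration of the distance-based lead flagging (hypothetical values only).\nwindow_size = 2.5e5\n\n# (position, negLogPValue) pairs on a single chromosome:\ntoy = [(1_000_000, 50.0), (1_050_000, 30.0), (1_400_000, 20.0), (1_499_000, 45.0)]\n\n# Visit variants from most to least significant:\nordered = sorted(toy, key=lambda v: v[1], reverse=True)\n\nleads = []\nfor position, neglog_p in ordered:\n    # A variant becomes a lead only if no accepted lead lies within the window:\n    if all(abs(position - lead) >= window_size for lead in leads):\n        leads.append(position)\n\nprint(leads)  # [1000000, 1499000]: the 1,050,000 and 1,400,000 hits fall inside existing lead windows\n",
"execution_count": null,
"outputs": []
},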
{ | |
"metadata": {}, | |
"id": "356417f3", | |
"cell_type": "markdown", | |
"source": "## Windowing based process\n\n- " | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"start_time": "2023-07-01T21:54:47.643Z" | |
}, | |
"trusted": false | |
}, | |
"id": "36325b0a", | |
"cell_type": "code", | |
"source": "import pandas as pd\n\n# Reading stummary statistics from two studies:\nsumstats = (\n spark.read.parquet(summary_stats, recursiveFileLookup=True)\n .filter(\n # Filter for snps with position available:\n f.col('position').isNotNull() & \n # Filter snps for GWAS significance:\n (f.col('pValue').cast(t.DoubleType()) <= pv_threshold)\n )\n .select(\n 'studyId', \n 'variantId', \n 'chromosome', \n 'position', \n 'rsId', \n 'pValue', \n *parse_pvalue(f.col('pValue')),\n calculate_neglog_pvalue(*parse_pvalue(f.col('pValue'))).alias('negLogPValue')\n )\n)\n\n\ndef find_peak_w_window(window_size: int):\n\n @f.pandas_udf(t.ArrayType(t.BooleanType()))\n def find_peak(neglog: pd.Series, position: pd.Series) -> list():\n\n is_lead = [None] * len(neglog)\n for index in range(len(position)):\n if is_lead[index] is None:\n # A new lead has been found:\n is_lead[index] = True\n\n # Setting flags to False within the window:\n for new_index in range(index+1, len(position)):\n # This position is within the window:\n if abs(position[new_index] - position[index]) < window_size:\n is_lead[new_index] = False\n\n return is_lead\n \n return find_peak\n\nx = (\n sumstats\n .withColumn(\n 'isLead', \n find_peak_w_window(2.5e5)(\n f.col('negLogPValue'), \n f.col('position'), \n ).over(\n Window\n .partitionBy('studyId', 'chromosome')\n .orderBy(f.col('negLogPValue').desc())\n .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)\n )\n )\n .withColumn('rank', f.row_number().over(\n Window\n .partitionBy('studyId', 'chromosome')\n .orderBy(f.col('negLogPValue').desc())\n )\n )\n .withColumn(\n 'isLead', f.col('isLead').getItem(f.col('rank')-1) \n )\n .persist()\n)\n\n\n\n\nx.filter(f.col('isLead')).show(1, False, True)\nx.write.mode('overwrite').parquet('gs://ot-team/dsuveges/new_sumstat_window_v2')\n", | |
"execution_count": null, | |
"outputs": [ | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": "[Stage 24:==============================================> (7417 + 16) / 8184]\r" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"end_time": "2023-07-01T21:20:54.511243Z", | |
"start_time": "2023-07-01T20:49:24.721633Z" | |
}, | |
"trusted": false | |
}, | |
"id": "1419e031", | |
"cell_type": "code", | |
"source": "(\n spark.read.parquet(summary_stats, recursiveFileLookup=True)\n .filter(\n # Filter for snps with position available:\n f.col('position').isNotNull() & \n # Filter snps for GWAS significance:\n (f.col('pValue').cast(t.DoubleType()) <= pv_threshold)\n )\n .groupBy('studyId')\n .count()\n .orderBy(f.col('count').desc())\n .show(truncate=False)\n)", | |
"execution_count": 5, | |
"outputs": [ | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": "[Stage 8:====================================================>(8183 + 1) / 8184]\r" | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "+------------+------+\n|studyId |count |\n+------------+------+\n|GCST90002392|160865|\n|GCST90002357|160769|\n|GCST90002346|157005|\n|GCST90002402|144031|\n|GCST90018969|109482|\n|GCST90013976|95964 |\n|GCST90014007|74677 |\n|GCST90026654|74048 |\n|GCST90029010|56346 |\n|GCST004625 |41724 |\n|GCST004603 |38894 |\n|GCST90012113|37654 |\n|GCST004614 |33182 |\n|GCST90013999|32620 |\n|GCST010703 |31611 |\n|GCST90012114|30282 |\n|GCST90043619|26925 |\n|GCST90038596|19912 |\n|GCST004988 |19531 |\n|GCST90018627|18808 |\n+------------+------+\nonly showing top 20 rows\n\n" | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": "\r \r" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"end_time": "2023-07-02T06:19:42.702027Z", | |
"start_time": "2023-07-02T06:19:39.493892Z" | |
}, | |
"trusted": false | |
}, | |
"id": "df45e7ab", | |
"cell_type": "code", | |
"source": "print(x.filter(f.col('isLead')).count())\nprint(x.filter(f.col('isLead')).select('variantId').distinct().count())", | |
"execution_count": 12, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "28360\n" | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": "[Stage 39:=================================================> (184 + 8) / 200]\r" | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "22678\n" | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": "\r \r" | |
} | |
] | |
}, | |
{ | |
"metadata": {}, | |
"id": "0a15d082", | |
"cell_type": "markdown", | |
"source": "### Get example dataset:" | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"end_time": "2023-07-01T21:47:53.625971Z", | |
"start_time": "2023-07-01T21:47:37.009526Z" | |
}, | |
"trusted": false | |
}, | |
"id": "227c67e7", | |
"cell_type": "code", | |
"source": "study_id = 'GCST90002392'\n\n\n(\n spark.read.parquet(f'{summary_stats}/{study_id}', recursiveFileLookup=True)\n .filter(\n # Filter for snps with position available:\n f.col('position').isNotNull() & \n # Filter snps for GWAS significance:\n (f.col('pValue').cast(t.DoubleType()) <= pv_threshold)\n )\n .write.mode('overwrite').parquet(f'gs://ot-team/dsuveges/{study_id}_significant')\n)", | |
"execution_count": 8, | |
"outputs": [ | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": " \r" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"end_time": "2023-07-01T21:52:31.328546Z", | |
"start_time": "2023-07-01T21:51:02.284207Z" | |
}, | |
"trusted": false | |
}, | |
"id": "6bcc0689", | |
"cell_type": "code", | |
"source": "study_id = 'GCST90002392'\n\n# Reading stummary statistics from two studies:\nsumstats = (\n spark.read.parquet(f'gs://ot-team/dsuveges/{study_id}_significant', recursiveFileLookup=True)\n .select(\n 'studyId', \n 'variantId', \n 'chromosome', \n 'position', \n 'rsId', \n 'pValue', \n *parse_pvalue(f.col('pValue')),\n calculate_neglog_pvalue(*parse_pvalue(f.col('pValue'))).alias('negLogPValue')\n )\n)\n\n\ndef find_peak_w_window(window_size: int):\n\n @f.pandas_udf(t.ArrayType(t.BooleanType()))\n def find_peak(neglog: pd.Series, position: pd.Series) -> list():\n\n is_lead = [None] * len(neglog)\n for index in range(len(position)):\n if is_lead[index] is None:\n # A new lead has been found:\n is_lead[index] = True\n\n # Setting flags to False within the window:\n for new_index in range(index+1, len(position)):\n # This position is within the window:\n if abs(position[new_index] - position[index]) < window_size:\n is_lead[new_index] = False\n\n return is_lead\n \n return find_peak\n\nx = (\n sumstats\n .withColumn(\n 'isLead', \n find_peak_w_window(2.5e5)(\n f.col('negLogPValue'), \n f.col('position'), \n ).over(\n Window\n .partitionBy('studyId', 'chromosome')\n .orderBy(f.col('negLogPValue').desc())\n .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)\n )\n )\n .withColumn('rank', f.row_number().over(\n Window\n .partitionBy('studyId', 'chromosome')\n .orderBy(f.col('negLogPValue').desc())\n )\n )\n .withColumn(\n 'isLead', f.col('isLead').getItem(f.col('rank')-1) \n )\n .persist()\n)\n\n\n\n\nx.filter(f.col('isLead')).show(1, False, True)\nx.write.mode('overwrite').parquet('gs://ot-team/dsuveges/new_sumstat_window_v2')\n", | |
"execution_count": 9, | |
"outputs": [ | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": "/usr/lib/spark/python/pyspark/sql/column.py:419: FutureWarning: A column as 'key' in getItem is deprecated as of Spark 3.0, and will not be supported in the future release. Use `column[key]` or `column.key` syntax instead.\n warnings.warn(\n \r" | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": "-RECORD 0------------------------\n studyId | GCST90002392 \n variantId | 6_25842723_T_G \n chromosome | 6 \n position | 25842723 \n rsId | rs1408272 \n pValue | 4.6E-987 \n pValueMantissa | 4.6 \n pValueExponent | -987 \n negLogPValue | 986.337 \n isLead | true \n rank | 1 \nonly showing top 1 row\n\n" | |
}, | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": " \r" | |
} | |
] | |
}, | |
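{
"metadata": {},
"cell_type": "markdown",
"source": "The FutureWarning above is triggered by calling `getItem` with a column key. Per the warning text, the same lookup can be written with bracket indexing (`column[key]`); below is a minimal sketch on a hypothetical one-row DataFrame, where only the syntax changes."
},
{
"metadata": {},
"cell_type": "code",
"source": "# Bracket indexing on an array column, as suggested by the FutureWarning above\n# (column[key] instead of Column.getItem(column)). Hypothetical tiny DataFrame:\ndemo = spark.createDataFrame(\n    [([True, False, False], 2)],\n    ['isLead', 'rank'],\n)\n\n# Equivalent to f.col('isLead').getItem(f.col('rank') - 1), without the warning:\ndemo.withColumn('isLead', f.col('isLead')[f.col('rank') - 1]).show()\n",
"execution_count": null,
"outputs": []
},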
{ | |
"metadata": {}, | |
"id": "ecf3e278", | |
"cell_type": "markdown", | |
"source": "### Optimized windowing" | |
}, | |
{ | |
"metadata": { | |
"ExecuteTime": { | |
"start_time": "2023-07-03T13:10:59.681314Z", | |
"end_time": "2023-07-03T13:14:14.368788Z" | |
}, | |
"scrolled": false, | |
"trusted": true | |
}, | |
"id": "11969961", | |
"cell_type": "code", | |
"source": "sumstats = (\n # Reading all summary statistics:\n spark.read.parquet(\n *[\n f'gs://open-targets-gwas-summary-stats/studies/GCST90002392',\n f'gs://open-targets-gwas-summary-stats/studies/GCST90018969',\n f'gs://open-targets-gwas-summary-stats/studies/GCST90013976',\n f'gs://open-targets-gwas-summary-stats/studies/GCST90002346',\n ]\n , recursiveFileLookup=True)\n .filter(\n # Filter for snps with position available:\n f.col('position').isNotNull() & \n # Filter snps for GWAS significance:\n (f.col('pValue').cast(t.DoubleType()) <= pv_threshold)\n )\n # Parser p-values provided as text:\n .select('*', *parse_pvalue(f.col('pValue')))\n # Select relevant columns + calculate neg-log p-value:\n .select(\n 'studyId', \n 'variantId', \n 'chromosome', \n 'position', \n 'rsId', \n 'pValue', \n 'pValueMantissa',\n 'pValueExponent',\n calculate_neglog_pvalue(f.col('pValueMantissa'), f.col('pValueExponent')).alias('negLogPValue')\n )\n)\n\n\ndef find_peak_w_window(window_size: int) -> Callable:\n\n @f.pandas_udf(t.ArrayType(t.BooleanType()))\n def find_peak(position: pd.Series) -> list():\n # List to store boolean flags indicating leads:\n is_lead = []\n\n # List containing indices of leads:\n lead_indices = []\n\n # Looping through all positions:\n for index in range(len(position)):\n # Looping through leads to find out if they are within a window:\n for lead_index in lead_indices:\n # If any of the leads within the window:\n if abs(position[lead_index] - position[index]) < window_size:\n # Setting the lead flag to false:\n is_lead.append(False)\n # Skipping further checks:\n break\n else:\n # None of the leads were within the window:\n lead_indices.append(index)\n is_lead.append(True)\n\n return is_lead\n \n return find_peak\n\nx = (\n sumstats\n .withColumn(\n 'isLead', \n find_peak_w_window(2.5e5)(f.col('position')).over(\n Window\n .partitionBy('studyId', 'chromosome')\n .orderBy(f.col('negLogPValue').desc())\n .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)\n )\n )\n .withColumn('rank', f.row_number().over(\n Window\n .partitionBy('studyId', 'chromosome')\n .orderBy(f.col('negLogPValue').desc())\n )\n )\n .withColumn(\n 'isLead', f.col('isLead').getItem(f.col('rank')-1) \n )\n .persist()\n)\n\n\n\n\nx.filter(f.col('isLead')).show(1, False, True)\nx.write.mode('overwrite').parquet('gs://ot-team/dsuveges/new_sumstat_window_v2')\nprint(x.filter(f.col('isLead')).count())\nprint(x.count())", | |
"execution_count": 10, | |
"outputs": [ | |
{ | |
"output_type": "stream", | |
"text": " \r", | |
"name": "stderr" | |
}, | |
{ | |
"output_type": "stream", | |
"text": "-RECORD 0-------------------------\n studyId | GCST90002346 \n variantId | 1_205268009_T_C \n chromosome | 1 \n position | 205268009 \n rsId | rs1668871 \n pValue | 4.55E-723 \n pValueMantissa | 4.55 \n pValueExponent | -723 \n negLogPValue | 722.342 \n isLead | true \n rank | 1 \nonly showing top 1 row\n\n", | |
"name": "stdout" | |
}, | |
{ | |
"output_type": "stream", | |
"text": " \r", | |
"name": "stderr" | |
}, | |
{ | |
"output_type": "stream", | |
"text": "4154\n523316\n", | |
"name": "stdout" | |
} | |
] | |
}, | |
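{
"metadata": {},
"cell_type": "markdown",
"source": "A possible sanity check (not part of the original workflow): within each study/chromosome, any two flagged leads should sit at least `distance_threshold` apart, so a self-join looking for closer pairs should come back empty."
},
{
"metadata": {},
"cell_type": "code",
"source": "# Suggested sanity check (hypothetical, not in the original notebook):\n# no two leads from the same study/chromosome should be closer than the window.\nleads = x.filter(f.col('isLead')).select('studyId', 'chromosome', 'position')\n\ntoo_close = (\n    leads.alias('a')\n    .join(leads.alias('b'), on=['studyId', 'chromosome'])\n    .filter(\n        (f.col('a.position') != f.col('b.position'))\n        & (f.abs(f.col('a.position') - f.col('b.position')) < distance_threshold)\n    )\n)\n\n# Expected to print 0 if the windowing behaved as intended:\nprint(too_close.count())\n",
"execution_count": null,
"outputs": []
},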
{ | |
"metadata": { | |
"ExecuteTime": { | |
"start_time": "2023-07-03T13:04:32.259318Z", | |
"end_time": "2023-07-03T13:04:32.660651Z" | |
}, | |
"trusted": true | |
}, | |
"cell_type": "code", | |
"source": "print(x.count())", | |
"execution_count": 8, | |
"outputs": [ | |
{ | |
"output_type": "stream", | |
"text": "2111639\n", | |
"name": "stdout" | |
} | |
] | |
}, | |
{ | |
"metadata": { | |
"trusted": true | |
}, | |
"cell_type": "code", | |
"source": "", | |
"execution_count": null, | |
"outputs": [] | |
} | |
], | |
"metadata": { | |
"_draft": { | |
"nbviewer_url": "https://gist.github.com/DSuveges/9dc48538314e1dbe54b78cdb635baa14" | |
}, | |
"gist": { | |
"id": "9dc48538314e1dbe54b78cdb635baa14", | |
"data": { | |
"description": "GCS/New windowing.ipynb", | |
"public": true | |
} | |
}, | |
"kernelspec": { | |
"name": "python3", | |
"display_name": "Python 3", | |
"language": "python" | |
}, | |
"language_info": { | |
"name": "python", | |
"version": "3.10.8", | |
"mimetype": "text/x-python", | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"pygments_lexer": "ipython3", | |
"nbconvert_exporter": "python", | |
"file_extension": ".py" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 5 | |
} |