{
"cells": [
{
"metadata": {
"ExecuteTime": {
"start_time": "2023-07-03T12:13:02.745853Z",
"end_time": "2023-07-03T12:13:08.150852Z"
},
"trusted": true,
"scrolled": false
},
"id": "edc19b64",
"cell_type": "code",
"source": "from pyspark.sql import SparkSession, functions as f, types as t, DataFrame, Column\nfrom pyspark.sql.window import Window\n\nimport sys\nfrom typing import List, Callable\nimport pandas as pd\nfrom copy import deepcopy\n\nspark = SparkSession.builder.getOrCreate()\n\n# Input data:\nsummary_stats = f'gs://open-targets-gwas-summary-stats/studies'\npv_threshold = 5e-8\ndistance_threshold = 2.5e5\n",
"execution_count": 4,
"outputs": [
{
"output_type": "stream",
"text": "23/07/03 12:13:02 WARN SparkContext: Another SparkContext is being constructed (or threw an exception in its constructor). This may indicate an error, since only one SparkContext should be running in this JVM (see SPARK-2243). The other SparkContext was created at:\norg.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)\njava.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)\njava.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)\njava.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)\njava.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)\npy4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)\npy4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)\npy4j.Gateway.invoke(Gateway.java:238)\npy4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)\npy4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)\npy4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)\npy4j.ClientServerConnection.run(ClientServerConnection.java:106)\njava.base/java.lang.Thread.run(Thread.java:829)\n23/07/03 12:13:02 INFO SparkEnv: Registering MapOutputTracker\n23/07/03 12:13:02 INFO SparkEnv: Registering BlockManagerMaster\n23/07/03 12:13:02 INFO SparkEnv: Registering BlockManagerMasterHeartbeat\n23/07/03 12:13:02 INFO SparkEnv: Registering OutputCommitCoordinator\n",
"name": "stderr"
}
]
},
{
"metadata": {},
"id": "2374cfe5",
"cell_type": "markdown",
"source": "### Helper functions"
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2023-07-03T12:13:43.557369Z",
"end_time": "2023-07-03T12:13:43.564780Z"
},
"trusted": true
},
"id": "cbe02f4f",
"cell_type": "code",
"source": "from typing import List\n\ndef parse_pvalue(pv: Column) -> List[Column]:\n \"\"\"This function takes a p-value string and returns two columns mantissa (float), exponent (integer).\n\n Args:\n pv (Column): P-value as string\n\n Returns:\n Column: p-value mantissa (float)\n Column: p-value exponent (integer)\n\n Examples:\n >>> d = [(\"0.01\",),(\"4.2E-45\",),(\"43.2E5\",),(\"0\",),(\"1\",)]\n >>> spark.createDataFrame(d, ['pval']).select('pval',*parse_pvalue(f.col('pval'))).show()\n +-------+--------------+--------------+\n | pval|pValueExponent|pValueMantissa|\n +-------+--------------+--------------+\n | 0.01| -2| 1.0|\n |4.2E-45| -45| 4.2|\n | 43.2E5| 5| 43.2|\n | 0| -308| 2.225|\n | 1| 0| 1.0|\n +-------+--------------+--------------+\n <BLANKLINE> \n \"\"\"\n # Making sure there's a number in the string:\n pv = f.when(pv == \"0\", f.lit(sys.float_info.min).cast(t.StringType())).otherwise(pv)\n \n # Get exponent:\n exponent = f.when(\n f.upper(pv).contains('E'),\n f.split(f.upper(pv), 'E').getItem(1).cast(t.IntegerType())\n ).otherwise(\n f.log10(pv).cast(t.IntegerType())\n )\n\n # Get mantissa:\n mantissa = f.when(\n f.upper(pv).contains('E'),\n f.split(f.upper(pv), 'E').getItem(0).cast(t.FloatType())\n ).otherwise(\n pv / (10 ** exponent)\n )\n \n # Round value:\n mantissa = f.round(mantissa, 3)\n \n return [\n mantissa.alias('pValueMantissa'),\n exponent.alias('pValueExponent'), \n ]\n\ndef calculate_neglog_pvalue(\n p_value_mantissa: Column, p_value_exponent: Column\n) -> Column:\n \"\"\"Compute the negative log p-value.\n\n Args:\n p_value_mantissa (Column): P-value mantissa\n p_value_exponent (Column): P-value exponent\n\n Returns:\n Column: Negative log p-value\n\n Examples:\n >>> d = [(1, 1), (5, -2), (1, -1000)]\n >>> df = spark.createDataFrame(d).toDF(\"p_value_mantissa\", \"p_value_exponent\")\n >>> df.withColumn(\"neg_log_p\", calculate_neglog_pvalue(f.col(\"p_value_mantissa\"), f.col(\"p_value_exponent\"))).show()\n +----------------+----------------+------------------+\n |p_value_mantissa|p_value_exponent| neg_log_p|\n +----------------+----------------+------------------+\n | 1| 1| -1.0|\n | 5| -2|1.3010299956639813|\n | 1| -1000| 1000.0|\n +----------------+----------------+------------------+\n <BLANKLINE>\n \"\"\"\n return f.round(-1 * (f.log10(p_value_mantissa) + p_value_exponent), 3)\n",
"execution_count": 6,
"outputs": []
},
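{
"metadata": {},
"cell_type": "markdown",
"source": "A quick sanity check of the two helpers on a toy DataFrame (an illustrative sketch added for clarity; the toy p-value strings are made up and mirror the doctest cases, and the cell only uses objects defined above):"
},
{
"metadata": {
"trusted": false
},
"cell_type": "code",
"source": "# Illustrative sanity check of the p-value helpers (not part of the original run).\n# The toy p-value strings below are made up; they mirror the doctest cases.\ntoy = spark.createDataFrame([(\"0.01\",), (\"4.2E-45\",), (\"0\",)], [\"pValue\"])\n(\n    toy\n    # Split the textual p-value into mantissa/exponent columns:\n    .select(\"pValue\", *parse_pvalue(f.col(\"pValue\")))\n    # Derive the negative log p-value from the parsed columns:\n    .withColumn(\n        \"negLogPValue\",\n        calculate_neglog_pvalue(f.col(\"pValueMantissa\"), f.col(\"pValueExponent\")),\n    )\n    .show()\n)",
"execution_count": null,
"outputs": []
},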
{
"metadata": {},
"id": "f988b33b",
"cell_type": "markdown",
"source": "## Groupby based process"
},
{
"metadata": {
"ExecuteTime": {
"end_time": "2023-07-01T00:24:38.373176Z",
"start_time": "2023-06-30T23:14:10.872754Z"
},
"trusted": false
},
"id": "a06e4be0",
"cell_type": "code",
"source": "# Reading stummary statistics from two studies:\nsumstats = (\n spark.read.parquet(summary_stats, recursiveFileLookup=True)\n .filter(\n # Filter for snps with position available:\n f.col('position').isNotNull() & \n # Filter snps for GWAS significance:\n (f.col('pValue').cast(t.DoubleType()) <= pv_threshold)\n )\n .select(\n 'studyId', \n 'variantId', \n 'chromosome', \n 'position', \n 'rsId', \n 'pValue', \n *parse_pvalue(f.col('pValue')),\n calculate_neglog_pvalue(*parse_pvalue(f.col('pValue'))).alias('negLogPValue')\n )\n)\n\n# Inferring the output schema based on the input schema...\nreturn_schema = deepcopy(sumstats.schema)\nreturn_schema.add(t.StructField('isLead', t.BooleanType(), False))\n\ndef find_peak_w_window(window_size: int):\n def find_peak(_df: pd.DataFrame) -> pd.DataFrame:\n # sorting rows by p-value:\n _df['isLead'] = None\n for index in _df.sort_values(['negLogPValue'], ascending=False).index:\n\n # We have found a top snp!\n if _df.loc[index, 'isLead'] is None:\n # Setting false flag for ALL variants within the window.\n _df.loc[_df[abs(_df.position - _df.loc[index, 'position']) <= window_size].index,'isLead'] = False\n _df.loc[index, 'isLead'] = True\n\n return _df\n\n return find_peak\n\nx = (\n sumstats\n # Grooping by study/chromosome:\n .groupBy(['studyId', 'chromosome'])\n # For each study/chromosome chunk, peaks are identified:\n .applyInPandas(find_peak_w_window(distance_threshold), schema=return_schema)\n .persist()\n)\n\nx.filter(f.col('isLead')).show(1, False, True)\nx.write.mode('overwrite').parquet('gs://ot-team/dsuveges/new_sumstat_window')\n\n",
"execution_count": 8,
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": " \r"
},
{
"name": "stdout",
"output_type": "stream",
"text": "-RECORD 0------------------------\n studyId | GCST004625 \n variantId | 7_2831404_C_CT \n chromosome | 7 \n position | 2831404 \n rsId | rs3996402 \n pValue | 3.459e-09 \n pValueMantissa | 3.459 \n pValueExponent | -9 \n negLogPValue | 8.461 \n isLead | true \nonly showing top 1 row\n\n"
},
{
"name": "stderr",
"output_type": "stream",
"text": " \r"
}
]
},
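{
"metadata": {},
"cell_type": "markdown",
"source": "The `find_peak` logic can be checked locally, without touching GCS, on a small synthetic pandas frame (an illustrative sketch; the positions and p-values below are made up). With a 250 kb window, everything within 250 kb of the strongest SNP is demoted, and the search continues with the next unflagged variant:"
},
{
"metadata": {
"trusted": false
},
"cell_type": "code",
"source": "# Illustrative local check of the applyInPandas peak finder (not part of the original run).\n# Toy data: the strongest SNP sits at 100_500; positions 100, 150 and 100_600 fall\n# within its 250 kb window, while 500_000 is far enough away to be its own lead.\ntoy = pd.DataFrame({\n    'position': [100, 150, 100_500, 100_600, 500_000],\n    'negLogPValue': [12.0, 8.0, 30.0, 9.0, 10.0],\n})\n\n# Expected isLead column: [False, False, True, False, True]\nprint(find_peak_w_window(2.5e5)(toy.copy()))",
"execution_count": null,
"outputs": []
},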
{
"metadata": {
"ExecuteTime": {
"end_time": "2023-07-01T06:32:45.755818Z",
"start_time": "2023-07-01T06:32:43.375516Z"
},
"trusted": false
},
"id": "5a3246de",
"cell_type": "code",
"source": "x.filter(f.col('isLead')).select('variantId').distinct().count()",
"execution_count": 11,
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": " \r"
},
{
"data": {
"text/plain": "22785"
},
"execution_count": 11,
"metadata": {},
"output_type": "execute_result"
}
]
},
{
"metadata": {
"trusted": false
},
"id": "f105ce63",
"cell_type": "code",
"source": "2.1M significant -> 28k semiindices -> 22k unique snp",
"execution_count": null,
"outputs": []
},
{
"metadata": {},
"id": "356417f3",
"cell_type": "markdown",
"source": "## Windowing based process\n\n- "
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2023-07-01T21:54:47.643Z"
},
"trusted": false
},
"id": "36325b0a",
"cell_type": "code",
"source": "import pandas as pd\n\n# Reading stummary statistics from two studies:\nsumstats = (\n spark.read.parquet(summary_stats, recursiveFileLookup=True)\n .filter(\n # Filter for snps with position available:\n f.col('position').isNotNull() & \n # Filter snps for GWAS significance:\n (f.col('pValue').cast(t.DoubleType()) <= pv_threshold)\n )\n .select(\n 'studyId', \n 'variantId', \n 'chromosome', \n 'position', \n 'rsId', \n 'pValue', \n *parse_pvalue(f.col('pValue')),\n calculate_neglog_pvalue(*parse_pvalue(f.col('pValue'))).alias('negLogPValue')\n )\n)\n\n\ndef find_peak_w_window(window_size: int):\n\n @f.pandas_udf(t.ArrayType(t.BooleanType()))\n def find_peak(neglog: pd.Series, position: pd.Series) -> list():\n\n is_lead = [None] * len(neglog)\n for index in range(len(position)):\n if is_lead[index] is None:\n # A new lead has been found:\n is_lead[index] = True\n\n # Setting flags to False within the window:\n for new_index in range(index+1, len(position)):\n # This position is within the window:\n if abs(position[new_index] - position[index]) < window_size:\n is_lead[new_index] = False\n\n return is_lead\n \n return find_peak\n\nx = (\n sumstats\n .withColumn(\n 'isLead', \n find_peak_w_window(2.5e5)(\n f.col('negLogPValue'), \n f.col('position'), \n ).over(\n Window\n .partitionBy('studyId', 'chromosome')\n .orderBy(f.col('negLogPValue').desc())\n .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)\n )\n )\n .withColumn('rank', f.row_number().over(\n Window\n .partitionBy('studyId', 'chromosome')\n .orderBy(f.col('negLogPValue').desc())\n )\n )\n .withColumn(\n 'isLead', f.col('isLead').getItem(f.col('rank')-1) \n )\n .persist()\n)\n\n\n\n\nx.filter(f.col('isLead')).show(1, False, True)\nx.write.mode('overwrite').parquet('gs://ot-team/dsuveges/new_sumstat_window_v2')\n",
"execution_count": null,
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": "[Stage 24:==============================================> (7417 + 16) / 8184]\r"
}
]
},
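{
"metadata": {},
"cell_type": "markdown",
"source": "The same UDF can be exercised on a tiny synthetic Spark frame to make the two-step trick visible: the grouped-aggregate pandas UDF returns one boolean array per study/chromosome window, and the `row_number`/`getItem` step then lets each row pick out its own flag (an illustrative sketch; the rows below are made up):"
},
{
"metadata": {
"trusted": false
},
"cell_type": "code",
"source": "# Illustrative check of the window-based peak finder (not part of the original run).\n# Toy data: 100 is within 250 kb of the stronger 100_500, while 500_000 is not.\ntoy = spark.createDataFrame(\n    [\n        ('S1', '1', 100, 12.0),\n        ('S1', '1', 100_500, 30.0),\n        ('S1', '1', 500_000, 10.0),\n    ],\n    ['studyId', 'chromosome', 'position', 'negLogPValue'],\n)\n\nordered_window = (\n    Window\n    .partitionBy('studyId', 'chromosome')\n    .orderBy(f.col('negLogPValue').desc())\n)\n\n# Expected: 100_500 and 500_000 flagged as leads, 100 demoted.\n(\n    toy\n    .withColumn(\n        'isLead',\n        find_peak_w_window(2.5e5)(f.col('negLogPValue'), f.col('position'))\n        .over(ordered_window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))\n    )\n    .withColumn('rank', f.row_number().over(ordered_window))\n    .withColumn('isLead', f.col('isLead').getItem(f.col('rank') - 1))\n    .show()\n)",
"execution_count": null,
"outputs": []
},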
{
"metadata": {
"ExecuteTime": {
"end_time": "2023-07-01T21:20:54.511243Z",
"start_time": "2023-07-01T20:49:24.721633Z"
},
"trusted": false
},
"id": "1419e031",
"cell_type": "code",
"source": "(\n spark.read.parquet(summary_stats, recursiveFileLookup=True)\n .filter(\n # Filter for snps with position available:\n f.col('position').isNotNull() & \n # Filter snps for GWAS significance:\n (f.col('pValue').cast(t.DoubleType()) <= pv_threshold)\n )\n .groupBy('studyId')\n .count()\n .orderBy(f.col('count').desc())\n .show(truncate=False)\n)",
"execution_count": 5,
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": "[Stage 8:====================================================>(8183 + 1) / 8184]\r"
},
{
"name": "stdout",
"output_type": "stream",
"text": "+------------+------+\n|studyId |count |\n+------------+------+\n|GCST90002392|160865|\n|GCST90002357|160769|\n|GCST90002346|157005|\n|GCST90002402|144031|\n|GCST90018969|109482|\n|GCST90013976|95964 |\n|GCST90014007|74677 |\n|GCST90026654|74048 |\n|GCST90029010|56346 |\n|GCST004625 |41724 |\n|GCST004603 |38894 |\n|GCST90012113|37654 |\n|GCST004614 |33182 |\n|GCST90013999|32620 |\n|GCST010703 |31611 |\n|GCST90012114|30282 |\n|GCST90043619|26925 |\n|GCST90038596|19912 |\n|GCST004988 |19531 |\n|GCST90018627|18808 |\n+------------+------+\nonly showing top 20 rows\n\n"
},
{
"name": "stderr",
"output_type": "stream",
"text": "\r \r"
}
]
},
{
"metadata": {
"ExecuteTime": {
"end_time": "2023-07-02T06:19:42.702027Z",
"start_time": "2023-07-02T06:19:39.493892Z"
},
"trusted": false
},
"id": "df45e7ab",
"cell_type": "code",
"source": "print(x.filter(f.col('isLead')).count())\nprint(x.filter(f.col('isLead')).select('variantId').distinct().count())",
"execution_count": 12,
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": "28360\n"
},
{
"name": "stderr",
"output_type": "stream",
"text": "[Stage 39:=================================================> (184 + 8) / 200]\r"
},
{
"name": "stdout",
"output_type": "stream",
"text": "22678\n"
},
{
"name": "stderr",
"output_type": "stream",
"text": "\r \r"
}
]
},
{
"metadata": {},
"id": "0a15d082",
"cell_type": "markdown",
"source": "### Get example dataset:"
},
{
"metadata": {
"ExecuteTime": {
"end_time": "2023-07-01T21:47:53.625971Z",
"start_time": "2023-07-01T21:47:37.009526Z"
},
"trusted": false
},
"id": "227c67e7",
"cell_type": "code",
"source": "study_id = 'GCST90002392'\n\n\n(\n spark.read.parquet(f'{summary_stats}/{study_id}', recursiveFileLookup=True)\n .filter(\n # Filter for snps with position available:\n f.col('position').isNotNull() & \n # Filter snps for GWAS significance:\n (f.col('pValue').cast(t.DoubleType()) <= pv_threshold)\n )\n .write.mode('overwrite').parquet(f'gs://ot-team/dsuveges/{study_id}_significant')\n)",
"execution_count": 8,
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": " \r"
}
]
},
{
"metadata": {
"ExecuteTime": {
"end_time": "2023-07-01T21:52:31.328546Z",
"start_time": "2023-07-01T21:51:02.284207Z"
},
"trusted": false
},
"id": "6bcc0689",
"cell_type": "code",
"source": "study_id = 'GCST90002392'\n\n# Reading stummary statistics from two studies:\nsumstats = (\n spark.read.parquet(f'gs://ot-team/dsuveges/{study_id}_significant', recursiveFileLookup=True)\n .select(\n 'studyId', \n 'variantId', \n 'chromosome', \n 'position', \n 'rsId', \n 'pValue', \n *parse_pvalue(f.col('pValue')),\n calculate_neglog_pvalue(*parse_pvalue(f.col('pValue'))).alias('negLogPValue')\n )\n)\n\n\ndef find_peak_w_window(window_size: int):\n\n @f.pandas_udf(t.ArrayType(t.BooleanType()))\n def find_peak(neglog: pd.Series, position: pd.Series) -> list():\n\n is_lead = [None] * len(neglog)\n for index in range(len(position)):\n if is_lead[index] is None:\n # A new lead has been found:\n is_lead[index] = True\n\n # Setting flags to False within the window:\n for new_index in range(index+1, len(position)):\n # This position is within the window:\n if abs(position[new_index] - position[index]) < window_size:\n is_lead[new_index] = False\n\n return is_lead\n \n return find_peak\n\nx = (\n sumstats\n .withColumn(\n 'isLead', \n find_peak_w_window(2.5e5)(\n f.col('negLogPValue'), \n f.col('position'), \n ).over(\n Window\n .partitionBy('studyId', 'chromosome')\n .orderBy(f.col('negLogPValue').desc())\n .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)\n )\n )\n .withColumn('rank', f.row_number().over(\n Window\n .partitionBy('studyId', 'chromosome')\n .orderBy(f.col('negLogPValue').desc())\n )\n )\n .withColumn(\n 'isLead', f.col('isLead').getItem(f.col('rank')-1) \n )\n .persist()\n)\n\n\n\n\nx.filter(f.col('isLead')).show(1, False, True)\nx.write.mode('overwrite').parquet('gs://ot-team/dsuveges/new_sumstat_window_v2')\n",
"execution_count": 9,
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": "/usr/lib/spark/python/pyspark/sql/column.py:419: FutureWarning: A column as 'key' in getItem is deprecated as of Spark 3.0, and will not be supported in the future release. Use `column[key]` or `column.key` syntax instead.\n warnings.warn(\n \r"
},
{
"name": "stdout",
"output_type": "stream",
"text": "-RECORD 0------------------------\n studyId | GCST90002392 \n variantId | 6_25842723_T_G \n chromosome | 6 \n position | 25842723 \n rsId | rs1408272 \n pValue | 4.6E-987 \n pValueMantissa | 4.6 \n pValueExponent | -987 \n negLogPValue | 986.337 \n isLead | true \n rank | 1 \nonly showing top 1 row\n\n"
},
{
"name": "stderr",
"output_type": "stream",
"text": " \r"
}
]
},
{
"metadata": {},
"id": "ecf3e278",
"cell_type": "markdown",
"source": "### Optimized windowing"
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2023-07-03T13:10:59.681314Z",
"end_time": "2023-07-03T13:14:14.368788Z"
},
"scrolled": false,
"trusted": true
},
"id": "11969961",
"cell_type": "code",
"source": "sumstats = (\n # Reading all summary statistics:\n spark.read.parquet(\n *[\n f'gs://open-targets-gwas-summary-stats/studies/GCST90002392',\n f'gs://open-targets-gwas-summary-stats/studies/GCST90018969',\n f'gs://open-targets-gwas-summary-stats/studies/GCST90013976',\n f'gs://open-targets-gwas-summary-stats/studies/GCST90002346',\n ]\n , recursiveFileLookup=True)\n .filter(\n # Filter for snps with position available:\n f.col('position').isNotNull() & \n # Filter snps for GWAS significance:\n (f.col('pValue').cast(t.DoubleType()) <= pv_threshold)\n )\n # Parser p-values provided as text:\n .select('*', *parse_pvalue(f.col('pValue')))\n # Select relevant columns + calculate neg-log p-value:\n .select(\n 'studyId', \n 'variantId', \n 'chromosome', \n 'position', \n 'rsId', \n 'pValue', \n 'pValueMantissa',\n 'pValueExponent',\n calculate_neglog_pvalue(f.col('pValueMantissa'), f.col('pValueExponent')).alias('negLogPValue')\n )\n)\n\n\ndef find_peak_w_window(window_size: int) -> Callable:\n\n @f.pandas_udf(t.ArrayType(t.BooleanType()))\n def find_peak(position: pd.Series) -> list():\n # List to store boolean flags indicating leads:\n is_lead = []\n\n # List containing indices of leads:\n lead_indices = []\n\n # Looping through all positions:\n for index in range(len(position)):\n # Looping through leads to find out if they are within a window:\n for lead_index in lead_indices:\n # If any of the leads within the window:\n if abs(position[lead_index] - position[index]) < window_size:\n # Setting the lead flag to false:\n is_lead.append(False)\n # Skipping further checks:\n break\n else:\n # None of the leads were within the window:\n lead_indices.append(index)\n is_lead.append(True)\n\n return is_lead\n \n return find_peak\n\nx = (\n sumstats\n .withColumn(\n 'isLead', \n find_peak_w_window(2.5e5)(f.col('position')).over(\n Window\n .partitionBy('studyId', 'chromosome')\n .orderBy(f.col('negLogPValue').desc())\n .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)\n )\n )\n .withColumn('rank', f.row_number().over(\n Window\n .partitionBy('studyId', 'chromosome')\n .orderBy(f.col('negLogPValue').desc())\n )\n )\n .withColumn(\n 'isLead', f.col('isLead').getItem(f.col('rank')-1) \n )\n .persist()\n)\n\n\n\n\nx.filter(f.col('isLead')).show(1, False, True)\nx.write.mode('overwrite').parquet('gs://ot-team/dsuveges/new_sumstat_window_v2')\nprint(x.filter(f.col('isLead')).count())\nprint(x.count())",
"execution_count": 10,
"outputs": [
{
"output_type": "stream",
"text": " \r",
"name": "stderr"
},
{
"output_type": "stream",
"text": "-RECORD 0-------------------------\n studyId | GCST90002346 \n variantId | 1_205268009_T_C \n chromosome | 1 \n position | 205268009 \n rsId | rs1668871 \n pValue | 4.55E-723 \n pValueMantissa | 4.55 \n pValueExponent | -723 \n negLogPValue | 722.342 \n isLead | true \n rank | 1 \nonly showing top 1 row\n\n",
"name": "stdout"
},
{
"output_type": "stream",
"text": " \r",
"name": "stderr"
},
{
"output_type": "stream",
"text": "4154\n523316\n",
"name": "stdout"
}
]
},
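{
"metadata": {},
"cell_type": "markdown",
"source": "The optimized inner loop replaces the quadratic flag-clearing pass with a list of confirmed leads: each candidate is only compared against previously accepted leads, and because rows arrive sorted by decreasing significance, any earlier lead within the window dominates the candidate. A plain-Python replica for a quick check (an illustrative sketch; `find_peak_py` and its toy positions are made up for demonstration):"
},
{
"metadata": {
"trusted": false
},
"cell_type": "code",
"source": "# Plain-Python replica of the optimized find_peak loop (not part of the original run).\n# Positions are assumed pre-sorted by descending negLogPValue,\n# as the window's orderBy guarantees.\ndef find_peak_py(position: list, window_size: float) -> list:\n    is_lead = []\n    lead_indices = []\n    for index in range(len(position)):\n        for lead_index in lead_indices:\n            # Within the window of an already accepted lead -> not a lead:\n            if abs(position[lead_index] - position[index]) < window_size:\n                is_lead.append(False)\n                break\n        else:\n            # No accepted lead within the window -> new lead:\n            lead_indices.append(index)\n            is_lead.append(True)\n    return is_lead\n\n# Same toy case as above; expected output: [True, False, True]\nprint(find_peak_py([100_500, 100, 500_000], 2.5e5))",
"execution_count": null,
"outputs": []
},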
{
"metadata": {
"ExecuteTime": {
"start_time": "2023-07-03T13:04:32.259318Z",
"end_time": "2023-07-03T13:04:32.660651Z"
},
"trusted": true
},
"cell_type": "code",
"source": "print(x.count())",
"execution_count": 8,
"outputs": [
{
"output_type": "stream",
"text": "2111639\n",
"name": "stdout"
}
]
}
],
"metadata": {
"_draft": {
"nbviewer_url": "https://gist.github.com/DSuveges/9dc48538314e1dbe54b78cdb635baa14"
},
"gist": {
"id": "9dc48538314e1dbe54b78cdb635baa14",
"data": {
"description": "GCS/New windowing.ipynb",
"public": true
}
},
"kernelspec": {
"name": "python3",
"display_name": "Python 3",
"language": "python"
},
"language_info": {
"name": "python",
"version": "3.10.8",
"mimetype": "text/x-python",
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"pygments_lexer": "ipython3",
"nbconvert_exporter": "python",
"file_extension": ".py"
}
},
"nbformat": 4,
"nbformat_minor": 5
}