Last active: July 17, 2023 15:42
GCS/ClinVar_evidence_dating.ipynb
{
"cells": [
{
"metadata": {},
"cell_type": "markdown",
"source": "# Dating evidence from ClinVar\n\n- ClinVar evidence is based on RCV identifiers.\n- RCVs on ClinVar represent one or more submissions, identified by SCV identifiers.\n- SCV identifiers are dated, so we can extract the first submission date for each RCV.\n\n## Steps\n\n1. Download the ClinVar release from their FTP site (`ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/xml/ClinVarFullRelease_2023-07.xml.gz`).\n2. Extract the data (3.2GB -> 40GB).\n3. Parse the XML to get RCV -> SCV -> date triplets.\n4. Extract the oldest date for each RCV.\n5. Output the data.\n\nThe resulting RCV-to-date dataset can then be joined with our evidence set.\n"
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2023-07-17T11:04:56.880098Z",
"end_time": "2023-07-17T11:06:30.206439Z"
},
"trusted": true
},
"cell_type": "code",
"source": "%%bash\n\ncd /home/dsuveges\nwget ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/xml/ClinVarFullRelease_2023-07.xml.gz\ngunzip ClinVarFullRelease_2023-07.xml.gz\n\nls -lah",
"execution_count": 3,
"outputs": [
{
"output_type": "stream",
"text": "total 44G\ndrwxr-xr-x 10 dsuveges dsuveges 4.0K Jul 17 11:04 .\ndrwxr-xr-x 49 root root 4.0K Jun 6 08:20 ..\n-rw------- 1 dsuveges dsuveges 3.9K Jul 11 11:03 .bash_history\n-rw-r--r-- 1 dsuveges dsuveges 220 Mar 27 2022 .bash_logout\n-rw-r--r-- 1 dsuveges dsuveges 3.5K Mar 27 2022 .bashrc\ndrwxr-xr-x 4 dsuveges dsuveges 4.0K Jul 11 10:25 .cache\ndrwxr-xr-x 4 dsuveges dsuveges 4.0K Jun 30 23:15 .config\ndrwxr-xr-x 3 dsuveges dsuveges 4.0K Jun 1 11:30 .dotnet\ndrwxr-xr-x 3 dsuveges dsuveges 4.0K Jun 14 08:44 .gsutil\ndrwxr-xr-x 5 dsuveges dsuveges 4.0K Jun 1 11:32 .ipython\n-rw------- 1 dsuveges dsuveges 31 Jun 14 08:51 .lesshst\ndrwxr-xr-x 5 dsuveges dsuveges 4.0K Jul 11 10:23 .local\n-rw-r--r-- 1 dsuveges dsuveges 807 Mar 27 2022 .profile\ndrwx------ 2 dsuveges dsuveges 4.0K Jul 17 08:37 .ssh\n-rw------- 1 dsuveges dsuveges 907 Jun 29 13:05 .viminfo\ndrwxr-xr-x 5 dsuveges dsuveges 4.0K Jun 2 03:34 .vscode-server\n-rw-r--r-- 1 dsuveges dsuveges 183 Jun 1 11:30 .wget-hsts\n-rw-r--r-- 1 root root 41G Jul 17 08:50 ClinVarFullRelease_2023-07.xml\n-rw-r--r-- 1 root root 3.2G Jul 17 11:06 ClinVarFullRelease_2023-07.xml.gz\n-rw-r--r-- 1 dsuveges dsuveges 262K Jun 14 14:08 OTGenetics.1k.tsv.bz\n-rw-r--r-- 1 dsuveges dsuveges 90K Jun 14 14:09 OTGenetics.1k.tsv.bz.tbi\n-rw-r--r-- 1 dsuveges dsuveges 9.6M Jun 14 08:56 OTGenetics.tsv.bz\n-rw-r--r-- 1 dsuveges dsuveges 759K Jun 14 08:56 OTGenetics.tsv.bz.tbi\n-rw-r--r-- 1 dsuveges dsuveges 113M Jun 14 08:52 part-00000-f971a369-f332-4b5d-a6ea-02c31885807a-c000.csv\n",
"name": "stdout"
},
{
"output_type": "stream",
"text": "IOPub data rate exceeded.\nThe notebook server will temporarily stop sending output\nto the client in order to avoid crashing it.\nTo change this limit, set the config variable\n`--NotebookApp.iopub_data_rate_limit`.\n\nCurrent values:\nNotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)\nNotebookApp.rate_limit_window=3.0 (secs)\n\n",
"name": "stderr"
}
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "### Parsing the XML\n\nAs the XML file is massive, it is processed as a data stream. The parsed values are stored in a list, which is then converted into a Spark dataframe.\n\n#### Option 1:\n\nAs the dataset is very big (~3M entries), the process was failing, so writing out JSON lines as we go seemed sensible.\n\nThis alone was not the solution, however: after a ClinVarSet is parsed, the element object needs to be cleared to free up memory. Otherwise the kernel is terminated by the OS."
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2023-07-17T14:58:04.458216Z",
"end_time": "2023-07-17T14:58:04.598531Z"
},
"trusted": true
},
"cell_type": "code",
"source": "import xml.etree.ElementTree as etree\nimport json\nimport gzip\n\n# ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/xml/clinvar_variation/ClinVarVariationRelease_2023-07.xml.gz\ndatafile = '/home/dsuveges/ClinVarFullRelease_2023-07.xml.gz'\ncounter = 0\njson_output = open('output.json', 'wt')\n\nwith gzip.open(datafile, 'rt') as xml_stream:\n    for event, elem in etree.iterparse(xml_stream, events=(\"start\", \"end\")):\n        # Process each fully parsed ClinVarSet element:\n        if (elem.tag == \"ClinVarSet\") and (event == 'end'):\n            # Extract data:\n            parsed_data = {\n                'clinVarSetId': elem.get('ID'),  # Single ClinVarSet identifier\n                'rcvAccession': elem.find('ReferenceClinVarAssertion').find('ClinVarAccession').get('Acc'),  # Single RCV accession identifier\n                'submissions': [\n                    {\n                        'csvAccession': csvAccession,\n                        'submissionDate': submissionDate\n                    } for (csvAccession, submissionDate) in zip(\n                        [scv.find('ClinVarAccession').get('Acc') for scv in elem.findall('ClinVarAssertion')],  # SCV accession identifiers\n                        [scv.find('ClinVarAccession').get('DateCreated') for scv in elem.findall('ClinVarAssertion')]  # SCV DateCreated\n                    )\n                ]\n            }\n            counter += 1\n\n            # Writing data into a json file:\n            json.dump(parsed_data, json_output)\n            json_output.write('\\n')\n\n            # Clear the parsed element to free memory:\n            elem.clear()\n\n            # For testing purposes, we can break out if we want:\n            if counter == 200:\n                break\n\njson_output.close()"
"execution_count": 3,
"outputs": []
},
{
"metadata": {},
"cell_type": "markdown",
"source": "#### Option 2:\n\nStore the parsed data in memory and convert it to a Spark dataframe directly."
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2023-07-17T15:01:30.318985Z",
"end_time": "2023-07-17T15:40:12.191754Z"
},
"trusted": true
},
"cell_type": "code",
"source": "import xml.etree.ElementTree as etree\nimport gzip\n\n# ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/xml/clinvar_variation/ClinVarVariationRelease_2023-07.xml.gz\ndatafile = '/home/dsuveges/ClinVarFullRelease_2023-07.xml.gz'\ncounter = 0\nparsed_data: list = []\n\nwith gzip.open(datafile, 'rt') as xml_stream:\n    for event, elem in etree.iterparse(xml_stream, events=(\"start\", \"end\")):\n        # Process each fully parsed ClinVarSet element:\n        if (elem.tag == \"ClinVarSet\") and (event == 'end'):\n            # Adding parsed data to list:\n            parsed_data.append(\n                [\n                    elem.get('ID'),  # Single ClinVarSet identifier\n                    elem.find('ReferenceClinVarAssertion').find('ClinVarAccession').get('Acc'),  # Single RCV accession identifier\n                    list(\n                        zip(\n                            [scv.find('ClinVarAccession').get('Acc') for scv in elem.findall('ClinVarAssertion')],  # SCV accession identifiers\n                            [scv.find('ClinVarAccession').get('DateCreated') for scv in elem.findall('ClinVarAssertion')]  # SCV DateCreated\n                        )\n                    )\n                ]\n            )\n            counter += 1\n\n            # Clear the parsed element to free memory:\n            elem.clear()\n\n            # # For testing purposes, we can break out if we want:\n            # if counter == 200:\n            #     break\n\n# What do we have:\nprint(f'Number of entries processed: {len(parsed_data)}')\nprint(parsed_data[0])\n",
"execution_count": 6,
"outputs": [
{
"output_type": "stream",
"text": "Number of entries processed: 2982171\n['92147967', 'RCV000000006', [('SCV000020149', '2013-04-04')]]\n",
"name": "stdout"
}
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "### Convert parsed data into a Spark DataFrame"
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2023-07-17T15:00:40.577803Z",
"end_time": "2023-07-17T15:01:11.119517Z"
},
"trusted": true
},
"cell_type": "code",
"source": "from pyspark.sql import SparkSession, functions as f, types as t\n\nspark = SparkSession.builder.getOrCreate()\n\n# Schema for the table we generate from the parsed data:\nschema = t.StructType(\n    [\n        t.StructField('clinVarSetId', t.StringType(), True),\n        t.StructField('rcvAccession', t.StringType(), True),\n        t.StructField(\n            'submissions',\n            t.ArrayType(\n                t.StructType(\n                    [\n                        t.StructField('csvAccession', t.StringType(), True),\n                        t.StructField('submissionDate', t.StringType(), True),\n                    ]\n                )\n            ),\n            True\n        )\n    ]\n)\n\n# Convert:\ndf = spark.createDataFrame(parsed_data, schema).persist()\n\ndf.show()\ndf.printSchema()\nprint(f'Number of rows: {df.count()}')\nprint(f\"Number of unique rcv: {df.select('rcvAccession').distinct().count()}\")",
"execution_count": 5,
"outputs": [
{
"output_type": "stream",
"text": "Setting default log level to \"WARN\".\nTo adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n23/07/17 15:00:47 INFO SparkEnv: Registering MapOutputTracker\n23/07/17 15:00:47 INFO SparkEnv: Registering BlockManagerMaster\n23/07/17 15:00:47 INFO SparkEnv: Registering BlockManagerMasterHeartbeat\n23/07/17 15:00:47 INFO SparkEnv: Registering OutputCommitCoordinator\n \r",
"name": "stderr"
},
{
"output_type": "stream",
"text": "+------------+------------+--------------------+\n|clinVarSetId|rcvAccession|         submissions|\n+------------+------------+--------------------+\n|    92147967|RCV000000006|[{SCV000020149, 2...|\n|    92147968|RCV000000007|[{SCV000020150, 2...|\n|    92147969|RCV000000008|[{SCV000020151, 2...|\n|    92147970|RCV000000009|[{SCV000020152, 2...|\n|    92147971|RCV000000010|[{SCV000020153, 2...|\n|    92147973|RCV000000012|[{SCV000020155, 2...|\n|    92147974|RCV000000013|[{SCV000020156, 2...|\n|    92147975|RCV000000014|[{SCV000020157, 2...|\n|    92147977|RCV000000016|[{SCV000020159, 2...|\n|    92148003|RCV000000042|[{SCV000020185, 2...|\n|    92148004|RCV000000043|[{SCV000020186, 2...|\n|    92148010|RCV000000049|[{SCV000020192, 2...|\n|    92148018|RCV000000057|[{SCV000020200, 2...|\n|    92148020|RCV000000059|[{SCV000020202, 2...|\n|    92148021|RCV000000060|[{SCV000020203, 2...|\n|    92148022|RCV000000061|[{SCV000020204, 2...|\n|    92148024|RCV000000063|[{SCV000020206, 2...|\n|    92148025|RCV000000064|[{SCV000020207, 2...|\n|    92148029|RCV000000068|[{SCV000020211, 2...|\n|    92148030|RCV000000069|[{SCV000020212, 2...|\n+------------+------------+--------------------+\nonly showing top 20 rows\n\nroot\n |-- clinVarSetId: string (nullable = true)\n |-- rcvAccession: string (nullable = true)\n |-- submissions: array (nullable = true)\n |    |-- element: struct (containsNull = true)\n |    |    |-- csvAccession: string (nullable = true)\n |    |    |-- submissionDate: string (nullable = true)\n\n",
"name": "stdout"
},
{
"output_type": "stream",
"text": " \r",
"name": "stderr"
},
{
"output_type": "stream",
"text": "Number of rows: 200\nNumber of unique rcv: 200\n",
"name": "stdout"
}
]
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2023-07-17T15:40:35.129100Z",
"end_time": "2023-07-17T15:41:28.531302Z"
},
"trusted": true
},
"cell_type": "code",
"source": "(\n    spark.createDataFrame(parsed_data, schema)\n    # Sorting submissions by date (array_sort orders the structs by their first field):\n    .withColumn(\n        \"submissions\",\n        f.expr(\n            '''\n            array_sort(\n                transform(\n                    submissions, x -> struct(x['submissionDate'] as submissionDate, x['csvAccession'] as csvAccession)\n                )\n            )'''\n        ),\n    )\n    # Extract first date:\n    .withColumn('firstSubmission', f.col('submissions').getItem(0).getField('csvAccession'))\n    .withColumn('firstSubmissionDate', f.col('submissions').getItem(0).getField('submissionDate'))\n    .write.mode('overwrite').parquet('gs://ot-team/dsuveges/ClinVar_dated_release.2023.07')\n)",
"execution_count": 8,
"outputs": [
{
"output_type": "stream",
"text": "23/07/17 15:41:16 WARN TaskSetManager: Stage 10 contains a task of very large size (11191 KiB). The maximum recommended task size is 1000 KiB.\n \r",
"name": "stderr"
}
]
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "",
"execution_count": null,
"outputs": []
}
],
"metadata": {
"kernelspec": {
"name": "python3",
"display_name": "Python 3",
"language": "python"
},
"language_info": {
"name": "python",
"version": "3.10.8",
"mimetype": "text/x-python",
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"pygments_lexer": "ipython3",
"nbconvert_exporter": "python",
"file_extension": ".py"
},
"gist": {
"id": "48611eead49ae1bd9553516d72efe8e0",
"data": {
"description": "GCS/ClinVar_evidence_dating.ipynb",
"public": true
}
},
"_draft": {
"nbviewer_url": "https://gist.github.com/DSuveges/48611eead49ae1bd9553516d72efe8e0"
}
},
"nbformat": 4,
"nbformat_minor": 5
} |
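A minimal standalone sketch of the `iterparse` pattern the notebook relies on. The miniature XML document below is made up for illustration (the real release is 40GB); the point is that each finished `ClinVarSet` element is read and then cleared with `elem.clear()`, which is what keeps memory flat instead of the whole tree accumulating until the OS kills the kernel.

```python
import io
import xml.etree.ElementTree as etree

# Hypothetical miniature stand-in for the ClinVar full release:
xml_doc = """<ReleaseSet>
  <ClinVarSet ID="1">
    <ReferenceClinVarAssertion><ClinVarAccession Acc="RCV000000006"/></ReferenceClinVarAssertion>
    <ClinVarAssertion><ClinVarAccession Acc="SCV000020149" DateCreated="2013-04-04"/></ClinVarAssertion>
  </ClinVarSet>
  <ClinVarSet ID="2">
    <ReferenceClinVarAssertion><ClinVarAccession Acc="RCV000000007"/></ReferenceClinVarAssertion>
    <ClinVarAssertion><ClinVarAccession Acc="SCV000020150" DateCreated="2013-04-04"/></ClinVarAssertion>
  </ClinVarSet>
</ReleaseSet>"""

parsed = []
# Only "end" events are needed: the subtree is complete when the element closes.
for event, elem in etree.iterparse(io.StringIO(xml_doc), events=("end",)):
    if elem.tag == "ClinVarSet":
        rcv = elem.find("ReferenceClinVarAssertion").find("ClinVarAccession").get("Acc")
        scvs = [
            (scv.find("ClinVarAccession").get("Acc"),
             scv.find("ClinVarAccession").get("DateCreated"))
            for scv in elem.findall("ClinVarAssertion")
        ]
        parsed.append((elem.get("ID"), rcv, scvs))
        # Crucial on the real 40GB file: drop the subtree so memory stays flat.
        elem.clear()

print(parsed)
```

On the real release the stream comes from `gzip.open(datafile, 'rt')` instead of `io.StringIO`, but the clear-after-use pattern is identical.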
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
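Step 4 (oldest date per RCV) is done in the notebook with Spark's `array_sort`; for small extracts the same reduction can be sketched in plain Python over rows shaped like Option 2's output (`[clinVarSetId, rcvAccession, [(scvAccession, dateCreated), ...]]`). The sample rows here are illustrative, not taken from the release.

```python
# Illustrative rows in the Option 2 layout:
rows = [
    ["92147967", "RCV000000006", [("SCV000020149", "2013-04-04")]],
    ["92147968", "RCV000000007", [("SCV001", "2015-01-02"), ("SCV000", "2010-06-30")]],
]

def first_submission(submissions):
    """Return the (accession, date) pair with the earliest DateCreated.
    ISO dates (YYYY-MM-DD) sort correctly as plain strings."""
    return min(submissions, key=lambda pair: pair[1])

# RCV -> (first SCV accession, first submission date):
rcv_to_date = {rcv: first_submission(subs) for _, rcv, subs in rows}
print(rcv_to_date)
```

This mirrors what the Spark `array_sort` over `(submissionDate, csvAccession)` structs computes for `firstSubmission`/`firstSubmissionDate`, since sorting those structs orders by date first.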