{
"cells": [
{
"metadata": {},
"cell_type": "markdown",
"source": "# Dating evidece from ClinVar\n\n- ClinVar evidence is based on RCV identifiers. \n- RCVs on ClinVar represent one or more submissions identified by SCV identifiers.\n- CSV identifiers are dated. -> Extracting first submission date.\n\n## Steps.\n\n1. Download clinvar release from their ftp site. (`ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/xml/ClinVarFullRelease_2023-07.xml.gz`)\n2. Extract data (3.2GB -> 40GB)\n3. Parsing xml to get RCV -> CSV -> Date triplets\n4. Extract the oldest date for each RCV.\n5. Output data.\n\nBy having the RCV2Date dataset, it allows us to join with our evidence set. \n\n"
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2023-07-17T11:04:56.880098Z",
"end_time": "2023-07-17T11:06:30.206439Z"
},
"trusted": true
},
"cell_type": "code",
"source": "%%bash\n\ncd /home/dsuveges\nwget ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/xml/ClinVarFullRelease_2023-07.xml.gz \ngunzip ClinVarFullRelease_2023-07.xml.gz \n\nls -lah",
"execution_count": 3,
"outputs": [
{
"output_type": "stream",
"text": "total 44G\ndrwxr-xr-x 10 dsuveges dsuveges 4.0K Jul 17 11:04 .\ndrwxr-xr-x 49 root root 4.0K Jun 6 08:20 ..\n-rw------- 1 dsuveges dsuveges 3.9K Jul 11 11:03 .bash_history\n-rw-r--r-- 1 dsuveges dsuveges 220 Mar 27 2022 .bash_logout\n-rw-r--r-- 1 dsuveges dsuveges 3.5K Mar 27 2022 .bashrc\ndrwxr-xr-x 4 dsuveges dsuveges 4.0K Jul 11 10:25 .cache\ndrwxr-xr-x 4 dsuveges dsuveges 4.0K Jun 30 23:15 .config\ndrwxr-xr-x 3 dsuveges dsuveges 4.0K Jun 1 11:30 .dotnet\ndrwxr-xr-x 3 dsuveges dsuveges 4.0K Jun 14 08:44 .gsutil\ndrwxr-xr-x 5 dsuveges dsuveges 4.0K Jun 1 11:32 .ipython\n-rw------- 1 dsuveges dsuveges 31 Jun 14 08:51 .lesshst\ndrwxr-xr-x 5 dsuveges dsuveges 4.0K Jul 11 10:23 .local\n-rw-r--r-- 1 dsuveges dsuveges 807 Mar 27 2022 .profile\ndrwx------ 2 dsuveges dsuveges 4.0K Jul 17 08:37 .ssh\n-rw------- 1 dsuveges dsuveges 907 Jun 29 13:05 .viminfo\ndrwxr-xr-x 5 dsuveges dsuveges 4.0K Jun 2 03:34 .vscode-server\n-rw-r--r-- 1 dsuveges dsuveges 183 Jun 1 11:30 .wget-hsts\n-rw-r--r-- 1 root root 41G Jul 17 08:50 ClinVarFullRelease_2023-07.xml\n-rw-r--r-- 1 root root 3.2G Jul 17 11:06 ClinVarFullRelease_2023-07.xml.gz\n-rw-r--r-- 1 dsuveges dsuveges 262K Jun 14 14:08 OTGenetics.1k.tsv.bz\n-rw-r--r-- 1 dsuveges dsuveges 90K Jun 14 14:09 OTGenetics.1k.tsv.bz.tbi\n-rw-r--r-- 1 dsuveges dsuveges 9.6M Jun 14 08:56 OTGenetics.tsv.bz\n-rw-r--r-- 1 dsuveges dsuveges 759K Jun 14 08:56 OTGenetics.tsv.bz.tbi\n-rw-r--r-- 1 dsuveges dsuveges 113M Jun 14 08:52 part-00000-f971a369-f332-4b5d-a6ea-02c31885807a-c000.csv\n",
"name": "stdout"
},
{
"output_type": "stream",
"text": "IOPub data rate exceeded.\nThe notebook server will temporarily stop sending output\nto the client in order to avoid crashing it.\nTo change this limit, set the config variable\n`--NotebookApp.iopub_data_rate_limit`.\n\nCurrent values:\nNotebookApp.iopub_data_rate_limit=1000000.0 (bytes/sec)\nNotebookApp.rate_limit_window=3.0 (secs)\n\n",
"name": "stderr"
}
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "### Parsing xml\n\nAs the xml file is massive, it is processed as data stream. The parsed values are stored in a list, which is then converted into a spark dataframe.\n\n#### Option 1:\n\nAs the dataset is very big (~3M) entries, the process was failing, so I was thinking it could make sense to just write out json lines as we go. \n\nThis was not the solution, however after reaching the end of a clinvar set, we parse it, the object needed to be destroyed to free up memory. Otherwise the kernel was terminated by the OS."
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2023-07-17T14:58:04.458216Z",
"end_time": "2023-07-17T14:58:04.598531Z"
},
"trusted": true
},
"cell_type": "code",
"source": "import xml.etree.ElementTree as etree\nimport json\nimport gzip\n\n\n# ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/xml/clinvar_variation/ClinVarVariationRelease_2023-07.xml.gz\ndatafile = '/home/dsuveges/ClinVarFullRelease_2023-07.xml.gz'\ncounter = 0\njson_output = open('output.json', 'wt')\n\nwith gzip.open(datafile, 'rt') as xml_stream:\n for event, elem in etree.iterparse(xml_stream, events=(\"start\", \"end\")):\n # Extract clinvar set id:\n if (elem.tag == \"ClinVarSet\") & (event == 'end'):\n # Extract data:\n parsed_data = {\n \"clinVarSetId\": elem.get('ID'), # Single ClinVarSet identifier\n 'rcvAccession': elem.find('ReferenceClinVarAssertion').find('ClinVarAccession').get('Acc'), # Single RCV accession identifiers\n 'submissions': [\n {\n 'csvAccession': csvAccession,\n 'submissionDate': submissionDate\n } for (csvAccession, submissionDate) in zip(\n [scv.find('ClinVarAccession').get('Acc') for scv in elem.findall('ClinVarAssertion')], # SCV accession identifiers\n [scv.find('ClinVarAccession').get('DateCreated') for scv in elem.findall('ClinVarAssertion')] # SCV DateCreated\n )\n ]\n }\n counter += 1\n\n # Writing data into a json file:\n json.dump(parsed_data, json_output)\n json_output.write('\\n')\n \n # Empty dataset:\n elem.clear()\n \n # For testing purposes, we can break out if we want:\n if counter == 200:\n break\n\njson_output.close()",
"execution_count": 3,
"outputs": []
},
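{
"metadata": {},
"cell_type": "markdown",
"source": "If Option 1 is used, the JSON-lines file can be loaded straight back into Spark. A minimal sketch (not part of the original run), assuming a local `output.json` written by the cell above and an available `SparkSession`:"
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "from pyspark.sql import SparkSession\n\nspark = SparkSession.builder.getOrCreate()\n\n# Sketch: Spark reads JSON-lines files natively, inferring the schema\n# (clinVarSetId, rcvAccession, submissions) from the records:\nrcv_dates = spark.read.json('output.json')\nrcv_dates.printSchema()\nrcv_dates.show(5, truncate=False)",
"execution_count": null,
"outputs": []
},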
{
"metadata": {},
"cell_type": "markdown",
"source": "#### Option 2:\n\nStore parsed data in memory and convert to spark dataframe directly."
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2023-07-17T15:01:30.318985Z",
"end_time": "2023-07-17T15:40:12.191754Z"
},
"trusted": true
},
"cell_type": "code",
"source": "import xml.etree.ElementTree as etree\nimport gzip\n\n# ftp://ftp.ncbi.nlm.nih.gov/pub/clinvar/xml/clinvar_variation/ClinVarVariationRelease_2023-07.xml.gz\ndatafile = '/home/dsuveges/ClinVarFullRelease_2023-07.xml.gz'\ncounter = 0\nparsed_data: list = []\n\nwith gzip.open(datafile, 'rt') as xml_stream:\n for event, elem in etree.iterparse(xml_stream, events=(\"start\", \"end\")):\n # Extract clinvar set id:\n if (elem.tag == \"ClinVarSet\") & (event == 'end'):\n # Adding parsed data to list:\n parsed_data.append(\n [\n elem.get('ID'), # Single ClinVarSet identifier\n elem.find('ReferenceClinVarAssertion').find('ClinVarAccession').get('Acc'), # Single RCV accession identifiers\n list(\n zip(\n [scv.find('ClinVarAccession').get('Acc') for scv in elem.findall('ClinVarAssertion')], # SCV accession identifiers\n [scv.find('ClinVarAccession').get('DateCreated') for scv in elem.findall('ClinVarAssertion')] # SCV DateCreated\n )\n )\n ]\n )\n counter += 1\n \n # Empty dataset:\n elem.clear()\n \n# # For testing purposes, we can break out if we want:\n# if counter == 200:\n# break\n\n\n# What do we have:\nprint(f'Number of entries processed: {len(parsed_data)}')\nprint(parsed_data[0])\n",
"execution_count": 6,
"outputs": [
{
"output_type": "stream",
"text": "Number of entries processed: 2982171\n['92147967', 'RCV000000006', [('SCV000020149', '2013-04-04')]]\n",
"name": "stdout"
}
]
},
{
"metadata": {},
"cell_type": "markdown",
"source": "### Convert parsed data into Spark Dataframe"
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2023-07-17T15:00:40.577803Z",
"end_time": "2023-07-17T15:01:11.119517Z"
},
"trusted": true
},
"cell_type": "code",
"source": "from pyspark.sql import SparkSession, functions as f, types as t\n\nspark = SparkSession.builder.getOrCreate()\n\n# Schema for the table we generate from the parsed data:\nschema = t.StructType(\n [\n t.StructField('clinVarSetId', t.StringType(), True),\n t.StructField('rcvAccession', t.StringType(), True),\n t.StructField(\n 'submissions',\n t.ArrayType(\n t.StructType(\n [\n t.StructField('csvAccession', t.StringType(), True),\n t.StructField('submissionDate', t.StringType(), True),\n ]\n )\n ),\n True\n )\n ]\n)\n\n# Convert:\ndf = spark.createDataFrame(parsed_data, schema).persist()\n\ndf.show()\ndf.printSchema()\nprint(f'Number of rows: {df.count()}')\nprint(f\"Number of unique rcv: {df.select('rcvAccession').distinct().count()}\")",
"execution_count": 5,
"outputs": [
{
"output_type": "stream",
"text": "Setting default log level to \"WARN\".\nTo adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n23/07/17 15:00:47 INFO SparkEnv: Registering MapOutputTracker\n23/07/17 15:00:47 INFO SparkEnv: Registering BlockManagerMaster\n23/07/17 15:00:47 INFO SparkEnv: Registering BlockManagerMasterHeartbeat\n23/07/17 15:00:47 INFO SparkEnv: Registering OutputCommitCoordinator\n \r",
"name": "stderr"
},
{
"output_type": "stream",
"text": "+------------+------------+--------------------+\n|clinVarSetId|rcvAccession| submissions|\n+------------+------------+--------------------+\n| 92147967|RCV000000006|[{SCV000020149, 2...|\n| 92147968|RCV000000007|[{SCV000020150, 2...|\n| 92147969|RCV000000008|[{SCV000020151, 2...|\n| 92147970|RCV000000009|[{SCV000020152, 2...|\n| 92147971|RCV000000010|[{SCV000020153, 2...|\n| 92147973|RCV000000012|[{SCV000020155, 2...|\n| 92147974|RCV000000013|[{SCV000020156, 2...|\n| 92147975|RCV000000014|[{SCV000020157, 2...|\n| 92147977|RCV000000016|[{SCV000020159, 2...|\n| 92148003|RCV000000042|[{SCV000020185, 2...|\n| 92148004|RCV000000043|[{SCV000020186, 2...|\n| 92148010|RCV000000049|[{SCV000020192, 2...|\n| 92148018|RCV000000057|[{SCV000020200, 2...|\n| 92148020|RCV000000059|[{SCV000020202, 2...|\n| 92148021|RCV000000060|[{SCV000020203, 2...|\n| 92148022|RCV000000061|[{SCV000020204, 2...|\n| 92148024|RCV000000063|[{SCV000020206, 2...|\n| 92148025|RCV000000064|[{SCV000020207, 2...|\n| 92148029|RCV000000068|[{SCV000020211, 2...|\n| 92148030|RCV000000069|[{SCV000020212, 2...|\n+------------+------------+--------------------+\nonly showing top 20 rows\n\nroot\n |-- clinVarSetId: string (nullable = true)\n |-- rcvAccession: string (nullable = true)\n |-- submissions: array (nullable = true)\n | |-- element: struct (containsNull = true)\n | | |-- csvAccession: string (nullable = true)\n | | |-- submissionDate: string (nullable = true)\n\n",
"name": "stdout"
},
{
"output_type": "stream",
"text": " \r",
"name": "stderr"
},
{
"output_type": "stream",
"text": "Number of rows: 200\nNumber of unique rcv: 200\n",
"name": "stdout"
}
]
},
{
"metadata": {
"ExecuteTime": {
"start_time": "2023-07-17T15:40:35.129100Z",
"end_time": "2023-07-17T15:41:28.531302Z"
},
"trusted": true
},
"cell_type": "code",
"source": "(\n spark.createDataFrame(parsed_data, schema)\n # Sorting submissions:\n .withColumn(\n \"submissions\",\n f.expr(\n '''\n array_sort(\n transform(\n submissions, x -> struct(x['submissionDate'] as submissionDate, x['csvAccession'] as csvAccession)\n )\n )'''\n ),\n )\n # Extract fist date:\n .withColumn('firstSubmission', f.col('submissions').getItem(0).getField('csvAccession'))\n .withColumn('firstSubmissionDate', f.col('submissions').getItem(0).getField('submissionDate'))\n .write.mode('overwrite').parquet('gs://ot-team/dsuveges/ClinVar_dated_release.2023.07')\n)",
"execution_count": 8,
"outputs": [
{
"output_type": "stream",
"text": "23/07/17 15:41:16 WARN TaskSetManager: Stage 10 contains a task of very large size (11191 KiB). The maximum recommended task size is 1000 KiB.\n \r",
"name": "stderr"
}
]
},
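{
"metadata": {},
"cell_type": "markdown",
"source": "### Joining the dated RCVs with evidence (sketch)\n\nA minimal sketch of the downstream join mentioned in the introduction, not part of the original run. The evidence path is hypothetical, and it is assumed that the evidence set carries the RCV accession in a `studyId` column."
},
{
"metadata": {
"trusted": true
},
"cell_type": "code",
"source": "from pyspark.sql import SparkSession, functions as f\n\nspark = SparkSession.builder.getOrCreate()\n\n# Sketch only: the evidence path below is hypothetical.\nevidence_path = 'gs://ot-team/dsuveges/clinvar_evidence'\n\n# Read back the dated RCV table written above:\ndated_rcv = (\n    spark.read.parquet('gs://ot-team/dsuveges/ClinVar_dated_release.2023.07')\n    .select('rcvAccession', 'firstSubmissionDate')\n)\n\n# Left join onto the evidence, assuming the RCV accession sits in `studyId`:\n(\n    spark.read.json(evidence_path)\n    .join(dated_rcv, f.col('studyId') == f.col('rcvAccession'), 'left')\n    .drop('rcvAccession')\n    .show()\n)",
"execution_count": null,
"outputs": []
}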
],
"metadata": {
"kernelspec": {
"name": "python3",
"display_name": "Python 3",
"language": "python"
},
"language_info": {
"name": "python",
"version": "3.10.8",
"mimetype": "text/x-python",
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"pygments_lexer": "ipython3",
"nbconvert_exporter": "python",
"file_extension": ".py"
},
"gist": {
"id": "48611eead49ae1bd9553516d72efe8e0",
"data": {
"description": "GCS/ClinVar_evidence_dating.ipynb",
"public": true
}
},
"_draft": {
"nbviewer_url": "https://gist.github.com/DSuveges/48611eead49ae1bd9553516d72efe8e0"
}
},
"nbformat": 4,
"nbformat_minor": 5
}