Last active
November 15, 2020 08:02
-
-
Save johngrimes/425119410742e71b020e4f2ccd657f5d to your computer and use it in GitHub Desktop.
Testing windowing as a substitute for straight aggregation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Windowing as a substitute for straight aggregation\n", | |
"\n", | |
"This notebook will simulate the execution of the following query to the [aggregate operation](https://pathling.csiro.au/docs/aggregate.html) in [Pathling](https://pathling.csiro.au):\n", | |
"\n", | |
"```\n", | |
"subjectResource: Patient\n", | |
"aggregation: count()\n", | |
"grouping: name.prefix contains 'Mrs.'\n", | |
"grouping: name.given contains 'Karina848'\n", | |
"```" | |
] | |
}, | |
{
 "cell_type": "code",
 "execution_count": 1,
 "metadata": {},
 "outputs": [],
 "source": [
  "from pyspark.sql import SparkSession\n",
  "# Import the needed functions explicitly rather than `import *`, which\n",
  "# shadows Python builtins such as `max` and `sum`.\n",
  "from pyspark.sql.functions import count, explode_outer, lit, when\n",
  "from pyspark.sql import Window\n",
  "import time\n",
  "\n",
  "# Build a local Spark session.\n",
  "# FIX: the external shuffle service key is `spark.shuffle.service.enabled`;\n",
  "# the original `spark.sql.shuffle.service.enabled` is not a recognised\n",
  "# setting and was silently ignored. Dynamic allocation and the shuffle\n",
  "# service have no effect under local[*], but are kept for parity with\n",
  "# cluster runs.\n",
  "spark = SparkSession.builder \\\n",
  "    .master('local[*]') \\\n",
  "    .config(\"spark.driver.memory\", \"2g\") \\\n",
  "    .config(\"spark.executor.memory\", \"6g\") \\\n",
  "    .config(\"spark.sql.autoBroadcastJoinThreshold\", \"-1\") \\\n",
  "    .config(\"spark.sql.adaptive.enabled\", \"true\") \\\n",
  "    .config(\"spark.shuffle.service.enabled\", \"true\") \\\n",
  "    .config(\"spark.sql.shuffle.partitions\", \"2\") \\\n",
  "    .config(\"spark.scheduler.mode\", \"FAIR\") \\\n",
  "    .config(\"spark.dynamicAllocation.enabled\", \"true\") \\\n",
  "    .getOrCreate()"
 ]
},
{
 "cell_type": "code",
 "execution_count": 2,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Record the wall-clock start time; the final cell reports total elapsed seconds.\n",
  "start_time = time.time()"
 ]
},
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Grouping 1" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"First we get the `Patient` dataset from file." | |
] | |
}, | |
{
 "cell_type": "code",
 "execution_count": 3,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Load the Patient resources from Parquet (Synthea 1k synthetic dataset).\n",
  "# NOTE(review): absolute local path -- adjust for other environments.\n",
  "patient = spark.read.parquet(\"file:///usr/share/data/synthea/1k_20190829/parquet/Patient.parquet\")\n",
  "patient_id = patient[\"id\"]\n",
  "# Column references for every Patient column; reused later as groupBy keys.\n",
  "patient_columns = [ patient[col] for col in patient.columns ]"
 ]
},
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Traverse to the `name` element within `Patient`." | |
] | |
}, | |
{
 "cell_type": "code",
 "execution_count": 4,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Copy the patient id into \"a\" and explode the repeating `name` element into\n",
  "# \"b\"; explode_outer keeps patients with no name as a single null row.\n",
  "grouping1 = patient.withColumn(\"a\", patient[\"id\"])\n",
  "grouping1 = grouping1.withColumn(\"b\", explode_outer(patient[\"name\"]))\n",
  "grouping1_id = grouping1[\"a\"]\n",
  "grouping1_value = grouping1[\"b\"]"
 ]
},
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Traverse to the `prefix` element within `name`." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"grouping1 = grouping1.withColumn(\"c\", explode_outer(grouping1_value.getField(\"prefix\")))\n", | |
"grouping1_value = grouping1[\"c\"]" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Create a dataset to represent the literal value `Mrs.`." | |
] | |
}, | |
{
 "cell_type": "code",
 "execution_count": 6,
 "metadata": {},
 "outputs": [],
 "source": [
  "# One 'Mrs.' literal row per patient, keyed by id so it can be joined back on.\n",
  "mrs_literal = patient.select(patient_id.alias(\"d\"), lit(\"Mrs.\").alias(\"e\"))\n",
  "mrs_literal_id = mrs_literal[\"d\"]\n",
  "mrs_literal_value = mrs_literal[\"e\"]"
 ]
},
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Create a column which calculates the result of a `contains` operation, with `prefix` on the left and `Mrs.` on the right." | |
] | |
}, | |
{
 "cell_type": "code",
 "execution_count": 7,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Aliased import because `max` was shadowed by the wildcard import above.\n",
  "from pyspark.sql.functions import max as spark_max\n",
  "\n",
  "grouping1 = grouping1.join(mrs_literal, grouping1_id == mrs_literal_id, \"left_outer\")\n",
  "# FHIRPath equality propagates empty (null) operands, so null-check both\n",
  "# sides before comparing; max() over the booleans then implements `contains`\n",
  "# (true if any exploded prefix equals the literal).\n",
  "equality_with_null_checks = when(grouping1_value.isNull(), lit(None)) \\\n",
  "    .when(mrs_literal_value.isNull(), lit(None)) \\\n",
  "    .otherwise(grouping1_value == mrs_literal_value)\n",
  "agg_column = spark_max(equality_with_null_checks)"
 ]
},
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Use __aggregation__ to get the result of the `contains` operation." | |
] | |
}, | |
{
 "cell_type": "code",
 "execution_count": 8,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Aggregate back to one row per patient; the max of the null-checked equality\n",
  "# is the `name.prefix contains 'Mrs.'` result for that patient.\n",
  "grouping1 = grouping1.groupBy(patient_columns).agg(agg_column)\n",
  "grouping1_id = grouping1[grouping1.columns[0]]\n",
  "grouping1_value = grouping1[grouping1.columns[-1]]\n",
  "patient_id = grouping1_id\n",
  "# The result has len(patient_columns) grouping columns followed by one\n",
  "# aggregate column. FIX: the original sliced to len(patient_columns) - 1,\n",
  "# silently dropping the last Patient column from all later group-bys.\n",
  "patient_columns = [ grouping1[col] for col in grouping1.columns[0:len(patient_columns)] ]"
 ]
},
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Grouping 2" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Traverse to the `name` element within `Patient`." | |
] | |
}, | |
{
 "cell_type": "code",
 "execution_count": 9,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Start grouping 2 from the grouping 1 result: copy the id into \"g\" and\n",
  "# re-explode Patient.name into \"h\".\n",
  "# NOTE(review): `patient[\"name\"]` is a column of the ancestor frame resolved\n",
  "# against grouping1 via lineage -- confirm it survives the earlier groupBy.\n",
  "grouping2 = grouping1.withColumn(\"g\", grouping1_id)\n",
  "grouping2 = grouping2.withColumn(\"h\", explode_outer(patient[\"name\"]))\n",
  "grouping2_id = grouping2[\"g\"]\n",
  "grouping2_value = grouping2[\"h\"]"
 ]
},
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Traverse to the `given` element within `name`." | |
] | |
}, | |
{
 "cell_type": "code",
 "execution_count": 10,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Explode the repeating `given` element of each name (null-preserving).\n",
  "grouping2 = grouping2.withColumn(\"i\", explode_outer(grouping2_value.getField(\"given\")))\n",
  "grouping2_value = grouping2[\"i\"]"
 ]
},
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Create a dataset to represent the literal value `Karina848`." | |
] | |
}, | |
{
 "cell_type": "code",
 "execution_count": 11,
 "metadata": {},
 "outputs": [],
 "source": [
  "# One 'Karina848' literal row per patient, keyed by id for the join.\n",
  "# NOTE(review): `patient_id` was rebound to a grouping1 column in the earlier\n",
  "# aggregation cell, yet is selected here from `patient` -- relies on lineage.\n",
  "karina_literal = patient.select(patient_id.alias(\"j\"), lit(\"Karina848\").alias(\"k\"))\n",
  "karina_literal_id = karina_literal[\"j\"]\n",
  "karina_literal_value = karina_literal[\"k\"]"
 ]
},
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Create a column which calculates the result of a `contains` operation, with `given` on the left and `Karina848` on the right." | |
] | |
}, | |
{
 "cell_type": "code",
 "execution_count": 12,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Redundant re-import (already aliased in the grouping 1 cell); harmless.\n",
  "from pyspark.sql.functions import max as spark_max\n",
  "\n",
  "grouping2 = grouping2.join(karina_literal, grouping2_id == karina_literal_id, \"left_outer\")\n",
  "# Same null-propagating equality + max() pattern as grouping 1, now testing\n",
  "# whether any given name equals 'Karina848'.\n",
  "equality_with_null_checks = when(grouping2_value.isNull(), lit(None)) \\\n",
  "    .when(karina_literal_value.isNull(), lit(None)) \\\n",
  "    .otherwise(grouping2_value == karina_literal_value)\n",
  "agg_column = spark_max(equality_with_null_checks)"
 ]
},
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Use __aggregation__ to get the result of the `contains` operation." | |
] | |
}, | |
{
 "cell_type": "code",
 "execution_count": 13,
 "metadata": {},
 "outputs": [],
 "source": [
  "# FIX: `list.append` mutates in place and returns None, so the original\n",
  "# `group_by = patient_columns.append(grouping1_value)` bound None and only\n",
  "# worked through the hidden mutation of `patient_columns`. Build the grouping\n",
  "# key list explicitly and leave `patient_columns` untouched; the net column\n",
  "# layout of the result is identical to before.\n",
  "n_patient = len(patient_columns)\n",
  "group_by = patient_columns + [grouping1_value]\n",
  "grouping2 = grouping2.groupBy(group_by).agg(agg_column)\n",
  "grouping2_id = grouping2[grouping2.columns[0]]\n",
  "grouping2_value = grouping2[grouping2.columns[-1]]\n",
  "patient_id = grouping2_id\n",
  "# Result layout: n_patient patient columns, then grouping 1's value column,\n",
  "# then the new aggregate column.\n",
  "patient_columns = [ grouping2[col] for col in grouping2.columns[0:n_patient] ]\n",
  "grouping1_value = grouping2[grouping2.columns[n_patient]]"
 ]
},
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Aggregation" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Create a column which counts the number of patients within each grouping." | |
] | |
}, | |
{
 "cell_type": "code",
 "execution_count": 14,
 "metadata": {},
 "outputs": [],
 "source": [
  "grouping_columns = [grouping1_value, grouping2_value]\n",
  "count_column = count(patient_id)\n",
  "# NOTE(review): count() never yields null, so this guard looks redundant;\n",
  "# kept to mirror the aggregate operation's null-to-zero semantics.\n",
  "null_aware_count = when(count_column.isNull(), 0) \\\n",
  "    .otherwise(count_column)"
 ]
},
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Create a dataset to represent the aggregation." | |
] | |
}, | |
{
 "cell_type": "code",
 "execution_count": 15,
 "metadata": {},
 "outputs": [],
 "source": [
  "# Count patients within each combination of the two boolean grouping values.\n",
  "aggregation = grouping2.groupBy(grouping_columns).agg(null_aware_count)\n",
  "aggregation_value = aggregation[aggregation.columns[-1]]"
 ]
},
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Final result" | |
] | |
}, | |
{ | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"Take the distinct values from the selection of each of the grouping columns, and the aggregation column." | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 16, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<div>\n", | |
"<style scoped>\n", | |
" .dataframe tbody tr th:only-of-type {\n", | |
" vertical-align: middle;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>grouping1</th>\n", | |
" <th>grouping2</th>\n", | |
" <th>count</th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>0</th>\n", | |
" <td>False</td>\n", | |
" <td>False</td>\n", | |
" <td>897</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>1</th>\n", | |
" <td>True</td>\n", | |
" <td>False</td>\n", | |
" <td>474</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>2</th>\n", | |
" <td>None</td>\n", | |
" <td>False</td>\n", | |
" <td>234</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>3</th>\n", | |
" <td>True</td>\n", | |
" <td>True</td>\n", | |
" <td>1</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"</div>" | |
], | |
"text/plain": [ | |
" grouping1 grouping2 count\n", | |
"0 False False 897\n", | |
"1 True False 474\n", | |
"2 None False 234\n", | |
"3 True True 1" | |
] | |
}, | |
"execution_count": 16, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [
 "# Project the grouping columns plus the count, deduplicate, and collect to\n",
 "# pandas for display.\n",
 "result = aggregation.select(grouping1_value.alias(\"grouping1\"),\n",
 "                            grouping2_value.alias(\"grouping2\"),\n",
 "                            aggregation_value.alias(\"count\")).distinct()\n",
 "result.toPandas()"
]
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 17, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/plain": [ | |
"13.59262752532959" | |
] | |
}, | |
"execution_count": 17, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [
 "# Total elapsed wall-clock time for the notebook, in seconds.\n",
 "time.time() - start_time"
]
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "Python 3", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.8.5" | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 4 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment