{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Windowing as a substitute for straight aggregation\n",
"\n",
"This notebook will simulate the execution of the following query to the [aggregate operation](https://pathling.csiro.au/docs/aggregate.html) in [Pathling](https://pathling.csiro.au):\n",
"\n",
"```\n",
"subjectResource: Patient\n",
"aggregation: count()\n",
"grouping: name.prefix contains 'Mrs.'\n",
"grouping: name.given contains 'Karina848'\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"from pyspark.sql.functions import *\n",
"from pyspark.sql import Window\n",
"import time\n",
"\n",
"spark = SparkSession.builder \\\n",
" .master('local[*]') \\\n",
" .config(\"spark.driver.memory\", \"2g\") \\\n",
" .config(\"spark.executor.memory\", \"6g\") \\\n",
" .config(\"spark.sql.autoBroadcastJoinThreshold\", \"-1\") \\\n",
" .config(\"spark.sql.adaptive.enabled\", \"true\") \\\n",
" .config(\"spark.sql.shuffle.service.enabled\", \"true\") \\\n",
" .config(\"spark.sql.shuffle.partitions\", \"2\") \\\n",
" .config(\"spark.scheduler.mode\", \"FAIR\") \\\n",
" .config(\"spark.dynamicAllocation.enabled\", \"true\") \\\n",
" .getOrCreate()"
]
},
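{
"cell_type": "markdown",
"metadata": {},
"source": [
"Before timing the query, a brief aside: a minimal, self-contained sketch of the substitution this notebook is testing. An aggregate such as `max` computed over a window partitioned by an ID column yields the same per-ID value as a `groupBy`/`agg`, but retains every input row. The `df`, `id` and `value` names below are illustrative only, and are not part of the Pathling implementation."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql.functions import max as spark_max\n",
"\n",
"# Illustrative data only: two rows for p1, one row for p2.\n",
"df = spark.createDataFrame(\n",
"    [(\"p1\", False), (\"p1\", True), (\"p2\", False)],\n",
"    [\"id\", \"value\"])\n",
"\n",
"# Straight aggregation: collapses the dataset to one row per id.\n",
"aggregated = df.groupBy(\"id\").agg(spark_max(\"value\").alias(\"max_value\"))\n",
"\n",
"# Windowing: the same per-id maximum, computed onto every row, avoiding the\n",
"# join back to the original dataset that aggregation would require.\n",
"window = Window.partitionBy(\"id\")\n",
"windowed = df.withColumn(\"max_value\", spark_max(\"value\").over(window))"
]
},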
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"start_time = time.time()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Grouping 1"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"First we get the `Patient` dataset from file."
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"patient = spark.read.parquet(\"file:///usr/share/data/synthea/1k_20190829/parquet/Patient.parquet\")\n",
"patient_id = patient[\"id\"]\n",
"patient_columns = [ patient[col] for col in patient.columns ]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Traverse to the `name` element within `Patient`."
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"grouping1 = patient.withColumn(\"a\", patient[\"id\"])\n",
"grouping1 = grouping1.withColumn(\"b\", explode_outer(patient[\"name\"]))\n",
"grouping1_id = grouping1[\"a\"]\n",
"grouping1_value = grouping1[\"b\"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Traverse to the `prefix` element within `name`."
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"grouping1 = grouping1.withColumn(\"c\", explode_outer(grouping1_value.getField(\"prefix\")))\n",
"grouping1_value = grouping1[\"c\"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a dataset to represent the literal value `Mrs.`."
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"mrs_literal = patient.select(patient_id.alias(\"d\"), lit(\"Mrs.\").alias(\"e\"))\n",
"mrs_literal_id = mrs_literal[\"d\"]\n",
"mrs_literal_value = mrs_literal[\"e\"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a column which calculates the result of a `contains` operation, with `prefix` on the left and `Mrs.` on the right."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql.functions import max as spark_max\n",
"\n",
"grouping1 = grouping1.join(mrs_literal, grouping1_id == mrs_literal_id, \"left_outer\")\n",
"equality_with_null_checks = when(grouping1_value.isNull(), lit(None)) \\\n",
" .when(mrs_literal_value.isNull(), lit(None)) \\\n",
" .otherwise(grouping1_value == mrs_literal_value)\n",
"agg_column = spark_max(equality_with_null_checks)"
]
},
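{
"cell_type": "markdown",
"metadata": {},
"source": [
"As an aside, the same per-patient result could also be computed with a __window function__ over the columns defined above, retaining one row per exploded `name`/`prefix` rather than collapsing the dataset. This is a sketch only; `grouping1_windowed` and `windowed_result` are illustrative names."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Windowing alternative (sketch): the same aggregate is computed over a\n",
"# window partitioned by the patient ID, so the per-patient result lands on\n",
"# every row; duplicates would later be removed with `distinct`.\n",
"window = Window.partitionBy(grouping1_id)\n",
"grouping1_windowed = grouping1.withColumn(\n",
"    \"windowed_result\", spark_max(equality_with_null_checks).over(window))"
]
},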
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Use __aggregation__ to get the result of the `contains` operation."
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"grouping1 = grouping1.groupBy(patient_columns).agg(agg_column)\n",
"grouping1_id = grouping1[grouping1.columns[0]]\n",
"grouping1_value = grouping1[grouping1.columns[-1]]\n",
"patient_id = grouping1_id\n",
"patient_columns = [ grouping1[col] for col in grouping1.columns[0:(len(patient_columns) - 1)] ]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Grouping 2"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Traverse to the `name` element within `Patient`."
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"grouping2 = grouping1.withColumn(\"g\", grouping1_id)\n",
"grouping2 = grouping2.withColumn(\"h\", explode_outer(patient[\"name\"]))\n",
"grouping2_id = grouping2[\"g\"]\n",
"grouping2_value = grouping2[\"h\"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Traverse to the `given` element within `name`."
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"grouping2 = grouping2.withColumn(\"i\", explode_outer(grouping2_value.getField(\"given\")))\n",
"grouping2_value = grouping2[\"i\"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a dataset to represent the literal value `Karina848`."
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"karina_literal = patient.select(patient_id.alias(\"j\"), lit(\"Karina848\").alias(\"k\"))\n",
"karina_literal_id = karina_literal[\"j\"]\n",
"karina_literal_value = karina_literal[\"k\"]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a column which calculates the result of a `contains` operation, with `given` on the left and `Karina848` on the right."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"from pyspark.sql.functions import max as spark_max\n",
"\n",
"grouping2 = grouping2.join(karina_literal, grouping2_id == karina_literal_id, \"left_outer\")\n",
"equality_with_null_checks = when(grouping2_value.isNull(), lit(None)) \\\n",
" .when(karina_literal_value.isNull(), lit(None)) \\\n",
" .otherwise(grouping2_value == karina_literal_value)\n",
"agg_column = spark_max(equality_with_null_checks)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Use __aggregation__ to get the result of the `contains` operation."
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
"group_by = patient_columns.append(grouping1_value)\n",
"grouping2 = grouping2.groupBy(patient_columns).agg(agg_column)\n",
"grouping2_id = grouping2[grouping2.columns[0]]\n",
"grouping2_value = grouping2[grouping2.columns[-1]]\n",
"patient_id = grouping2_id\n",
"patient_columns = [ grouping2[col] for col in grouping2.columns[0:(len(patient_columns) - 1)] ]\n",
"grouping1_value = grouping2[grouping2.columns[len(patient_columns)]]"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Aggregation"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a column which counts the number of patients within each grouping."
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"grouping_columns = [grouping1_value, grouping2_value]\n",
"count_column = count(patient_id)\n",
"null_aware_count = when(count_column.isNull(), 0) \\\n",
" .otherwise(count_column)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Create a dataset to represent the aggregation."
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"aggregation = grouping2.groupBy(grouping_columns).agg(null_aware_count)\n",
"aggregation_value = aggregation[aggregation.columns[-1]]"
]
},
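{
"cell_type": "markdown",
"metadata": {},
"source": [
"For comparison, the windowing substitute for this step: counting over a window partitioned by the grouping columns attaches the same count to every row, and a `distinct`, as in the final step below, would then reduce the result to one row per grouping. This is a sketch only; `count_window` and `windowed_aggregation` are illustrative names."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Windowing alternative (sketch): count over a window partitioned by the\n",
"# grouping columns; a subsequent distinct select collapses the duplicates.\n",
"count_window = Window.partitionBy(grouping_columns)\n",
"windowed_aggregation = grouping2.withColumn(\n",
"    \"count\", count(patient_id).over(count_window))"
]
},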
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Final result"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Take the distinct values from the selection of each of the grouping columns, and the aggregation column."
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>grouping1</th>\n",
" <th>grouping2</th>\n",
" <th>count</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>False</td>\n",
" <td>False</td>\n",
" <td>897</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>True</td>\n",
" <td>False</td>\n",
" <td>474</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>None</td>\n",
" <td>False</td>\n",
" <td>234</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>True</td>\n",
" <td>True</td>\n",
" <td>1</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" grouping1 grouping2 count\n",
"0 False False 897\n",
"1 True False 474\n",
"2 None False 234\n",
"3 True True 1"
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"result = aggregation.select(grouping1_value.alias(\"grouping1\"),\n",
" grouping2_value.alias(\"grouping2\"),\n",
" aggregation_value.alias(\"count\")).distinct()\n",
"result.toPandas()"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"13.59262752532959"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"time.time() - start_time"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.8.5"
}
},
"nbformat": 4,
"nbformat_minor": 4
}