@jamesbroome
Last active April 30, 2021 13:02
{
"cells": [
{
"cell_type": "markdown",
"source": [
"# Testing ETL processes in Synapse Notebook Demo"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"from pyspark.sql.functions import *\r\n",
"import pytest"
],
"outputs": [],
"execution_count": 396,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "code",
"source": [
"test_mode = True"
],
"outputs": [],
"execution_count": 397,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true,
"tags": [
"parameters"
]
}
},
{
"cell_type": "markdown",
"source": [
"## ETL Functions"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"orders_schema = [\"OrderId\",\"OrderDate\", \"Region\", \"City\", \"Category\",\"Product\",\"Quantity\",\"UnitPrice\",\"TotalPrice\"]"
],
"outputs": [],
"execution_count": 398,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "code",
"source": [
"def load_data():\r\n",
" # N.B. this would actually be loading the real data from somewhere in the data lake, or elsewhere...\r\n",
" df = spark.createDataFrame(\r\n",
" [\r\n",
" (1,\"01/01/2020\",\"East\",\"Boston\",\"Bars\",\"Carrot\",33,1.77,58.41),\r\n",
" (2,\"04/01/2020\",\"East\",\"Boston\",\"Crackers\",\"Whole Wheat\",87,3.49,303.63),\r\n",
" (3,\"07/01/2020\",\"West\",\"Los Angeles\",\"Cookies\",\"Chocolate Chip\",58,1.87,108.46),\r\n",
" (3,\"07/01/2020\",\"West\",\"Los Angeles\",\"Cookies\",\"Chocolate Chip\",58,1.87,108.46),\r\n",
" (4,\"10/01/2020\",\"East\",\"New York\",\"Cookies\",\"Chocolate Chip\",82,1.87,153.34),\r\n",
" (5,\"13/01/2020\",\"East\",\"Boston\",\"Cookies\",\"Arrowroot\",38,2.18,82.84),\r\n",
" (6,\"16/01/2020\",\"East\",\"Boston\",\"Bars\",\"Carrot\",54,1.77,95.58),\r\n",
" (7,\"19/01/2020\",\"East\",\"Boston\",\"Crackers\",\"Whole Wheat\",149,3.49,520.01),\r\n",
" (7,\"19/01/2020\",\"East\",\"Boston\",\"Crackers\",\"Whole Wheat\",149,3.49,520.01),\r\n",
" (8,\"22/01/2020\",\"West\",\"Los Angeles\",\"Bars\",\"Carrot\",51,1.77,90.27),\r\n",
" (9,\"25/01/2020\",\"East\",\"New York\",\"Bars\",\"Carrot\",100,1.77,177.00),\r\n",
" (10,\"28/01/2020\",\"East\",\"New York\",\"Snacks\",\"Potato Chips\",28,1.35,37.8),\r\n",
" (11,\"31/01/2020\",\"East\",\"Boston\",\"Cookies\",\"Arrowroot\",36,2.18,78.48),\r\n",
" (12,\"03/02/2020\",\"East\",\"Boston\",\"Cookies\",\"Chocolate Chip\",31,1.87,57.97),\r\n",
" (13,\"06/02/2020\",\"East\",\"Boston\",\"Crackers\",\"Whole Wheat\",28,3.49,97.72) \r\n",
" ],\r\n",
" orders_schema \r\n",
" )\r\n",
" return df\r\n"
],
"outputs": [],
"execution_count": 399,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "code",
"source": [
"def save_data(df):\r\n",
" # N.B. this would actually be writing the data back to the data lake, or elsewhere...\r\n",
" df.show()"
],
"outputs": [],
"execution_count": 400,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "code",
"source": [
"def remove_duplicate_orders(df):\r\n",
" # Swap these lines over to fail test\r\n",
" #return df \r\n",
" #return df.distinct()\r\n",
" return df.dropDuplicates([\"OrderId\"])"
],
"outputs": [],
"execution_count": 401,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "code",
"source": [
"def calculate_sales_by_region(df):\r\n",
" return df.select(\"Region\", \"TotalPrice\").groupBy(\"Region\").sum(\"TotalPrice\").withColumnRenamed(\"sum(TotalPrice)\", \"TotalSales\")\r\n"
],
"outputs": [],
"execution_count": 402,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "markdown",
"source": [
"## Tests"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"def orders_with_duplicated_order_id_are_removed():\r\n",
" \r\n",
" # Arrange\r\n",
" df = spark.createDataFrame(\r\n",
" [\r\n",
" (10,\"01/01/2020\",\"North\",\"Chicago\",\"Bars\",\"Carrot\",33,1.77,58.41),\r\n",
" (10,\"11/03/2020\",\"North\",\"Chicago\",\"Bars\",\"Carrot\",33,1.77,58.41),\r\n",
" ],\r\n",
" orders_schema \r\n",
" )\r\n",
"\r\n",
" #Act\r\n",
" df_result = remove_duplicate_orders(df)\r\n",
"\r\n",
" #Assert\r\n",
" assert df_result, \"No data frame returned from remove_duplicate_orders()\"\r\n",
"\r\n",
" expected_orders = 1\r\n",
" number_of_orders = df_result.count()\r\n",
" assert number_of_orders == 1, f'Expected {expected_orders} order after remove_duplicate_orders() but {number_of_orders} returned.'"
],
"outputs": [],
"execution_count": 403,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "code",
"source": [
"def similar_orders_with_different_order_id_are_not_removed():\r\n",
" \r\n",
" # Arrange\r\n",
" df = spark.createDataFrame(\r\n",
" [\r\n",
" (10,\"01/01/2020\",\"North\",\"Chicago\",\"Bars\",\"Carrot\",33,1.77,58.41),\r\n",
" (11,\"01/01/2020\",\"North\",\"Chicago\",\"Bars\",\"Carrot\",33,1.77,58.41),\r\n",
" (12,\"01/01/2020\",\"North\",\"Chicago\",\"Bars\",\"Carrot\",33,1.77,58.41),\r\n",
" ],\r\n",
" orders_schema \r\n",
" )\r\n",
"\r\n",
" #Act\r\n",
" df_result = remove_duplicate_orders(df)\r\n",
"\r\n",
" #Assert\r\n",
" expected_orders = 3\r\n",
" number_of_orders = df_result.count()\r\n",
" assert number_of_orders == 3, f'Expected {expected_orders} order after remove_duplicate_orders() but {number_of_orders} returned.'"
],
"outputs": [],
"execution_count": 404,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "code",
"source": [
"def regional_sales_are_calculated_correctly():\r\n",
"\r\n",
" # Arrange\r\n",
" df = spark.createDataFrame(\r\n",
" [\r\n",
" (7,\"19/01/2020\",\"East\",\"Boston\",\"Crackers\",\"Whole Wheat\",149,3.49,520.01),\r\n",
" (8,\"22/01/2020\",\"West\",\"Los Angeles\",\"Bars\",\"Carrot\",51,1.77,90.27),\r\n",
" (9,\"25/01/2020\",\"East\",\"New York\",\"Bars\",\"Carrot\",100,1.77,177.00),\r\n",
" (10,\"28/01/2020\",\"East\",\"New York\",\"Snacks\",\"Potato Chips\",28,1.35,37.8),\r\n",
" ],\r\n",
" orders_schema \r\n",
" )\r\n",
" \r\n",
" #Act\r\n",
" df_result = calculate_sales_by_region(df)\r\n",
"\r\n",
" #Assert\r\n",
" expected_sales_east = 734.81\r\n",
" sales_east = df_result.where(df_result[\"Region\"] == \"East\").collect()[0][\"TotalSales\"]\r\n",
"\r\n",
" assert expected_sales_east == sales_east, f'Expected regional sales to be {expected_sales_east} for East region but {sales_east} returned.'"
],
"outputs": [],
"execution_count": 405,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "code",
"source": [
"def run_tests():\r\n",
" orders_with_duplicated_order_id_are_removed()\r\n",
" similar_orders_with_different_order_id_are_not_removed()\r\n",
" regional_sales_are_calculated_correctly()"
],
"outputs": [],
"execution_count": 406,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
},
{
"cell_type": "markdown",
"source": [
"## ETL Workflow"
],
"metadata": {
"nteract": {
"transient": {
"deleting": false
}
}
}
},
{
"cell_type": "code",
"source": [
"if test_mode:\r\n",
" run_tests()\r\n",
"else:\r\n",
" # Input\r\n",
" df_raw = load_data()\r\n",
"\r\n",
" # ETL Steps\r\n",
" df_distinct_orders = remove_duplicate_orders(df_raw)\r\n",
" df_sales_by_region = calculate_sales_by_region(df_distinct_orders)\r\n",
"\r\n",
" # Output\r\n",
" save_data(df_sales_by_region)"
],
"outputs": [],
"execution_count": 407,
"metadata": {
"jupyter": {
"source_hidden": false,
"outputs_hidden": false
},
"nteract": {
"transient": {
"deleting": false
}
},
"collapsed": true
}
}
],
"metadata": {
"kernelspec": {
"name": "synapse_pyspark",
"display_name": "python"
},
"language_info": {
"name": "python"
},
"save_output": true,
"synapse_widget": {
"version": "0.1",
"state": {}
}
},
"nbformat": 4,
"nbformat_minor": 2
}