-
-
Save jamesbroome/37876a5f23feb0595bdd1d075cc10712 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"# Testing ETL processes in Synapse Notebook Demo" | |
], | |
"metadata": { | |
"nteract": { | |
"transient": { | |
"deleting": false | |
} | |
} | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"from pyspark.sql.functions import *  # NOTE(review): wildcard import, but no name from this module is used in any cell below - consider removing\r\n", | |
"import pytest" | |
], | |
"outputs": [], | |
"execution_count": 396, | |
"metadata": { | |
"jupyter": { | |
"source_hidden": false, | |
"outputs_hidden": false | |
}, | |
"nteract": { | |
"transient": { | |
"deleting": false | |
} | |
}, | |
"collapsed": true | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"test_mode = True" | |
], | |
"outputs": [], | |
"execution_count": 397, | |
"metadata": { | |
"jupyter": { | |
"source_hidden": false, | |
"outputs_hidden": false | |
}, | |
"nteract": { | |
"transient": { | |
"deleting": false | |
} | |
}, | |
"collapsed": true, | |
"tags": [ | |
"parameters" | |
] | |
} | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"## ETL Functions" | |
], | |
"metadata": { | |
"nteract": { | |
"transient": { | |
"deleting": false | |
} | |
} | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"orders_schema = [\"OrderId\",\"OrderDate\", \"Region\", \"City\", \"Category\",\"Product\",\"Quantity\",\"UnitPrice\",\"TotalPrice\"]" | |
], | |
"outputs": [], | |
"execution_count": 398, | |
"metadata": { | |
"jupyter": { | |
"source_hidden": false, | |
"outputs_hidden": false | |
}, | |
"nteract": { | |
"transient": { | |
"deleting": false | |
} | |
}, | |
"collapsed": true | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"def load_data():\r\n", | |
"    \"\"\"Return a hard-coded demo orders DataFrame shaped by orders_schema.\r\n", | |
"\r\n", | |
"    OrderIds 3 and 7 each appear twice - presumably deliberate seed data for\r\n", | |
"    remove_duplicate_orders() in the ETL workflow; confirm before relying on\r\n", | |
"    OrderId uniqueness here.\r\n", | |
"    \"\"\"\r\n", | |
"    # N.B. this would actually be loading the real data from somewhere in the data lake, or elsewhere...\r\n", | |
"    df = spark.createDataFrame(\r\n", | |
"        [\r\n", | |
"            (1,\"01/01/2020\",\"East\",\"Boston\",\"Bars\",\"Carrot\",33,1.77,58.41),\r\n", | |
"            (2,\"04/01/2020\",\"East\",\"Boston\",\"Crackers\",\"Whole Wheat\",87,3.49,303.63),\r\n", | |
"            (3,\"07/01/2020\",\"West\",\"Los Angeles\",\"Cookies\",\"Chocolate Chip\",58,1.87,108.46),\r\n", | |
"            (3,\"07/01/2020\",\"West\",\"Los Angeles\",\"Cookies\",\"Chocolate Chip\",58,1.87,108.46),\r\n", | |
"            (4,\"10/01/2020\",\"East\",\"New York\",\"Cookies\",\"Chocolate Chip\",82,1.87,153.34),\r\n", | |
"            (5,\"13/01/2020\",\"East\",\"Boston\",\"Cookies\",\"Arrowroot\",38,2.18,82.84),\r\n", | |
"            (6,\"16/01/2020\",\"East\",\"Boston\",\"Bars\",\"Carrot\",54,1.77,95.58),\r\n", | |
"            (7,\"19/01/2020\",\"East\",\"Boston\",\"Crackers\",\"Whole Wheat\",149,3.49,520.01),\r\n", | |
"            (7,\"19/01/2020\",\"East\",\"Boston\",\"Crackers\",\"Whole Wheat\",149,3.49,520.01),\r\n", | |
"            (8,\"22/01/2020\",\"West\",\"Los Angeles\",\"Bars\",\"Carrot\",51,1.77,90.27),\r\n", | |
"            (9,\"25/01/2020\",\"East\",\"New York\",\"Bars\",\"Carrot\",100,1.77,177.00),\r\n", | |
"            (10,\"28/01/2020\",\"East\",\"New York\",\"Snacks\",\"Potato Chips\",28,1.35,37.8),\r\n", | |
"            (11,\"31/01/2020\",\"East\",\"Boston\",\"Cookies\",\"Arrowroot\",36,2.18,78.48),\r\n", | |
"            (12,\"03/02/2020\",\"East\",\"Boston\",\"Cookies\",\"Chocolate Chip\",31,1.87,57.97),\r\n", | |
"            (13,\"06/02/2020\",\"East\",\"Boston\",\"Crackers\",\"Whole Wheat\",28,3.49,97.72) \r\n", | |
"        ],\r\n", | |
"        orders_schema \r\n", | |
"    )\r\n", | |
"    return df\r\n" | |
], | |
"outputs": [], | |
"execution_count": 399, | |
"metadata": { | |
"jupyter": { | |
"source_hidden": false, | |
"outputs_hidden": false | |
}, | |
"nteract": { | |
"transient": { | |
"deleting": false | |
} | |
}, | |
"collapsed": true | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"def save_data(df):\r\n", | |
"    \"\"\"Persist the given DataFrame (demo stub: just displays it with show()).\"\"\"\r\n", | |
"    # N.B. this would actually be writing the data back to the data lake, or elsewhere...\r\n", | |
"    df.show()" | |
], | |
"outputs": [], | |
"execution_count": 400, | |
"metadata": { | |
"jupyter": { | |
"source_hidden": false, | |
"outputs_hidden": false | |
}, | |
"nteract": { | |
"transient": { | |
"deleting": false | |
} | |
}, | |
"collapsed": true | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"def remove_duplicate_orders(df):\r\n", | |
"    \"\"\"Collapse rows sharing an OrderId, keeping a single row per OrderId.\"\"\"\r\n", | |
"    # The commented alternatives are deliberate demo scaffolding:\r\n", | |
"    # Swap these lines over to fail test\r\n", | |
"    #return df \r\n", | |
"    #return df.distinct()\r\n", | |
"    return df.dropDuplicates([\"OrderId\"])" | |
], | |
"outputs": [], | |
"execution_count": 401, | |
"metadata": { | |
"jupyter": { | |
"source_hidden": false, | |
"outputs_hidden": false | |
}, | |
"nteract": { | |
"transient": { | |
"deleting": false | |
} | |
}, | |
"collapsed": true | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"def calculate_sales_by_region(df):\r\n", | |
"    \"\"\"Aggregate TotalPrice per Region, returning Region + TotalSales columns.\"\"\"\r\n", | |
"    regional_totals = (\r\n", | |
"        df.select(\"Region\", \"TotalPrice\")\r\n", | |
"          .groupBy(\"Region\")\r\n", | |
"          .agg({\"TotalPrice\": \"sum\"})\r\n", | |
"    )\r\n", | |
"    # Dict-form agg names the output column 'sum(TotalPrice)', same as .sum()\r\n", | |
"    return regional_totals.withColumnRenamed(\"sum(TotalPrice)\", \"TotalSales\")\r\n" | |
], | |
"outputs": [], | |
"execution_count": 402, | |
"metadata": { | |
"jupyter": { | |
"source_hidden": false, | |
"outputs_hidden": false | |
}, | |
"nteract": { | |
"transient": { | |
"deleting": false | |
} | |
}, | |
"collapsed": true | |
} | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"## Tests" | |
], | |
"metadata": { | |
"nteract": { | |
"transient": { | |
"deleting": false | |
} | |
} | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"def orders_with_duplicated_order_id_are_removed():\r\n", | |
"    \"\"\"remove_duplicate_orders() should collapse rows sharing an OrderId to one row.\"\"\"\r\n", | |
"\r\n", | |
"    # Arrange - two orders sharing OrderId 10\r\n", | |
"    df = spark.createDataFrame(\r\n", | |
"        [\r\n", | |
"            (10,\"01/01/2020\",\"North\",\"Chicago\",\"Bars\",\"Carrot\",33,1.77,58.41),\r\n", | |
"            (10,\"11/03/2020\",\"North\",\"Chicago\",\"Bars\",\"Carrot\",33,1.77,58.41),\r\n", | |
"        ],\r\n", | |
"        orders_schema\r\n", | |
"    )\r\n", | |
"\r\n", | |
"    #Act\r\n", | |
"    df_result = remove_duplicate_orders(df)\r\n", | |
"\r\n", | |
"    #Assert\r\n", | |
"    assert df_result, \"No data frame returned from remove_duplicate_orders()\"\r\n", | |
"\r\n", | |
"    expected_orders = 1\r\n", | |
"    number_of_orders = df_result.count()\r\n", | |
"    # Compare against expected_orders (was a hard-coded literal 1 that could\r\n", | |
"    # silently drift out of sync with the interpolated failure message)\r\n", | |
"    assert number_of_orders == expected_orders, f'Expected {expected_orders} order after remove_duplicate_orders() but {number_of_orders} returned.'" | |
], | |
"outputs": [], | |
"execution_count": 403, | |
"metadata": { | |
"jupyter": { | |
"source_hidden": false, | |
"outputs_hidden": false | |
}, | |
"nteract": { | |
"transient": { | |
"deleting": false | |
} | |
}, | |
"collapsed": true | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"def similar_orders_with_different_order_id_are_not_removed():\r\n", | |
"    \"\"\"remove_duplicate_orders() must keep rows whose OrderIds differ even when every other column matches.\"\"\"\r\n", | |
"\r\n", | |
"    # Arrange - identical order details under three distinct OrderIds\r\n", | |
"    df = spark.createDataFrame(\r\n", | |
"        [\r\n", | |
"            (10,\"01/01/2020\",\"North\",\"Chicago\",\"Bars\",\"Carrot\",33,1.77,58.41),\r\n", | |
"            (11,\"01/01/2020\",\"North\",\"Chicago\",\"Bars\",\"Carrot\",33,1.77,58.41),\r\n", | |
"            (12,\"01/01/2020\",\"North\",\"Chicago\",\"Bars\",\"Carrot\",33,1.77,58.41),\r\n", | |
"        ],\r\n", | |
"        orders_schema\r\n", | |
"    )\r\n", | |
"\r\n", | |
"    #Act\r\n", | |
"    df_result = remove_duplicate_orders(df)\r\n", | |
"\r\n", | |
"    #Assert\r\n", | |
"    expected_orders = 3\r\n", | |
"    number_of_orders = df_result.count()\r\n", | |
"    # Compare against expected_orders (was a hard-coded literal 3 that could\r\n", | |
"    # silently drift out of sync with the interpolated failure message)\r\n", | |
"    assert number_of_orders == expected_orders, f'Expected {expected_orders} order after remove_duplicate_orders() but {number_of_orders} returned.'" | |
], | |
"outputs": [], | |
"execution_count": 404, | |
"metadata": { | |
"jupyter": { | |
"source_hidden": false, | |
"outputs_hidden": false | |
}, | |
"nteract": { | |
"transient": { | |
"deleting": false | |
} | |
}, | |
"collapsed": true | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"def regional_sales_are_calculated_correctly():\r\n", | |
"    \"\"\"calculate_sales_by_region() should sum TotalPrice per Region into a TotalSales column.\"\"\"\r\n", | |
"\r\n", | |
"    # Arrange - three East rows and one West row\r\n", | |
"    df = spark.createDataFrame(\r\n", | |
"        [\r\n", | |
"            (7,\"19/01/2020\",\"East\",\"Boston\",\"Crackers\",\"Whole Wheat\",149,3.49,520.01),\r\n", | |
"            (8,\"22/01/2020\",\"West\",\"Los Angeles\",\"Bars\",\"Carrot\",51,1.77,90.27),\r\n", | |
"            (9,\"25/01/2020\",\"East\",\"New York\",\"Bars\",\"Carrot\",100,1.77,177.00),\r\n", | |
"            (10,\"28/01/2020\",\"East\",\"New York\",\"Snacks\",\"Potato Chips\",28,1.35,37.8),\r\n", | |
"        ],\r\n", | |
"        orders_schema\r\n", | |
"    )\r\n", | |
"\r\n", | |
"    #Act\r\n", | |
"    df_result = calculate_sales_by_region(df)\r\n", | |
"\r\n", | |
"    #Assert\r\n", | |
"    expected_sales_east = 734.81\r\n", | |
"    sales_east = df_result.where(df_result[\"Region\"] == \"East\").collect()[0][\"TotalSales\"]\r\n", | |
"\r\n", | |
"    # Summing binary floats (520.01 + 177.00 + 37.8) is subject to rounding, so\r\n", | |
"    # an exact == comparison is fragile; pytest.approx compares with tolerance.\r\n", | |
"    assert sales_east == pytest.approx(expected_sales_east), f'Expected regional sales to be {expected_sales_east} for East region but {sales_east} returned.'" | |
], | |
"outputs": [], | |
"execution_count": 405, | |
"metadata": { | |
"jupyter": { | |
"source_hidden": false, | |
"outputs_hidden": false | |
}, | |
"nteract": { | |
"transient": { | |
"deleting": false | |
} | |
}, | |
"collapsed": true | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"def run_tests():\r\n", | |
"    \"\"\"Execute every notebook test case; the first failing assert aborts the run.\"\"\"\r\n", | |
"    test_cases = (\r\n", | |
"        orders_with_duplicated_order_id_are_removed,\r\n", | |
"        similar_orders_with_different_order_id_are_not_removed,\r\n", | |
"        regional_sales_are_calculated_correctly,\r\n", | |
"    )\r\n", | |
"    for test_case in test_cases:\r\n", | |
"        test_case()" | |
], | |
"outputs": [], | |
"execution_count": 406, | |
"metadata": { | |
"jupyter": { | |
"source_hidden": false, | |
"outputs_hidden": false | |
}, | |
"nteract": { | |
"transient": { | |
"deleting": false | |
} | |
}, | |
"collapsed": true | |
} | |
}, | |
{ | |
"cell_type": "markdown", | |
"source": [ | |
"## ETL Workflow" | |
], | |
"metadata": { | |
"nteract": { | |
"transient": { | |
"deleting": false | |
} | |
} | |
} | |
}, | |
{ | |
"cell_type": "code", | |
"source": [ | |
"# Entry point: with test_mode=True (the default parameter) run the test suite;\r\n", | |
"# otherwise run the full load -> transform -> save ETL pipeline.\r\n", | |
"if test_mode:\r\n", | |
"    run_tests()\r\n", | |
"else:\r\n", | |
"    # Input\r\n", | |
"    df_raw = load_data()\r\n", | |
"\r\n", | |
"    # ETL Steps\r\n", | |
"    df_distinct_orders = remove_duplicate_orders(df_raw)\r\n", | |
"    df_sales_by_region = calculate_sales_by_region(df_distinct_orders)\r\n", | |
"\r\n", | |
"    # Output\r\n", | |
"    save_data(df_sales_by_region)" | |
], | |
"outputs": [], | |
"execution_count": 407, | |
"metadata": { | |
"jupyter": { | |
"source_hidden": false, | |
"outputs_hidden": false | |
}, | |
"nteract": { | |
"transient": { | |
"deleting": false | |
} | |
}, | |
"collapsed": true | |
} | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"name": "synapse_pyspark", | |
"display_name": "python" | |
}, | |
"language_info": { | |
"name": "python" | |
}, | |
"save_output": true, | |
"synapse_widget": { | |
"version": "0.1", | |
"state": {} | |
} | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment