Skip to content

Instantly share code, notes, and snippets.

@philerooski
Last active September 21, 2023 22:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save philerooski/49d1c47baecc74582d36c63783c58b3b to your computer and use it in GitHub Desktop.
Save philerooski/49d1c47baecc74582d36c63783c58b3b to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import great_expectations as gx\n",
"from pyarrow import parquet, fs, dataset\n",
"import sys"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"NAMESPACE=\"etl-533\"\n",
"DATATYPE=\"fitbitintradaycombined\""
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"context = gx.get_context()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Reading Parquet Data"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Data Asset from S3 Parquet"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"# S3 datasource\n",
"s3_datasource = context.sources.add_pandas_s3(\n",
" name=\"recover-dev-processed-data-data-source\",\n",
" bucket=\"recover-dev-processed-data\"\n",
")\n",
"s3_data_asset = s3_datasource.add_parquet_asset(\n",
" name=\"parquet_data_asset\",\n",
" s3_prefix=f\"{NAMESPACE}/parquet/dataset_{DATATYPE}/\",\n",
" s3_recursive_file_discovery=True\n",
")\n",
"batch_request = s3_data_asset.build_batch_request()\n",
"batches = s3_data_asset.get_batch_list_from_batch_request(batch_request=batch_request)"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"# of batches 6\n",
"{'path': 's3a://recover-dev-processed-data/etl-533/parquet/dataset_fitbitintradaycombined/cohort=adults_v1/part-00000-70ddc804-5378-44a1-a72b-891f38fd67ae.c000.snappy.parquet', 'reader_method': 'read_parquet', 'reader_options': {}}\n",
"{'path': 's3a://recover-dev-processed-data/etl-533/parquet/dataset_fitbitintradaycombined/cohort=adults_v1/part-00001-70ddc804-5378-44a1-a72b-891f38fd67ae.c000.snappy.parquet', 'reader_method': 'read_parquet', 'reader_options': {}}\n",
"{'path': 's3a://recover-dev-processed-data/etl-533/parquet/dataset_fitbitintradaycombined/cohort=adults_v1/part-00002-70ddc804-5378-44a1-a72b-891f38fd67ae.c000.snappy.parquet', 'reader_method': 'read_parquet', 'reader_options': {}}\n",
"{'path': 's3a://recover-dev-processed-data/etl-533/parquet/dataset_fitbitintradaycombined/cohort=pediatric_v1/part-00000-70ddc804-5378-44a1-a72b-891f38fd67ae.c000.snappy.parquet', 'reader_method': 'read_parquet', 'reader_options': {}}\n",
"{'path': 's3a://recover-dev-processed-data/etl-533/parquet/dataset_fitbitintradaycombined/cohort=pediatric_v1/part-00001-70ddc804-5378-44a1-a72b-891f38fd67ae.c000.snappy.parquet', 'reader_method': 'read_parquet', 'reader_options': {}}\n",
"{'path': 's3a://recover-dev-processed-data/etl-533/parquet/dataset_fitbitintradaycombined/cohort=pediatric_v1/part-00002-70ddc804-5378-44a1-a72b-891f38fd67ae.c000.snappy.parquet', 'reader_method': 'read_parquet', 'reader_options': {}}\n"
]
}
],
"source": [
"print(f\"# of batches {len(batches)}\")\n",
"for batch in batches:\n",
" print(batch.batch_spec)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Data Asset from in-memory Dataframe"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"# Dataframe datasource: pyarrow S3 filesystem used by both pyarrow readers below\n",
"s3_fs = fs.S3FileSystem()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"ℹ️ Reading all of the data into a single in-memory pyarrow table (and then a pandas DataFrame) is not sustainable; see the object sizes printed below."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"# Memory-inefficient: loads the whole dataset into one pyarrow table, then copies it into pandas\n",
"table = parquet.read_table(\n",
" f\"recover-dev-processed-data/{NAMESPACE}/parquet/dataset_{DATATYPE}/\",\n",
" filesystem=s3_fs\n",
")\n",
"table_df = table.to_pandas()"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<class 'pyarrow.lib.Table'>\n",
"501048719\n"
]
}
],
"source": [
"print(type(table))\n",
"print(sys.getsizeof(table))"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<class 'pandas.core.frame.DataFrame'>\n",
"1905267196\n"
]
}
],
"source": [
"print(type(table_df))\n",
"print(sys.getsizeof(table_df))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Data Asset from out-of-memory pyarrow Dataset"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"# Memory efficient dataset: only file metadata is discovered here; rows stay on S3 until scanned.\n",
"# Bound to `parquet_dataset` (not `dataset`) so the imported `pyarrow.dataset` module is not\n",
"# shadowed -- otherwise re-running this cell fails with AttributeError.\n",
"parquet_dataset = dataset.dataset(\n",
" f\"recover-dev-processed-data/{NAMESPACE}/parquet/dataset_{DATATYPE}/\",\n",
" filesystem=s3_fs,\n",
" format=\"parquet\",\n",
" partitioning=\"hive\"\n",
")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"ℹ️ Dataset does not exist in-memory"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<class 'pyarrow._dataset.FileSystemDataset'>\n",
"88\n"
]
}
],
"source": [
"print(type(parquet_dataset))\n",
"print(sys.getsizeof(parquet_dataset))"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"# Read dataset in batches (a lazy iterator of pyarrow RecordBatches)\n",
"pyarrow_batches = parquet_dataset.to_batches()\n",
"pyarrow_batch = next(pyarrow_batches)"
]
},
{
"cell_type": "code",
"execution_count": 13,
"metadata": {
"scrolled": true
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"<class 'pyarrow.lib.RecordBatch'>\n",
"30520637\n"
]
}
],
"source": [
"print(type(pyarrow_batch))\n",
"print(sys.getsizeof(pyarrow_batch))"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"131072"
]
},
"execution_count": 14,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"pyarrow_batch.num_rows"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Each `RecordBatch` serves up at most 131,072 rows (pyarrow's default `batch_size`), which can be fewer records than an individual snappy.parquet file holds."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"ℹ️ We can get batches from pyarrow's `FileSystemDataset` class, but Great Expectations won't interface with this class out of the box. Writing a custom batch request function that pulls batches from our `FileSystemDataset` might work, but I didn't have enough time to explore what that solution would require."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Profiling"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"1. Create an _expectation suite_\n",
"2. Create a _validator_ by pairing our batch request with the expectation suite\n",
"3. Add some expectations to the validator\n",
"4. Save the expectation suite\n",
"5. Create and run a checkpoint\n",
"6. View the data docs of this checkpoint run"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"context.add_or_update_expectation_suite(\"my_expectation_suite\")\n",
"validator = context.get_validator(\n",
" batch_request=batch_request,\n",
" expectation_suite_name=\"my_expectation_suite\",\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 16,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "159217524141466eb79316152afa75b5",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Calculating Metrics: 0%| | 0/1 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>ParticipantID</th>\n",
" <th>ParticipantIdentifier</th>\n",
" <th>Type</th>\n",
" <th>DateTime</th>\n",
" <th>Value</th>\n",
" <th>InsertedDate</th>\n",
" <th>export_start_date</th>\n",
" <th>export_end_date</th>\n",
" <th>Level</th>\n",
" <th>Mets</th>\n",
" <th>DeepSleepSummaryBreathRate</th>\n",
" <th>RemSleepSummaryBreathRate</th>\n",
" <th>FullSleepSummaryBreathRate</th>\n",
" <th>LightSleepSummaryBreathRate</th>\n",
" <th>Rmssd</th>\n",
" <th>Coverage</th>\n",
" <th>Hf</th>\n",
" <th>Lf</th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>0</th>\n",
" <td>1c386560-bb87-48ba-85e8-0f1b9e99aa13</td>\n",
" <td>MDH-5678-7532</td>\n",
" <td>activities-calories</td>\n",
" <td>2023-06-25T00:28:00</td>\n",
" <td>0.9047999978065491</td>\n",
" <td>2023-06-26T03:59:27Z</td>\n",
" <td>2023-06-26T00:00:00</td>\n",
" <td>2023-06-27T00:00:00</td>\n",
" <td>0</td>\n",
" <td>10</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" </tr>\n",
" <tr>\n",
" <th>1</th>\n",
" <td>1c386560-bb87-48ba-85e8-0f1b9e99aa13</td>\n",
" <td>MDH-5678-7532</td>\n",
" <td>activities-calories</td>\n",
" <td>2023-06-25T08:26:00</td>\n",
" <td>0.9047999978065491</td>\n",
" <td>2023-06-26T03:59:27Z</td>\n",
" <td>2023-06-26T00:00:00</td>\n",
" <td>2023-06-27T00:00:00</td>\n",
" <td>0</td>\n",
" <td>10</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2</th>\n",
" <td>1c386560-bb87-48ba-85e8-0f1b9e99aa13</td>\n",
" <td>MDH-5678-7532</td>\n",
" <td>activities-calories</td>\n",
" <td>2023-06-25T13:01:00</td>\n",
" <td>0.9047999978065491</td>\n",
" <td>2023-06-26T03:59:27Z</td>\n",
" <td>2023-06-26T00:00:00</td>\n",
" <td>2023-06-27T00:00:00</td>\n",
" <td>0</td>\n",
" <td>10</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" </tr>\n",
" <tr>\n",
" <th>3</th>\n",
" <td>1c386560-bb87-48ba-85e8-0f1b9e99aa13</td>\n",
" <td>MDH-5678-7532</td>\n",
" <td>activities-calories</td>\n",
" <td>2023-06-25T17:03:00</td>\n",
" <td>0.9047999978065491</td>\n",
" <td>2023-06-26T03:59:27Z</td>\n",
" <td>2023-06-26T00:00:00</td>\n",
" <td>2023-06-27T00:00:00</td>\n",
" <td>0</td>\n",
" <td>10</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" </tr>\n",
" <tr>\n",
" <th>4</th>\n",
" <td>1c386560-bb87-48ba-85e8-0f1b9e99aa13</td>\n",
" <td>MDH-5678-7532</td>\n",
" <td>activities-calories</td>\n",
" <td>2023-06-25T19:00:00</td>\n",
" <td>0.9047999978065491</td>\n",
" <td>2023-06-26T03:59:27Z</td>\n",
" <td>2023-06-26T00:00:00</td>\n",
" <td>2023-06-27T00:00:00</td>\n",
" <td>0</td>\n",
" <td>10</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" <td>None</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"</div>"
],
"text/plain": [
" ParticipantID ParticipantIdentifier \\\n",
"0 1c386560-bb87-48ba-85e8-0f1b9e99aa13 MDH-5678-7532 \n",
"1 1c386560-bb87-48ba-85e8-0f1b9e99aa13 MDH-5678-7532 \n",
"2 1c386560-bb87-48ba-85e8-0f1b9e99aa13 MDH-5678-7532 \n",
"3 1c386560-bb87-48ba-85e8-0f1b9e99aa13 MDH-5678-7532 \n",
"4 1c386560-bb87-48ba-85e8-0f1b9e99aa13 MDH-5678-7532 \n",
"\n",
" Type DateTime Value \\\n",
"0 activities-calories 2023-06-25T00:28:00 0.9047999978065491 \n",
"1 activities-calories 2023-06-25T08:26:00 0.9047999978065491 \n",
"2 activities-calories 2023-06-25T13:01:00 0.9047999978065491 \n",
"3 activities-calories 2023-06-25T17:03:00 0.9047999978065491 \n",
"4 activities-calories 2023-06-25T19:00:00 0.9047999978065491 \n",
"\n",
" InsertedDate export_start_date export_end_date Level Mets \\\n",
"0 2023-06-26T03:59:27Z 2023-06-26T00:00:00 2023-06-27T00:00:00 0 10 \n",
"1 2023-06-26T03:59:27Z 2023-06-26T00:00:00 2023-06-27T00:00:00 0 10 \n",
"2 2023-06-26T03:59:27Z 2023-06-26T00:00:00 2023-06-27T00:00:00 0 10 \n",
"3 2023-06-26T03:59:27Z 2023-06-26T00:00:00 2023-06-27T00:00:00 0 10 \n",
"4 2023-06-26T03:59:27Z 2023-06-26T00:00:00 2023-06-27T00:00:00 0 10 \n",
"\n",
" DeepSleepSummaryBreathRate RemSleepSummaryBreathRate \\\n",
"0 None None \n",
"1 None None \n",
"2 None None \n",
"3 None None \n",
"4 None None \n",
"\n",
" FullSleepSummaryBreathRate LightSleepSummaryBreathRate Rmssd Coverage Hf \\\n",
"0 None None None None None \n",
"1 None None None None None \n",
"2 None None None None None \n",
"3 None None None None None \n",
"4 None None None None None \n",
"\n",
" Lf \n",
"0 None \n",
"1 None \n",
"2 None \n",
"3 None \n",
"4 None "
]
},
"execution_count": 16,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"validator.head()"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "dc1a725d1886470aba2f7ba9456d3308",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Calculating Metrics: 0%| | 0/2 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"{\n",
" \"success\": true,\n",
" \"result\": {},\n",
" \"meta\": {},\n",
" \"exception_info\": {\n",
" \"raised_exception\": false,\n",
" \"exception_traceback\": null,\n",
" \"exception_message\": null\n",
" }\n",
"}"
]
},
"execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"validator.expect_column_to_exist(\"Value\")"
]
},
{
"cell_type": "code",
"execution_count": 18,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "3b1f7aee461747188bf71ddaea90a50f",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Calculating Metrics: 0%| | 0/6 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"data": {
"text/plain": [
"{\n",
" \"success\": false,\n",
" \"result\": {\n",
" \"element_count\": 11586,\n",
" \"unexpected_count\": 223,\n",
" \"unexpected_percent\": 1.9247367512515106,\n",
" \"partial_unexpected_list\": [\n",
" null,\n",
" null,\n",
" null,\n",
" null,\n",
" null,\n",
" null,\n",
" null,\n",
" null,\n",
" null,\n",
" null,\n",
" null,\n",
" null,\n",
" null,\n",
" null,\n",
" null,\n",
" null,\n",
" null,\n",
" null,\n",
" null,\n",
" null\n",
" ]\n",
" },\n",
" \"meta\": {},\n",
" \"exception_info\": {\n",
" \"raised_exception\": false,\n",
" \"exception_traceback\": null,\n",
" \"exception_message\": null\n",
" }\n",
"}"
]
},
"execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"validator.expect_column_values_to_not_be_null(\"Value\")"
]
},
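{
"cell_type": "markdown",
"metadata": {},
"source": [
"ℹ️ We can't add a failing expectation to our suite \"interactively\" (it is not kept when the validator saves the suite), so after saving the suite below we add the non-null expectation to it explicitly:"
]
},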
{
"cell_type": "code",
"execution_count": 19,
"metadata": {},
"outputs": [],
"source": [
"validator.save_expectation_suite()"
]
},
{
"cell_type": "code",
"execution_count": 20,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{\n",
" \"expectation_suite_name\": \"my_expectation_suite\",\n",
" \"ge_cloud_id\": null,\n",
" \"expectations\": [\n",
" {\n",
" \"expectation_type\": \"expect_column_values_to_not_be_null\",\n",
" \"kwargs\": {\n",
" \"column\": \"Value\"\n",
" },\n",
" \"meta\": {}\n",
" },\n",
" {\n",
" \"expectation_type\": \"expect_column_to_exist\",\n",
" \"kwargs\": {\n",
" \"column\": \"Value\"\n",
" },\n",
" \"meta\": {}\n",
" }\n",
" ],\n",
" \"data_asset_type\": null,\n",
" \"meta\": {\n",
" \"great_expectations_version\": \"0.17.17\"\n",
" }\n",
"}"
]
},
"execution_count": 20,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"from great_expectations.core.expectation_configuration import ExpectationConfiguration\n",
"\n",
"existing_expectations = context.get_expectation_suite(\"my_expectation_suite\").expectations\n",
"non_null_expectation = ExpectationConfiguration(\n",
" expectation_type=\"expect_column_values_to_not_be_null\",\n",
" kwargs={\"column\": \"Value\"}\n",
")\n",
"context.add_or_update_expectation_suite(\n",
" expectation_suite_name=\"my_expectation_suite\",\n",
" expectations = [non_null_expectation, *existing_expectations]\n",
")"
]
},
{
"cell_type": "code",
"execution_count": 21,
"metadata": {},
"outputs": [
{
"data": {
"text/plain": [
"{\n",
" \"expectation_suite_name\": \"my_expectation_suite\",\n",
" \"ge_cloud_id\": null,\n",
" \"expectations\": [\n",
" {\n",
" \"expectation_type\": \"expect_column_values_to_not_be_null\",\n",
" \"kwargs\": {\n",
" \"column\": \"Value\"\n",
" },\n",
" \"meta\": {}\n",
" },\n",
" {\n",
" \"expectation_type\": \"expect_column_to_exist\",\n",
" \"kwargs\": {\n",
" \"column\": \"Value\"\n",
" },\n",
" \"meta\": {}\n",
" }\n",
" ],\n",
" \"data_asset_type\": null,\n",
" \"meta\": {\n",
" \"great_expectations_version\": \"0.17.17\"\n",
" }\n",
"}"
]
},
"execution_count": 21,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"context.get_expectation_suite(\"my_expectation_suite\")"
]
},
{
"cell_type": "code",
"execution_count": 22,
"metadata": {},
"outputs": [
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "f997492e001b4e3d849c820c325b9e50",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"Calculating Metrics: 0%| | 0/8 [00:00<?, ?it/s]"
]
},
"metadata": {},
"output_type": "display_data"
}
],
"source": [
"checkpoint = context.add_or_update_checkpoint(\n",
" name=\"my_checkpoint\",\n",
" validator=validator,\n",
")\n",
"checkpoint_result = checkpoint.run()"
]
},
{
"cell_type": "code",
"execution_count": 23,
"metadata": {},
"outputs": [],
"source": [
"context.view_validation_result(checkpoint_result)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.8"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment