-
-
Save staskh/f25b52f97f6775d96992f9785b0e2019 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
{ | |
"cells": [ | |
{ | |
"attachments": {}, | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Lake Formation performance test\n", | |
"This is a comparison test for reading 104x1000 table using direct S3 access, Athena query and Lake Formation query APIs" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 1, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"import awswrangler as wr \n", | |
"import pandas as pd\n", | |
"from datetime import datetime\n", | |
"import boto3 \n", | |
"\n", | |
"# Glue / Lake Formation database and table names used throughout the benchmark\n", | |
"DATABASE = \"lakeformation_performance_test\"\n", | |
"TABLE = \"performance_test_table\"\n", | |
"# region and account id come from the active boto3 session credentials\n", | |
"REGION = boto3.Session().region_name\n", | |
"account_id = wr.sts.get_account_id()\n", | |
"\n", | |
"# account id + region suffix keeps the bucket name globally unique\n", | |
"BUCKET = f\"lakeformation-performance-test-{account_id}-{REGION}\"\n", | |
"DATABASE_LOCATION = f's3://{BUCKET}/{DATABASE}/'\n", | |
"TABLE_LOCATION = f's3://{BUCKET}/{DATABASE}/{TABLE}/'\n" | |
] | |
}, | |
{ | |
"attachments": {}, | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Create a clean database and test table" | |
] | |
}, | |
{ | |
"attachments": {}, | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Create a bucket if not exist" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 2, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"S3 bucket lakeformation-performance-test-846535425765-us-east-1 already exists in us-east-1.\n" | |
] | |
} | |
], | |
"source": [ | |
"# create bucket if not exists \n", | |
"# create an S3 resource\n", | |
"s3 = boto3.resource('s3', region_name=REGION)\n", | |
"\n", | |
"# check if the bucket already exists (note: this lists every bucket in the account)\n", | |
"bucket_exists = s3.Bucket(BUCKET) in s3.buckets.all()\n", | |
"\n", | |
"# create the bucket if it does not exist\n", | |
"if not bucket_exists:\n", | |
"    # S3 requires an explicit LocationConstraint for every region except us-east-1;\n", | |
"    # passing it for us-east-1 is an error, hence the branch\n", | |
"    if REGION == 'us-east-1':\n", | |
"        s3.create_bucket(Bucket=BUCKET)\n", | |
"    else:\n", | |
"        s3.create_bucket(Bucket=BUCKET, CreateBucketConfiguration={'LocationConstraint': REGION})\n", | |
"    print(f'S3 bucket {BUCKET} created in {REGION}.')\n", | |
"else:\n", | |
"    print(f'S3 bucket {BUCKET} already exists in {REGION}.')\n", | |
"\n", | |
"# register bucket with lakeformation if not registered \n", | |
"resource_arn = f'arn:aws:s3:::{BUCKET}'\n" | |
] | |
}, | |
{ | |
"attachments": {}, | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Register bucket with Lake Formation" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 3, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"S3 bucket lakeformation-performance-test-846535425765-us-east-1 already registered with lakeformation.\n" | |
] | |
} | |
], | |
"source": [ | |
"\n", | |
"# create a lakeformation client\n", | |
"client = boto3.client('lakeformation', region_name=REGION)\n", | |
"\n", | |
"# check if the bucket is already registered with Lake Formation\n", | |
"registered = client.list_resources(\n", | |
"    FilterConditionList=[\n", | |
"        {\n", | |
"            'ComparisonOperator': 'EQ',\n", | |
"            'Field': 'RESOURCE_ARN',\n", | |
"            'StringValueList': [resource_arn]\n", | |
"        }\n", | |
"    ]\n", | |
")['ResourceInfoList']\n", | |
"\n", | |
"# register the bucket with lakeformation if it is not already registered\n", | |
"if not registered:\n", | |
"    # register the bucket with lakeformation;\n", | |
"    # UseServiceLinkedRole=True lets Lake Formation use its service-linked role to access S3\n", | |
"    response = client.register_resource(\n", | |
"        ResourceArn=resource_arn,\n", | |
"        UseServiceLinkedRole=True\n", | |
"    )\n", | |
"    print(f'S3 bucket {BUCKET} registered with lakeformation.')\n", | |
"else:\n", | |
"    print(f'S3 bucket {BUCKET} already registered with lakeformation.')\n", | |
"\n" | |
] | |
}, | |
{ | |
"attachments": {}, | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Create database if not exists" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 4, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Database lakeformation_performance_test already exists in Lake Formation.\n" | |
] | |
} | |
], | |
"source": [ | |
"# Get a list of all databases in the AWS Glue Data Catalog\n", | |
"databases = wr.catalog.databases()\n", | |
"\n", | |
"# check if the database exists (idiomatic 'not in' instead of 'not x in')\n", | |
"if DATABASE not in list(databases['Database']):\n", | |
"    # create the database in the Glue catalog with an explicit S3 location\n", | |
"    wr.catalog.create_database(\n", | |
"        name=DATABASE,\n", | |
"        description='Database for testing lakeformation',\n", | |
"        database_input_args = {\n", | |
"            \"LocationUri\" : DATABASE_LOCATION\n", | |
"        }\n", | |
"    )\n", | |
"    \n", | |
"    print(f'Database {DATABASE} created in Lake Formation.')\n", | |
"else:\n", | |
"    print(f'Database {DATABASE} already exists in Lake Formation.')\n" | |
] | |
}, | |
{ | |
"attachments": {}, | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Clean up table (if exists) and all parquet files in the table location" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 5, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"\n", | |
"# delete table if exists\n", | |
"wr.catalog.delete_table_if_exists(database=DATABASE, table=TABLE)\n", | |
"\n", | |
"# clean up S3 location so stale parquet files do not leak into the new table\n", | |
"wr.s3.delete_objects(TABLE_LOCATION)" | |
] | |
}, | |
{ | |
"attachments": {}, | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"## Create a test table in the database" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 6, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Table performance_test_table created successfully.\n" | |
] | |
} | |
], | |
"source": [ | |
"import time\n", | |
"glue_client = boto3.client('glue')\n", | |
"\n", | |
"\n", | |
"# column layout: 2 confidential + 100 non-confidential string columns (plus created_at)\n", | |
"n_confidential = 2\n", | |
"n_non_confidential = 100 # create non confidential data\n", | |
"# create a dictionary for columns types\n", | |
"columns_types = {}\n", | |
"for i in range(n_confidential):\n", | |
"    columns_types[f'Confidential{i+1}'] = 'string'\n", | |
"for i in range(n_non_confidential):\n", | |
"    columns_types[f'NonConfidential{i+1}'] = 'string'\n", | |
"columns_types['created_at'] = 'timestamp'\n", | |
"# NOTE(review): later cells filter on lowercase 'confidential'/'nonconfidential',\n", | |
"# which presumably works because Glue lowercases column names - confirm\n", | |
"\n", | |
"tables = wr.catalog.tables(database=DATABASE)\n", | |
"if TABLE not in list(tables['Table']):\n", | |
"    # GOVERNED table type enables the Lake Formation query APIs benchmarked below\n", | |
"    wr.catalog.create_parquet_table(\n", | |
"        database=DATABASE,\n", | |
"        path=TABLE_LOCATION,\n", | |
"        table=TABLE,\n", | |
"        partitions_types = {\n", | |
"            'cohort': 'string'\n", | |
"        },\n", | |
"        columns_types=columns_types,\n", | |
"        description='Table for testing lakeformation',\n", | |
"        table_type = \"GOVERNED\",\n", | |
"        compression= 'snappy'\n", | |
"    )\n", | |
"\n", | |
"    # Wait for the table to be created\n", | |
"    # poll Glue every 10s; the loop exits via break once get_table succeeds\n", | |
"    while True:\n", | |
"        try:\n", | |
"            response = glue_client.get_table(DatabaseName=DATABASE, Name=TABLE)\n", | |
"            print(f\"Table {TABLE} created successfully.\")\n", | |
"            break\n", | |
"        except glue_client.exceptions.EntityNotFoundException:\n", | |
"            print(f\"Table {TABLE} not yet created, waiting...\")\n", | |
"            time.sleep(10) # Wait for 10 seconds before checking again\n", | |
"# this else pairs with the 'if TABLE not in ...' above, not with the while loop\n", | |
"else:\n", | |
"    print(f'Table {TABLE} already exist in Lake Formation.')\n", | |
"\n", | |
"    " | |
] | |
}, | |
{ | |
"attachments": {}, | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Create a test dataframe and populate table with it" | |
] | |
}, | |
{ | |
"attachments": {}, | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Utility functions" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 7, | |
"metadata": {}, | |
"outputs": [], | |
"source": [ | |
"# get list of all column names of the table from the Glue catalog\n", | |
"def get_table_columns(table_name, database_name):\n", | |
"    table = wr.catalog.table(database=database_name, table=table_name)\n", | |
"    table_columns = table['Column Name'].tolist()\n", | |
"    return table_columns\n", | |
"\n", | |
"# the two filters below reuse get_table_columns instead of re-fetching the catalog entry\n", | |
"def get_confidential_columns(table_name, database_name):\n", | |
"    return [column for column in get_table_columns(table_name, database_name) if column.startswith('confidential')]\n", | |
"\n", | |
"def get_nonconfidential_columns(table_name, database_name):\n", | |
"    return [column for column in get_table_columns(table_name, database_name) if column.startswith('nonconfidential')]\n", | |
"\n", | |
"# fix: previously called get_confidential_columns (copy-paste error, identical to the\n", | |
"# next line); 'columns' is meant to hold the full column list\n", | |
"columns = get_table_columns(TABLE, DATABASE)\n", | |
"confidential_columns = get_confidential_columns(TABLE, DATABASE)\n", | |
"nonconfidential_columns = get_nonconfidential_columns(TABLE, DATABASE)\n", | |
"# two more columns are reserved:\n", | |
"# 'cohort' - cohort name\n", | |
"# 'created_at' - timestamp when record was created\n" | |
] | |
}, | |
{ | |
"attachments": {}, | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Create a test dataframe" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 8, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"data": { | |
"text/html": [ | |
"<div>\n", | |
"<style scoped>\n", | |
" .dataframe tbody tr th:only-of-type {\n", | |
" vertical-align: middle;\n", | |
" }\n", | |
"\n", | |
" .dataframe tbody tr th {\n", | |
" vertical-align: top;\n", | |
" }\n", | |
"\n", | |
" .dataframe thead th {\n", | |
" text-align: right;\n", | |
" }\n", | |
"</style>\n", | |
"<table border=\"1\" class=\"dataframe\">\n", | |
" <thead>\n", | |
" <tr style=\"text-align: right;\">\n", | |
" <th></th>\n", | |
" <th>cohort</th>\n", | |
" <th>confidential1</th>\n", | |
" <th>confidential2</th>\n", | |
" <th>nonconfidential1</th>\n", | |
" <th>nonconfidential2</th>\n", | |
" <th>nonconfidential3</th>\n", | |
" <th>nonconfidential4</th>\n", | |
" <th>nonconfidential5</th>\n", | |
" <th>nonconfidential6</th>\n", | |
" <th>nonconfidential7</th>\n", | |
" <th>...</th>\n", | |
" <th>nonconfidential92</th>\n", | |
" <th>nonconfidential93</th>\n", | |
" <th>nonconfidential94</th>\n", | |
" <th>nonconfidential95</th>\n", | |
" <th>nonconfidential96</th>\n", | |
" <th>nonconfidential97</th>\n", | |
" <th>nonconfidential98</th>\n", | |
" <th>nonconfidential99</th>\n", | |
" <th>nonconfidential100</th>\n", | |
" <th>created_at</th>\n", | |
" </tr>\n", | |
" </thead>\n", | |
" <tbody>\n", | |
" <tr>\n", | |
" <th>0</th>\n", | |
" <td>US</td>\n", | |
" <td>confidential1_0</td>\n", | |
" <td>confidential2_0</td>\n", | |
" <td>nonconfidential1_0</td>\n", | |
" <td>nonconfidential2_0</td>\n", | |
" <td>nonconfidential3_0</td>\n", | |
" <td>nonconfidential4_0</td>\n", | |
" <td>nonconfidential5_0</td>\n", | |
" <td>nonconfidential6_0</td>\n", | |
" <td>nonconfidential7_0</td>\n", | |
" <td>...</td>\n", | |
" <td>nonconfidential92_0</td>\n", | |
" <td>nonconfidential93_0</td>\n", | |
" <td>nonconfidential94_0</td>\n", | |
" <td>nonconfidential95_0</td>\n", | |
" <td>nonconfidential96_0</td>\n", | |
" <td>nonconfidential97_0</td>\n", | |
" <td>nonconfidential98_0</td>\n", | |
" <td>nonconfidential99_0</td>\n", | |
" <td>nonconfidential100_0</td>\n", | |
" <td>2023-05-23 15:08:49.172</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>1</th>\n", | |
" <td>UK</td>\n", | |
" <td>confidential1_1</td>\n", | |
" <td>confidential2_1</td>\n", | |
" <td>nonconfidential1_1</td>\n", | |
" <td>nonconfidential2_1</td>\n", | |
" <td>nonconfidential3_1</td>\n", | |
" <td>nonconfidential4_1</td>\n", | |
" <td>nonconfidential5_1</td>\n", | |
" <td>nonconfidential6_1</td>\n", | |
" <td>nonconfidential7_1</td>\n", | |
" <td>...</td>\n", | |
" <td>nonconfidential92_1</td>\n", | |
" <td>nonconfidential93_1</td>\n", | |
" <td>nonconfidential94_1</td>\n", | |
" <td>nonconfidential95_1</td>\n", | |
" <td>nonconfidential96_1</td>\n", | |
" <td>nonconfidential97_1</td>\n", | |
" <td>nonconfidential98_1</td>\n", | |
" <td>nonconfidential99_1</td>\n", | |
" <td>nonconfidential100_1</td>\n", | |
" <td>2023-05-23 15:08:49.172</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>2</th>\n", | |
" <td>IL</td>\n", | |
" <td>confidential1_2</td>\n", | |
" <td>confidential2_2</td>\n", | |
" <td>nonconfidential1_2</td>\n", | |
" <td>nonconfidential2_2</td>\n", | |
" <td>nonconfidential3_2</td>\n", | |
" <td>nonconfidential4_2</td>\n", | |
" <td>nonconfidential5_2</td>\n", | |
" <td>nonconfidential6_2</td>\n", | |
" <td>nonconfidential7_2</td>\n", | |
" <td>...</td>\n", | |
" <td>nonconfidential92_2</td>\n", | |
" <td>nonconfidential93_2</td>\n", | |
" <td>nonconfidential94_2</td>\n", | |
" <td>nonconfidential95_2</td>\n", | |
" <td>nonconfidential96_2</td>\n", | |
" <td>nonconfidential97_2</td>\n", | |
" <td>nonconfidential98_2</td>\n", | |
" <td>nonconfidential99_2</td>\n", | |
" <td>nonconfidential100_2</td>\n", | |
" <td>2023-05-23 15:08:49.172</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>3</th>\n", | |
" <td>US</td>\n", | |
" <td>confidential1_3</td>\n", | |
" <td>confidential2_3</td>\n", | |
" <td>nonconfidential1_3</td>\n", | |
" <td>nonconfidential2_3</td>\n", | |
" <td>nonconfidential3_3</td>\n", | |
" <td>nonconfidential4_3</td>\n", | |
" <td>nonconfidential5_3</td>\n", | |
" <td>nonconfidential6_3</td>\n", | |
" <td>nonconfidential7_3</td>\n", | |
" <td>...</td>\n", | |
" <td>nonconfidential92_3</td>\n", | |
" <td>nonconfidential93_3</td>\n", | |
" <td>nonconfidential94_3</td>\n", | |
" <td>nonconfidential95_3</td>\n", | |
" <td>nonconfidential96_3</td>\n", | |
" <td>nonconfidential97_3</td>\n", | |
" <td>nonconfidential98_3</td>\n", | |
" <td>nonconfidential99_3</td>\n", | |
" <td>nonconfidential100_3</td>\n", | |
" <td>2023-05-23 15:08:49.172</td>\n", | |
" </tr>\n", | |
" <tr>\n", | |
" <th>4</th>\n", | |
" <td>UK</td>\n", | |
" <td>confidential1_4</td>\n", | |
" <td>confidential2_4</td>\n", | |
" <td>nonconfidential1_4</td>\n", | |
" <td>nonconfidential2_4</td>\n", | |
" <td>nonconfidential3_4</td>\n", | |
" <td>nonconfidential4_4</td>\n", | |
" <td>nonconfidential5_4</td>\n", | |
" <td>nonconfidential6_4</td>\n", | |
" <td>nonconfidential7_4</td>\n", | |
" <td>...</td>\n", | |
" <td>nonconfidential92_4</td>\n", | |
" <td>nonconfidential93_4</td>\n", | |
" <td>nonconfidential94_4</td>\n", | |
" <td>nonconfidential95_4</td>\n", | |
" <td>nonconfidential96_4</td>\n", | |
" <td>nonconfidential97_4</td>\n", | |
" <td>nonconfidential98_4</td>\n", | |
" <td>nonconfidential99_4</td>\n", | |
" <td>nonconfidential100_4</td>\n", | |
" <td>2023-05-23 15:08:49.172</td>\n", | |
" </tr>\n", | |
" </tbody>\n", | |
"</table>\n", | |
"<p>5 rows × 104 columns</p>\n", | |
"</div>" | |
], | |
"text/plain": [ | |
" cohort confidential1 confidential2 nonconfidential1 \\\n", | |
"0 US confidential1_0 confidential2_0 nonconfidential1_0 \n", | |
"1 UK confidential1_1 confidential2_1 nonconfidential1_1 \n", | |
"2 IL confidential1_2 confidential2_2 nonconfidential1_2 \n", | |
"3 US confidential1_3 confidential2_3 nonconfidential1_3 \n", | |
"4 UK confidential1_4 confidential2_4 nonconfidential1_4 \n", | |
"\n", | |
" nonconfidential2 nonconfidential3 nonconfidential4 \\\n", | |
"0 nonconfidential2_0 nonconfidential3_0 nonconfidential4_0 \n", | |
"1 nonconfidential2_1 nonconfidential3_1 nonconfidential4_1 \n", | |
"2 nonconfidential2_2 nonconfidential3_2 nonconfidential4_2 \n", | |
"3 nonconfidential2_3 nonconfidential3_3 nonconfidential4_3 \n", | |
"4 nonconfidential2_4 nonconfidential3_4 nonconfidential4_4 \n", | |
"\n", | |
" nonconfidential5 nonconfidential6 nonconfidential7 ... \\\n", | |
"0 nonconfidential5_0 nonconfidential6_0 nonconfidential7_0 ... \n", | |
"1 nonconfidential5_1 nonconfidential6_1 nonconfidential7_1 ... \n", | |
"2 nonconfidential5_2 nonconfidential6_2 nonconfidential7_2 ... \n", | |
"3 nonconfidential5_3 nonconfidential6_3 nonconfidential7_3 ... \n", | |
"4 nonconfidential5_4 nonconfidential6_4 nonconfidential7_4 ... \n", | |
"\n", | |
" nonconfidential92 nonconfidential93 nonconfidential94 \\\n", | |
"0 nonconfidential92_0 nonconfidential93_0 nonconfidential94_0 \n", | |
"1 nonconfidential92_1 nonconfidential93_1 nonconfidential94_1 \n", | |
"2 nonconfidential92_2 nonconfidential93_2 nonconfidential94_2 \n", | |
"3 nonconfidential92_3 nonconfidential93_3 nonconfidential94_3 \n", | |
"4 nonconfidential92_4 nonconfidential93_4 nonconfidential94_4 \n", | |
"\n", | |
" nonconfidential95 nonconfidential96 nonconfidential97 \\\n", | |
"0 nonconfidential95_0 nonconfidential96_0 nonconfidential97_0 \n", | |
"1 nonconfidential95_1 nonconfidential96_1 nonconfidential97_1 \n", | |
"2 nonconfidential95_2 nonconfidential96_2 nonconfidential97_2 \n", | |
"3 nonconfidential95_3 nonconfidential96_3 nonconfidential97_3 \n", | |
"4 nonconfidential95_4 nonconfidential96_4 nonconfidential97_4 \n", | |
"\n", | |
" nonconfidential98 nonconfidential99 nonconfidential100 \\\n", | |
"0 nonconfidential98_0 nonconfidential99_0 nonconfidential100_0 \n", | |
"1 nonconfidential98_1 nonconfidential99_1 nonconfidential100_1 \n", | |
"2 nonconfidential98_2 nonconfidential99_2 nonconfidential100_2 \n", | |
"3 nonconfidential98_3 nonconfidential99_3 nonconfidential100_3 \n", | |
"4 nonconfidential98_4 nonconfidential99_4 nonconfidential100_4 \n", | |
"\n", | |
" created_at \n", | |
"0 2023-05-23 15:08:49.172 \n", | |
"1 2023-05-23 15:08:49.172 \n", | |
"2 2023-05-23 15:08:49.172 \n", | |
"3 2023-05-23 15:08:49.172 \n", | |
"4 2023-05-23 15:08:49.172 \n", | |
"\n", | |
"[5 rows x 104 columns]" | |
] | |
}, | |
"execution_count": 8, | |
"metadata": {}, | |
"output_type": "execute_result" | |
} | |
], | |
"source": [ | |
"n_records = 1000\n", | |
"cohort_names = ['US','UK',\"IL\"]\n", | |
"\n", | |
"def create_record(index, cohort_names=cohort_names,\n", | |
"                    confidential_columns=confidential_columns,\n", | |
"                    nonconfidential_columns=nonconfidential_columns):\n", | |
"    \"\"\"Build one synthetic row: cohort assigned round-robin, every column filled with '<column>_<index>'.\"\"\"\n", | |
"    record = {}\n", | |
"    record['cohort'] = cohort_names[index % len(cohort_names)]\n", | |
"    for column in confidential_columns:\n", | |
"        record[column] = f\"{column}_{index}\"\n", | |
"    for column in nonconfidential_columns:\n", | |
"        record[column] = f\"{column}_{index}\"\n", | |
"    now= datetime.now()\n", | |
"    now_microsecond_rounded = now.replace(microsecond=(now.microsecond // 1000) * 1000) # truncate to millisecond precision (drops sub-millisecond digits)\n", | |
"    record['created_at'] = now_microsecond_rounded # must be truncated to millisecond precision - otherwise it will not match LF timestamp precision in comparison \n", | |
"    return record\n", | |
"\n", | |
"records = [create_record(i) for i in range(n_records)]\n", | |
"df = pd.DataFrame(records)\n", | |
"df.head()\n" | |
] | |
}, | |
{ | |
"attachments": {}, | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"### Store dataframe in the table " | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 9, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stderr", | |
"output_type": "stream", | |
"text": [ | |
"/System/Volumes/Data/anaconda3/envs/pheno39/lib/python3.9/site-packages/awswrangler/s3/_write_dataset.py:150: FutureWarning: In a future version of pandas, a length 1 tuple will be returned when iterating over a groupby with a grouper equal to a list of length 1. Don't supply a list with a single grouper to avoid this warning.\n", | |
" for keys, subgroup in df.groupby(by=partition_cols, observed=True):\n" | |
] | |
}, | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Table performance_test_table was created in database lakeformation_performance_test with 1000 records\n", | |
"CPU times: user 632 ms, sys: 38.6 ms, total: 671 ms\n", | |
"Wall time: 19.1 s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"# write the dataframe to S3 as a cohort-partitioned parquet dataset and\n", | |
"# update the Glue catalog table in the same call\n", | |
"wr.s3.to_parquet(\n", | |
"    df=df,\n", | |
"    dataset=True,\n", | |
"    database=DATABASE,\n", | |
"    table=TABLE,\n", | |
"    partition_cols=[\"cohort\"],\n", | |
"    mode=\"overwrite_partitions\",\n", | |
"    )\n", | |
"print(f\"Table {TABLE} was created in database {DATABASE} with {len(df.index)} records\") " | |
] | |
}, | |
{ | |
"attachments": {}, | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Read table as an ordinary S3 parquet object" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 10, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Original dataframe and dataframe read from lakeformation have the same content\n", | |
"CPU times: user 221 ms, sys: 24 ms, total: 245 ms\n", | |
"Wall time: 3.04 s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"# read the table directly from S3 as a partitioned Parquet dataset\n", | |
"s3_df = wr.s3.read_parquet(path=TABLE_LOCATION, dataset=True) \n", | |
"# rows appearing in exactly one of the two frames survive drop_duplicates(keep=False)\n", | |
"combined = pd.concat([df, s3_df])\n", | |
"df_diff = combined.drop_duplicates(keep=False)\n", | |
"if not df_diff.empty:\n", | |
"    print(\"Original dataframe and dataframe read from lakeformation have NOT the same content\")\n", | |
"    print(df_diff)\n", | |
"else:\n", | |
"    print(\"Original dataframe and dataframe read from lakeformation have the same content\")\n" | |
] | |
}, | |
{ | |
"attachments": {}, | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Read table via Athena API" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 11, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Original dataframe and dataframe read from lakeformation have the same content\n", | |
"CPU times: user 1.47 s, sys: 134 ms, total: 1.61 s\n", | |
"Wall time: 56.7 s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"# Read the full table via the Athena API; ctas_approach=False streams query\n", | |
"# results directly instead of materializing a temporary CTAS table\n", | |
"athena_df = wr.athena.read_sql_table(table=TABLE, database=DATABASE,ctas_approach=False)\n", | |
"# compare with an original dataframe: rows unique to either frame survive keep=False\n", | |
"df_diff = pd.concat([df, athena_df]).drop_duplicates(keep=False)\n", | |
"if df_diff.empty:\n", | |
"    print(\"Original dataframe and dataframe read from lakeformation have the same content\")\n", | |
"else:\n", | |
"    print(\"Original dataframe and dataframe read from lakeformation have NOT the same content\")\n", | |
"    print(df_diff)" | |
] | |
}, | |
{ | |
"attachments": {}, | |
"cell_type": "markdown", | |
"metadata": {}, | |
"source": [ | |
"# Read table via Lake Formation API" | |
] | |
}, | |
{ | |
"cell_type": "code", | |
"execution_count": 12, | |
"metadata": {}, | |
"outputs": [ | |
{ | |
"name": "stdout", | |
"output_type": "stream", | |
"text": [ | |
"Original dataframe and dataframe read from lakeformation have the same content\n", | |
"CPU times: user 693 ms, sys: 50.6 ms, total: 744 ms\n", | |
"Wall time: 50.8 s\n" | |
] | |
} | |
], | |
"source": [ | |
"%%time\n", | |
"# Read the full table through the Lake Formation query API (governed table)\n", | |
"table_df = wr.lakeformation.read_sql_table(database=DATABASE, table=TABLE)\n", | |
"# compare with an original dataframe: rows unique to either frame survive keep=False\n", | |
"df_diff = pd.concat([df, table_df]).drop_duplicates(keep=False)\n", | |
"if df_diff.empty:\n", | |
"    print(\"Original dataframe and dataframe read from lakeformation have the same content\")\n", | |
"else:\n", | |
"    print(\"Original dataframe and dataframe read from lakeformation have NOT the same content\")\n", | |
"    print(df_diff)" | |
] | |
} | |
], | |
"metadata": { | |
"kernelspec": { | |
"display_name": "pheno39", | |
"language": "python", | |
"name": "python3" | |
}, | |
"language_info": { | |
"codemirror_mode": { | |
"name": "ipython", | |
"version": 3 | |
}, | |
"file_extension": ".py", | |
"mimetype": "text/x-python", | |
"name": "python", | |
"nbconvert_exporter": "python", | |
"pygments_lexer": "ipython3", | |
"version": "3.9.15" | |
}, | |
"orig_nbformat": 4 | |
}, | |
"nbformat": 4, | |
"nbformat_minor": 2 | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment