Skip to content

Instantly share code, notes, and snippets.

@dgadiraju
Last active August 7, 2022 21:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dgadiraju/ffc99ad5e47648106dcda47000df980a to your computer and use it in GitHub Desktop.
Save dgadiraju/ffc99ad5e47648106dcda47000df980a to your computer and use it in GitHub Desktop.
import pandas as pd
import json
# Flatten the JSON-encoded columns of the bronze Addresses extract and
# publish the flattened table to the gold prefix.
df = pd.read_csv(
    's3://airetail/bronze/sfleads/addresses/Addresses.csv'
)

# Each `address` cell is decoded from JSON text so json_normalize can expand
# it below.  A missing (NaN) cell is not a string and makes json.loads raise.
# Bracket assignment is used because attribute-style assignment only works
# for columns that already exist and do not clash with DataFrame attributes.
df['address'] = df['address'].apply(json.loads)

# `phone_numbers` may be empty for a row, so only non-null cells are decoded.
df['phone_numbers'] = df['phone_numbers'].apply(
    lambda pn: json.loads(pn) if not pd.isnull(pn) else None
)

df_normalized = pd.json_normalize(df.to_dict(orient='records'))

# json_normalize names nested fields "<parent>.<child>".  Keep only the child
# part for names with exactly one dot; every other name is left untouched.
# NOTE(review): two parents sharing a child key would produce duplicate
# column names -- confirm against the actual address schema.
df_normalized.columns = [
    parts[1] if len(parts) == 2 else col
    for col, parts in ((col, col.split('.')) for col in df_normalized.columns)
]

# index=False (rather than the falsy None) is the documented way to omit the
# row index from the CSV.
df_normalized.to_csv(
    's3://airetail/gold/sfleads/addresses/Addresses.csv',
    index=False
)
import json
import os
import boto3
import pandas as pd
def lambda_handler(event, context):
    """Flatten the bronze Addresses CSV and write it to the gold prefix.

    Reads ``bronze/addresses/Addresses.csv`` from the bucket named by the
    ``BUCKET_NAME`` environment variable, decodes the JSON text in the
    ``address`` and ``phone_numbers`` columns, explodes ``phone_numbers``
    into one row per entry (renamed to ``phone_number``), flattens nested
    fields into columns and writes the result to
    ``gold/addresses/Addresses.csv`` in the same bucket.

    Args:
        event: Invocation payload; not used.
        context: Runtime context; not used.

    Returns:
        A dict with ``statusCode`` 200 and a ``statusMessage`` string.

    Raises:
        RuntimeError: If ``BUCKET_NAME`` is unset or empty.
    """
    bucket_name = os.environ.get('BUCKET_NAME')
    if not bucket_name:
        # Without this guard the paths below would silently become
        # "s3://None/..." and fail later with a misleading storage error.
        raise RuntimeError(
            'BUCKET_NAME environment variable must be set to the data bucket'
        )

    df = pd.read_csv(
        f's3://{bucket_name}/bronze/addresses/Addresses.csv'
    )

    # A missing (NaN) address is not a string and makes json.loads raise.
    df['address'] = df['address'].apply(json.loads)
    # phone_numbers may be empty for a row, so only non-null cells are decoded.
    df['phone_numbers'] = df['phone_numbers'].apply(
        lambda pn: json.loads(pn) if not pd.isnull(pn) else None
    )

    # One output row per list entry; rows whose phone_numbers is None are
    # kept with a missing phone_number.
    df_exploded = df.explode('phone_numbers').rename(
        columns={'phone_numbers': 'phone_number'}
    )
    df_normalized = pd.json_normalize(df_exploded.to_dict(orient='records'))

    # json_normalize names nested fields "<parent>.<child>".  Keep only the
    # child part for names with exactly one dot; leave other names untouched.
    # NOTE(review): identical child keys under different parents would
    # collide here -- confirm against the actual schema.
    df_normalized.columns = [
        parts[1] if len(parts) == 2 else col
        for col, parts in (
            (col, col.split('.')) for col in df_normalized.columns
        )
    ]

    # index=False is the documented way to omit the row index.
    df_normalized.to_csv(
        f's3://{bucket_name}/gold/addresses/Addresses.csv',
        index=False
    )
    return {
        'statusCode': 200,
        'statusMessage': 'Successfully Transformed the data'
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment