Skip to content

Instantly share code, notes, and snippets.

@bayupw
Created April 7, 2023 20:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bayupw/33784908af56e4a495ecca56ee553051 to your computer and use it in GitHub Desktop.
Save bayupw/33784908af56e4a495ecca56ee553051 to your computer and use it in GitHub Desktop.
Lambda function python postgre schema load
# import cfnresponse
import json
import psycopg2
import os
def lambda_handler(event, context):
# Get the properties from the event
# properties = event['ResourceProperties']
# resource_id = event['LogicalResourceId']
# Create a connection to the database
conn = psycopg2.connect(os.getenv('CONNECTION_URI'))
try:
with conn.cursor() as cur:
cur.execute('CREATE EXTENSION IF NOT EXISTS "uuid-ossp";')
cur.execute('DROP TABLE IF EXISTS public.users;')
cur.execute('DROP TABLE IF EXISTS public.activities;')
cur.execute("""
CREATE TABLE public.users (
uuid UUID DEFAULT uuid_generate_v4() PRIMARY KEY,
display_name text NOT NULL,
handle text NOT NULL,
email text NOT NULL,
cognito_user_id text NOT NULL,
created_at TIMESTAMP default current_timestamp NOT NULL
);
""")
cur.execute("""
CREATE TABLE public.activities (
uuid UUID DEFAULT uuid_generate_v4() PRIMARY KEY,
user_uuid UUID NOT NULL,
message text NOT NULL,
replies_count integer DEFAULT 0,
reposts_count integer DEFAULT 0,
likes_count integer DEFAULT 0,
reply_to_activity_uuid integer,
expires_at TIMESTAMP,
created_at TIMESTAMP default current_timestamp NOT NULL
);
""")
# Commit the transaction
conn.commit()
# Send the success response to CloudFormation
# response_data = {'message': 'Table created successfully'}
# cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data)
except (Exception, psycopg2.DatabaseError) as error:
print(error)
# Roll back the transaction
conn.rollback()
# Send the failure response to CloudFormation
# response_data = {'error': str(error)}
# cfnresponse.send(event, context, cfnresponse.FAILED, response_data)
finally:
if conn is not None:
cur.close()
conn.close()
print('Database connection closed.')
return event
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment