# lambda function to track data lake inventory
#
# Originally published as a GitHub gist by @slotrans
# (gist id cf065be96f4c5563237c5dbef282d982; last active October 1, 2022).
# If you have a data lake, you will often want to be able to ask questions about what's in it, and prefix-based listings of
# objects, as provided by S3 and all S3-alikes, tightly constrain your ability to do so. Using a simple Lambda function like
# this (which can be adapted to the FaaS platform of your choice) together with an RDBMS gives you a much more flexible way of
# asking meta questions about what's in your lake.
# Relevant table schema, adjust names as you like...
#
# create table lake.inventory
# (
# inventory_id bigserial primary key
# , created timestamptz not null default now()
# , bucket varchar(64) not null
# , key text not null
# , size bigint not null
# , event_time timestamptz not null
# );
#
# create unique index uidx_inventory_bucketkey on lake.inventory(bucket, key) ;
from urllib.parse import unquote, unquote_plus

import boto3
import psycopg2
# Upsert one object's row. Parameters: bucket, key, size, event_time (an ISO-8601
# string, cast to timestamptz by the database).
#
# S3 event notifications are not guaranteed to arrive in the order the events
# happened. The conflict branch therefore only overwrites the stored row when the
# incoming event is not older than the one already recorded. Without this guard,
# a late-arriving stale ObjectCreated event would clobber a newer size/event_time.
# NOTE(review): this cannot protect against a stale PUT arriving after a DELETE
# (the row is gone, so there is no conflict); that would need tombstones.
INSERT_SQL = """
insert into lake.inventory as inv
( bucket
, key
, size
, event_time
)
values
( %(bucket)s
, %(key)s
, %(size)s
, cast(%(event_time)s as timestamptz)
)
on conflict (bucket, key)
do update set size = excluded.size
            , event_time = excluded.event_time
where inv.event_time <= excluded.event_time
""".strip()

# Remove one object's row. Parameters: bucket, key.
DELETE_SQL = "delete from lake.inventory where ( bucket, key ) = ( %(bucket)s, %(key)s )"
# Connection settings. These could come from environment variables, but they are
# deliberately kept as fixed, well-known module constants.
# Only the password belongs in the secret store. Hostnames and usernames are not
# secrets, so resist the urge to stash them there as well.
DBHOST = 'your.db.host.name.here'
DBUSER = 'dont_connect_as_a_superuser'

# Module-level caches. Both start empty and are filled on first use by
# get_db_password() and get_connection().
DBPW = None
DBCONN = None
def get_db_password():
    """Return the database password, fetching it from Secrets Manager once.

    The first call reads the secret ``YourSecretKeyNameHere`` and stores its
    ``SecretString`` in the module-level ``DBPW``. Later calls in the same
    process (e.g. warm Lambda invocations) return the cached value without
    contacting Secrets Manager again.
    """
    global DBPW
    if DBPW is None:
        secrets = boto3.client('secretsmanager')
        secret = secrets.get_secret_value(SecretId='YourSecretKeyNameHere')
        DBPW = secret['SecretString']
    return DBPW
def get_connection():
    """Return the shared psycopg2 connection, (re)connecting when needed.

    The connection is cached in the module-level ``DBCONN`` and reused across
    calls. If the cached connection reports a non-zero ``closed`` attribute,
    it is discarded and a fresh connection is opened. psycopg2 sets that
    attribute once a connection is closed or found to be broken. Without this
    check, a connection dropped between invocations would make every later
    call fail.

    The connection is put in autocommit mode, so each statement executed on it
    is committed immediately.
    """
    global DBCONN
    if DBCONN is not None and not DBCONN.closed:
        return DBCONN
    DBCONN = psycopg2.connect(
        host=DBHOST,
        database='your-database',
        user=DBUSER,
        password=get_db_password(),
    )
    DBCONN.autocommit = True
    return DBCONN
# Configure this function as the Lambda handler (entry point).
def handler(event, context):
    """Route each S3 event record to the matching inventory update.

    Records whose ``eventName`` starts with ``ObjectCreated`` go to
    ``handle_put``. Those starting with ``ObjectRemoved`` go to
    ``handle_delete``. Any other event name is skipped. ``context`` is unused.
    Returns True after all records have been processed.
    """
    for record in event['Records']:
        event_name = record['eventName']
        if event_name.startswith('ObjectCreated'):
            handle_put(record)
        elif event_name.startswith('ObjectRemoved'):
            handle_delete(record)
    return True
def handle_put(record):
connection = get_connection()
with connection.cursor() as cur:
bucket = record['s3']['bucket']['name']
key = unquote(record['s3']['object']['key'])
size = record['s3']['object']['size']
event_time = record['eventTime']
cur.execute(INSERT_SQL, dict(bucket=bucket, key=key, size=size, event_time=event_time))
def handle_delete(record):
connection = get_connection()
with connection.cursor() as cur:
bucket = record['s3']['bucket']['name']
key = unquote(record['s3']['object']['key'])
cur.execute(DELETE_SQL, dict(bucket=bucket, key=key))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment