Skip to content

Instantly share code, notes, and snippets.

@uc-compass-bot
Created November 13, 2019 21:15
Show Gist options
  • Save uc-compass-bot/91d6e6d13500bc5fc5afffe66122dc2a to your computer and use it in GitHub Desktop.
Save uc-compass-bot/91d6e6d13500bc5fc5afffe66122dc2a to your computer and use it in GitHub Desktop.
def lambda_handler(event, context, s3_client=None,
                   athena_client=None):
    """Move newly delivered CloudFront log files into a partitioned layout.

    For every record in ``event["Records"]`` whose object key starts with
    ``SOURCE_PREFIX`` and whose filename matches ``FILENAME_RE``, the object
    is copied to ``{TARGET_PREFIX}year=.../month=.../day=...[/hour=...]/``
    in ``DEST_BUCKET`` (or the source bucket when ``DEST_BUCKET`` is falsy).
    Depending on module configuration the source object is then deleted
    (``DELETE_SOURCE``) and the matching Athena partition is registered
    (``ADD_PARTITIONS``).

    Args:
        event: Event dict; must contain a ``"Records"`` list whose items
            expose ``["s3"]["bucket"]["name"]`` and ``["s3"]["object"]["key"]``.
        context: Lambda context object; not used by this handler.
        s3_client: Optional pre-built S3 client. When ``None`` one is created
            with ``boto3.client("s3")``.
        athena_client: Optional pre-built Athena client. When ``None`` one is
            created with ``boto3.client("athena")``.

    Raises:
        RuntimeError: If an Athena ``ALTER TABLE`` query finishes in any
            state other than ``"SUCCEEDED"``.
    """
    # Local import: the file's top-level import block is not touched here.
    from urllib.parse import unquote_plus

    start_time = time.time()
    records = event["Records"]
    logger.info(f"Received event with {len(records)} records")

    if s3_client is None:
        s3_client = boto3.client("s3")
    if athena_client is None:
        athena_client = boto3.client("athena")

    for record in records:
        bucket = record["s3"]["bucket"]["name"]
        # S3 event notifications deliver the object key URL-encoded
        # (space -> "+", special characters -> "%XX"). Decode it so the
        # prefix check, the filename and the S3 calls see the real key.
        key = unquote_plus(record["s3"]["object"]["key"])

        if not key.startswith(SOURCE_PREFIX):
            logger.info(f"Key doesn't start with {SOURCE_PREFIX!r}, skipping: {key}")
            continue

        filename = key.split("/")[-1]
        match = FILENAME_RE.match(filename)
        if not match:
            logger.warning(f"Filename not a Cloudfront log file, skipping: {filename}")
            continue

        year, month, day, hour = match.groups()
        dest_bucket = DEST_BUCKET or bucket

        partition = f"year={year}/month={month}/day={day}"
        if INCLUDE_HOUR:
            partition += f"/hour={hour}"
        dest_key = f"{TARGET_PREFIX}{partition}/{filename}"

        logger.info(f"Copying s3://{bucket}/{key} -> s3://{dest_bucket}/{dest_key}")
        s3_client.copy_object(
            CopySource={"Bucket": bucket, "Key": key},
            Bucket=dest_bucket,
            Key=dest_key
        )

        # Delete only after copy_object returned without raising, so a
        # failed copy never loses the source object.
        if DELETE_SOURCE:
            logger.info(f"Deleting s3://{bucket}/{key}")
            s3_client.delete_object(Bucket=bucket, Key=key)

        if ADD_PARTITIONS:
            logger.info(f"Adding Athena partition {partition}")
            query_hour = ""
            if INCLUDE_HOUR:
                query_hour = f", hour = '{hour}'"
            # IF NOT EXISTS keeps the statement safe to repeat when several
            # files land in the same partition.
            query = f"""
            ALTER TABLE {ATHENA_TABLE}
            ADD IF NOT EXISTS PARTITION
            (year = '{year}', month = '{month}', day = '{day}'{query_hour});
            """
            output_location = f"s3://{dest_bucket}/{ATHENA_PREFIX}"
            response = athena_client.start_query_execution(
                QueryString=query,
                ResultConfiguration={"OutputLocation": output_location},
            )
            query_id = response["QueryExecutionId"]
            state = wait_athena_query(athena_client, query_id)
            if state != "SUCCEEDED":
                raise RuntimeError(f"Athena query {query_id} error {state}")

    elapsed = time.time() - start_time
    logger.info(f"Finished processing event in {elapsed:.3f} seconds")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment