Skip to content

Instantly share code, notes, and snippets.

@colemanja91
Created April 11, 2018 22:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save colemanja91/c375239dcdcbb4867166fcb05f0aac42 to your computer and use it in GitHub Desktop.
Save colemanja91/c375239dcdcbb4867166fcb05f0aac42 to your computer and use it in GitHub Desktop.
Sample Athena S3 results tagging via AWS Lambda
"""
Lambda invocation to set security tags on Athena output; triggered by S3 Object
events
"""
import logging
from urllib.parse import unquote_plus

import boto3
# Root logger, set to INFO so the per-object decisions made below are emitted.
LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)
# AWS service clients, created once at import time and shared by the functions
# in this module.
ATHENA = boto3.client('athena')
S3CLIENT = boto3.client('s3')
# Athena database name that marks a query as touching protected data; compared
# against the database recorded in each query's execution context.
DBNAME = 'restricteddb'
def check_query_context(query_id):
    """
    Check if query falls under the protected DB

    Looks the query up in Athena and compares the database recorded in its
    execution context against ``DBNAME``.

    NOTE(review): only the query's *context* database is inspected; a query
    that references the protected DB by qualified name from another context
    would not be detected here -- confirm whether that matters.

    :param string query_id: Athena QueryExecutionId
    :return bool: True if protected DB; False otherwise, including when Athena
        rejects the id or the query carries no database in its context
    """
    LOGGER.info("Checking if %s is a protected data set query", query_id)
    try:
        query = ATHENA.get_query_execution(QueryExecutionId=query_id)
    except ATHENA.exceptions.InvalidRequestException:
        # Protect against invalid query Ids
        LOGGER.info("%s is not a valid Athena query id", query_id)
        return False
    # The Database entry is optional in the execution context (a query can be
    # started without selecting a database), so never index it blindly.
    context = query.get('QueryExecution', {}).get('QueryExecutionContext') or {}
    database = context.get('Database')
    # Athena lower-cases database names when executing, so a mixed-case
    # spelling of the protected DB must still match.
    if database is not None and database.lower() == DBNAME.lower():
        LOGGER.info("%s is a protected data set query", query_id)
        return True
    LOGGER.info("%s is not a protected data set query", query_id)
    return False
def check_action(obj_name):
    """
    Check if action is required on an S3 object

    Metadata files are skipped; for anything else the query id is taken from
    the object's file name and checked against the protected DB.

    :param string obj_name: S3 object key
    :return bool: True if processing action is required
    """
    # Exclude metadata files from processing
    if obj_name.endswith('.metadata'):
        LOGGER.info("%s is a metadata file", obj_name)
        return False
    # Isolate the file name BEFORE cutting at the dot: splitting the whole key
    # on '.' first returns the wrong id when a prefix segment contains a dot
    # (e.g. "my.prefix/<id>.csv" would yield "my").
    file_name = obj_name.rsplit('/', 1)[-1]
    query_id = file_name.split('.', 1)[0]
    return check_query_context(query_id)
def set_obj_tags(bucket, obj_name):
    """
    Mark an S3 object as restricted data

    Writes a tag set containing the single tag ``restricted_data=True`` to the
    object via ``put_object_tagging``.

    :param string bucket: name of the bucket holding the object
    :param string obj_name: key of the object to tag
    """
    restricted_tag = {'Key': 'restricted_data', 'Value': 'True'}
    tagging = {'TagSet': [restricted_tag]}
    S3CLIENT.put_object_tagging(Bucket=bucket, Key=obj_name, Tagging=tagging)
def obj_handler(bucket, obj_name):
    """
    Handle a single object from the event

    Tags the object when ``check_action`` reports that it needs processing;
    otherwise only logs that nothing was done. Kept separate so the caller can
    loop over several objects.

    :param string bucket: name of the bucket holding the object
    :param string obj_name: key of the object
    """
    # Guard clause: anything other than an explicit True means "leave it alone"
    if check_action(obj_name) is not True:
        LOGGER.info('No action required for %s' % obj_name)
        return
    LOGGER.info('Setting tags for %s' % obj_name)
    set_obj_tags(bucket, obj_name)
def lambda_handler(event, context):
    """
    Lambda invocation

    Walks every record of the S3 event and hands its bucket/key pair to
    ``obj_handler``.

    :param dict event: Lambda invocation event; read as
        ``event['Records'][i]['s3']``
    :param context: Lambda context object (unused)
    """
    for record in event['Records']:
        s3_info = record['s3']
        # S3 URL-encodes object keys in event notifications (a space arrives as
        # '+', '=' as '%3D', ...); decode so the key matches the real object
        # when it is looked up and tagged.
        obj_name = unquote_plus(s3_info['object']['key'])
        bucket = s3_info['bucket']['name']
        obj_handler(bucket, obj_name)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment