|
#!/usr/bin/env python |
|
import argparse |
|
import boto3 |
|
import fnmatch |
|
import json |
|
import logging |
|
import re |
|
|
|
from collections import OrderedDict |
|
from datetime import datetime |
|
from io import open |
|
|
|
# Logger name used by log(); None selects the root logger.
LOGGER_NAME = None
|
|
|
|
|
class Config(object):
    # Shared runtime configuration, held as class attributes so the Lambda
    # handler and the CLI entry point read and write the same state.

    # Parsed mapping list: a list of dicts, each with a "Filter" glob
    # (matched against object keys via fnmatch) plus "Key" and "Value"
    # tag fields. None until loaded from S3 or a local file.
    mapping_list = None
    # Default S3 location of the mapping file.
    mapping_list_bucket = 'sequoia-short-lived'
    mapping_list_key = 'mapping.json'
    # AWS profile name passed to boto3 sessions; None uses default credentials.
    profile = None
|
|
|
|
|
def log():
    """Return the module's logger (the root logger while LOGGER_NAME is None)."""
    logger = logging.getLogger(LOGGER_NAME)
    return logger
|
|
|
|
|
def config_logger():
    """Attach a StreamHandler and set INFO level, only once per process."""
    logger = log()
    if logger.handlers:
        # Already configured (e.g. a warm Lambda container); do nothing.
        return
    handler = logging.StreamHandler()
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
|
|
|
|
|
def read_file_from_s3(bucket, key):
    """Download an S3 object and parse its body as JSON.

    JSON object key order is preserved by parsing into OrderedDict.
    """
    session = boto3.session.Session(profile_name=Config.profile)
    body = session.resource('s3').Object(bucket, key).get()['Body']
    raw_text = body.read().decode('utf-8')
    return json.loads(raw_text, object_pairs_hook=OrderedDict)
|
|
|
|
|
def read_local_file(mapping_file_name):
    """Load a JSON mapping file from local disk.

    Args:
        mapping_file_name: path to a UTF-8 encoded JSON file.

    Returns:
        The parsed JSON with object key order preserved (OrderedDict).
    """
    # BUG FIX: the file handle was previously never closed; a context
    # manager releases it even if json.loads raises.
    with open(mapping_file_name, encoding='utf-8') as f:
        mapping_file_name_text = f.read()  # unicode, not bytes
    return json.loads(mapping_file_name_text, object_pairs_hook=OrderedDict)
|
|
|
|
|
def config_mapping_list_from_s3():
    """Populate Config.mapping_list from the configured S3 bucket/key."""
    bucket = Config.mapping_list_bucket
    key = Config.mapping_list_key
    Config.mapping_list = read_file_from_s3(bucket, key)
|
|
|
|
|
def json_serial(obj):
    """JSON serializer for objects not serializable by default json code.

    Datetimes are rendered as ISO-8601 strings; anything else raises
    TypeError, matching the json.dumps ``default=`` contract.
    """
    if not isinstance(obj, datetime):
        raise TypeError("Type not serializable")
    return obj.isoformat()
|
|
|
|
|
class S3URL:
    """Simple wrapper for S3 URL.

    This class parses a S3 URL and provides accessors to each component.
    """

    S3URL_PATTERN = re.compile(r'(s3[n]?)://([^/]+)[/]?(.*)')
    PATH_SEP = "/"

    def __init__(self, bucket=None, path=None, uri=None):
        """Build from an explicit bucket/path pair, or parse *uri* when given."""
        if not uri:
            self.proto = 's3'  # normalize s3n => s3
            self.bucket = bucket
            self.path = path
        else:
            match = S3URL.S3URL_PATTERN.match(uri)
            if match is None:
                raise RuntimeError('Invalid S3 URI: %s' % uri)
            self.proto, self.bucket, self.path = match.groups()
            self.proto = self.proto.rstrip("n")  # normalize s3n => s3
        self.uri = S3URL.combine(self.proto, self.bucket, self.path)

    def __str__(self):
        """Return the original S3 URL."""
        return S3URL.combine(self.proto, self.bucket, self.path)

    @staticmethod
    def combine(proto, bucket, path):
        """Combine each component and generate a S3 url string,
        no path normalization here.
        The path should not start with slash.
        """
        return '%s://%s/%s' % (proto, bucket, path)

    @staticmethod
    def is_valid(uri):
        """Check if given uri is a valid S3 URL."""
        return bool(S3URL.S3URL_PATTERN.match(uri))
|
|
|
|
|
def filter_key_collisions(list_of_dicts, key):
    """Merge dicts that share the same value for *key*.

    Later entries override earlier ones field-by-field; first-seen order
    of the distinct *key* values is preserved.

    Returns:
        A list of merged OrderedDicts, one per distinct *key* value.
    """
    merged = OrderedDict()
    for item in list_of_dicts:
        merged.setdefault(item[key], OrderedDict()).update(item)
    # BUG FIX: return a real list instead of the dict-values view that
    # .values() yields on Python 3 — the caller passes this directly as a
    # boto3 TagSet parameter, which expects a list, and a live view would
    # also change under the caller if `merged` were ever mutated.
    return list(merged.values())
|
|
|
|
|
def update_object_tags_by_mapping(bucket, s3_key):
    """Apply mapping-driven tags to a single S3 object.

    Finds every entry in Config.mapping_list whose "Filter" glob matches
    *s3_key*, merges those tags over the object's current tags (mapped tags
    win on key collisions), and writes the combined tag set back — but only
    when at least one mapped tag is missing from the current set.

    Args:
        bucket: S3 bucket name.
        s3_key: object key within the bucket.
    """
    log().info('Attempting to Tag s3://%s/%s', bucket, s3_key)
    session = boto3.session.Session(profile_name=Config.profile)
    client = session.client("s3")
    tag_set_keys = ["Key", "Value"]

    current_tags = client.get_object_tagging(Bucket=bucket, Key=s3_key).get('TagSet', [])
    # Project each matching mapping entry down to the {Key, Value} shape
    # that put_object_tagging expects, dropping the "Filter" field.
    mapped_tags = [{key: mapping[key] for key in tag_set_keys}
                   for mapping in Config.mapping_list
                   if fnmatch.fnmatch(s3_key, mapping["Filter"])]
    # Skip the PUT entirely when every mapped tag is already present.
    if mapped_tags and any(li not in current_tags for li in mapped_tags):
        # list() makes the TagSet a concrete list regardless of what
        # iterable filter_key_collisions returns; boto3 expects a list here.
        to_be_put_tag_set = list(filter_key_collisions(current_tags + mapped_tags, 'Key'))
        response = client.put_object_tagging(Bucket=bucket,
                                             Key=s3_key,
                                             Tagging={'TagSet': to_be_put_tag_set})
        json_added_tags = json.dumps(mapped_tags, default=json_serial)
        # The 418 default makes a missing status code count as a failure.
        if response.get("ResponseMetadata", {}).get("HTTPStatusCode", 418) == 200:
            # BUG FIX: the success path was previously logged at ERROR level.
            log().info('Successfully tagged s3://%s/%s with tags %s',
                       bucket,
                       s3_key,
                       json_added_tags)
        else:
            json_response = json.dumps(response, default=json_serial)
            log().error('Error Attempting to tag s3://%s/%s with tags %s with response %s', bucket, s3_key,
                        json_added_tags, json_response)
|
|
|
|
|
def tag_s3_objects(uris):
    """Tag each S3 object referenced by a sequence of S3URL instances.

    Args:
        uris: iterable of S3URL objects; entries whose URI string does not
            match the S3 URL pattern are logged and skipped.
    """
    for s3_url in uris:
        # BUG FIX: the original tested `s3_url.is_valid` without calling it,
        # which evaluates the method object itself — always truthy — so the
        # invalid branch was unreachable. Validate the actual URI string.
        if S3URL.is_valid(s3_url.uri):
            update_object_tags_by_mapping(s3_url.bucket, s3_url.path)
        else:
            log().info('Invalid uri %s', s3_url)
|
|
|
|
|
def lambda_handler(event, context):
    """AWS Lambda entry point: tag the objects named in S3 event records.

    Loads the mapping file from S3, builds an S3URL per event record, and
    tags each referenced object; logs the remaining execution time on exit.
    """
    config_logger()
    config_mapping_list_from_s3()
    # NOTE(review): S3 event object keys typically arrive URL-encoded;
    # confirm whether they need unquoting before tagging.
    uris = [S3URL(rec['s3']['bucket']['name'], rec['s3']['object']['key'])
            for rec in event['Records']]
    if not Config.mapping_list:
        log().info('There is no mapping_list configured')
    else:
        tag_s3_objects(uris)
    log().info('Remaining time at exit: %d ms', context.get_remaining_time_in_millis())
|
|
|
|
|
def process_args(source=None):
    """Parse CLI arguments and prime Config from them.

    Sets Config.profile, then loads the mapping list either from the local
    file given via -f/--file or, failing that, from S3.

    Args:
        source: optional argv list for testing; None reads sys.argv.

    Returns:
        The parsed argparse.Namespace (profile, mapping_local_file, input).
    """
    parser = argparse.ArgumentParser(
        description='Handle S3 events by tagging based on mapping file')
    parser.add_argument('-p', '--profile',
                        dest='profile',
                        action='store',
                        type=str,
                        default=Config.profile,
                        help='AWS profile')
    parser.add_argument('-f', '--file',
                        dest='mapping_local_file',
                        action='store',
                        type=str,
                        default=None,
                        help='Local mapping file or None')
    parser.add_argument('input', type=str, nargs='+', help='URI to process')

    args = parser.parse_args(args=source)
    Config.profile = args.profile
    if not args.mapping_local_file:
        config_mapping_list_from_s3()
    else:
        Config.mapping_list = read_local_file(args.mapping_local_file)
    return args
|
|
|
|
|
def main():
    """CLI entry point: configure logging, parse args, tag the given URIs."""
    config_logger()
    args = process_args()
    s3urls = [S3URL(uri=raw_uri) for raw_uri in args.input]
    tag_s3_objects(s3urls)
|
|
|
|
|
# Script entry point: run the CLI flow only when executed directly.
if __name__ == '__main__':
    main()