Last active
October 4, 2018 03:00
-
-
Save wesky93/d48a6e92e5aaa66def254108b03e2c0b to your computer and use it in GitHub Desktop.
aws lambda event parser
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import base64 | |
import json | |
import zlib | |
import boto3 | |
s3 = boto3.resource('s3') | |
def get_value_type(dic):
    """
    Return the type tag of a DynamoDB-style ``{type: value}`` field.

    :param dic: mapping whose first key is the type descriptor
    :return: the first key of ``dic``
    """
    type_tags = list(dic)
    return type_tags[0]
def get_value(dic):
    """
    Return the value of a ``{type: value}`` field.

    :param dic: mapping whose first key is the type descriptor
    :return: the value stored under that first key
    """
    type_tag = get_value_type(dic)
    return dic[type_tag]
def get_map_value(data):
    """
    Unwrap a map-typed ``{type: value}`` field into a plain dict.

    The value of ``data`` is expected to be a mapping of
    ``{key: {type: value}}`` entries; each entry is reduced to
    ``{key: value}``. Only this first level is unwrapped, nested fields are
    returned as they are.

    A new dict is built instead of rewriting the entries in place, so the
    caller's record is left untouched and can safely be parsed again.

    :param data: ``{type: value}`` field whose value is a mapping of fields
    :return: new dict mapping each key to its unwrapped value
    """
    raw = get_value(data)
    return {key: get_value(field) for key, field in raw.items()}
class BaseParser:
    """Base class for parsers that expose event fields as instance attributes."""

    def set_data(self, data):
        """
        Copy every key/value pair of ``data`` onto this instance as an attribute.

        Keys that are not valid identifiers (e.g. ``detail-type``) are still
        stored and can be read back with ``getattr``.

        :param data: mapping of attribute name to value
        :return: None
        """
        # A plain loop: the assignments are side effects, so there is no
        # reason to build a throwaway list with a comprehension.
        for key, value in data.items():
            setattr(self, key, value)
class DynamoStream:
    """Wrapper around one record of a DynamoDB stream event."""

    def __init__(self, record):
        """
        :param record: a single stream record; must contain a 'dynamodb' entry
        """
        self.raw = record
        self.action = record.get('eventName')
        new_img = record['dynamodb'].get('NewImage')
        old_img = record['dynamodb'].get('OldImage')
        # A missing or empty image is exposed as None.
        self.new_img = None if not new_img else self.get_img(new_img)
        self.old_img = None if not old_img else self.get_img(old_img)

    def get_img(self, data):
        """
        Convert a ``{key: {type: value}}`` image into a ``{key: value}`` dict.

        :param data: image mapping attribute names to ``{type: value}`` fields
        :return: dict mapping attribute names to unwrapped values
        """
        info = {}
        for key, field in data.items():
            # todo: restructure later so that more types are supported
            kind = get_value_type(field)
            if kind == 'M':
                info[key] = get_map_value(field)
            elif kind == 'L':
                # List items are unwrapped one level; map items go through
                # get_map_value, everything else through get_value.
                info[key] = [
                    get_map_value(item) if get_value_type(item) == 'M' else get_value(item)
                    for item in get_value(field)
                ]
            else:
                info[key] = get_value(field)
        return info
# Lambda event parsing class
class SnsEvent:
    """Wrapper around one SNS record of a Lambda event."""

    def __init__(self, record):
        """
        :param record: a single SNS record; must contain an 'Sns' entry with a
            'Message' field
        """
        self.raw = record
        self.EventSubscriptionArn = self.raw.get('EventSubscriptionArn')
        self.data = self.raw['Sns']
        self.TopicArn = self.data.get('TopicArn')
        message = self.data['Message']
        try:
            self.msg = json.loads(message)
        except ValueError:
            # The message is not JSON (json.JSONDecodeError is a ValueError):
            # expose the raw text instead of failing the whole event parse.
            self.msg = message
        self.get_msg_atrs()

    def get_msg_atrs(self):
        """
        Expose each message attribute's 'Value' as an instance attribute.

        ``msg_atrs`` keeps the untouched 'MessageAttributes' mapping (None
        when the record has none).

        :return: None
        """
        self.msg_atrs = self.data.get('MessageAttributes')
        if self.msg_atrs:
            # NOTE(review): an attribute named like an existing field
            # (e.g. 'msg' or 'raw') would overwrite it - confirm this is acceptable.
            for name, attribute in self.msg_atrs.items():
                setattr(self, name, attribute['Value'])
# S3 parsing class
class S3:
    """Wrapper around one S3 notification record of a Lambda event."""

    def __init__(self, record):
        """
        :param record: a single S3 record; must contain 's3' (with 'bucket'
            and 'object') and 'awsRegion'
        """
        # Imported locally so the class does not depend on a new
        # module-level import.
        from urllib.parse import unquote_plus

        self.raw = record
        self.bucket = self.raw['s3']['bucket']
        self._object = self.raw['s3']['object']
        # Kept exactly as delivered in the record (URL-encoded form).
        self.object_key = self._object['key']
        # Not every notification carries a size (object-removal records are
        # the usual case - confirm against the S3 event samples), so a
        # missing size is exposed as None instead of raising KeyError.
        self.object_size = self._object.get('size')
        self.bucket_arn = self.bucket['arn']
        self.bucket_name = self.bucket['name']
        self.region = self.raw['awsRegion']
        # S3 notifications URL-encode the key ("red flower.jpg" arrives as
        # "red+flower.jpg"); the resource handle needs the real key.
        self.object = s3.Object(self.bucket_name, unquote_plus(self.object_key))
class Sqs:
    """Wrapper around one SQS record of a Lambda event."""

    def __init__(self, record):
        """
        :param record: a single SQS record; 'messageId', 'body', 'attributes',
            'md5OfBody' and 'awsRegion' are required
        """
        self.raw = record
        # These fields are exposed under the same name they have in the record.
        for field in ('messageId', 'body', 'attributes', 'md5OfBody'):
            setattr(self, field, record[field])
        self.region = record['awsRegion']

    @property
    def json(self):
        """Message body decoded as JSON (decoded again on every access)."""
        decoded = json.loads(self.body)
        return decoded
class Kinesis(BaseParser):
    """Wrapper around one Kinesis record of a Lambda event.

    Both the record's top-level fields and the fields of its 'kinesis'
    entry are exposed as instance attributes.
    """

    def __init__(self, record):
        """
        :param record: a single Kinesis record; must contain a 'kinesis' entry
        """
        self.raw = record
        # Read the payload without pop(): popping removed 'kinesis' from the
        # caller's event (and from self.raw, which is the same dict).
        kinesis = record['kinesis']
        # Top-level fields only; the payload itself is flattened below and,
        # as before, is not exposed as a 'kinesis' attribute.
        self.set_data({key: value for key, value in record.items() if key != 'kinesis'})
        self.set_data(kinesis)
# Registry of the record-based services: each service key maps to the
# event-type name reported for it and the class used to parse its records.
records_event_parser = {
    service: {'name': service, 'parser': parser_cls}
    for service, parser_cls in (
        ('sns', SnsEvent),
        ('dynamodb', DynamoStream),
        ('s3', S3),
        ('kinesis', Kinesis),
        ('sqs', Sqs),
    )
}
class EVENT_PARSER(BaseParser):
    """
    Detect the kind of a Lambda event and expose its contents.

    After construction ``event_type`` holds the detected type ('' when the
    event is not recognized). For every name in ``support_event_type`` a
    boolean attribute of the same name tells whether the event is of that
    type, and ``scheduled_event`` does the same for scheduled events.
    """

    def __init__(self, event):
        """
        :param event: the raw event mapping handed to the Lambda handler
        """
        self.raw_event = event
        self.event_type = ''
        self.records = []
        self.apig = False
        self.scheduled_event = False
        self.check_event()
        self.support_event_type = ['apig', 's3', 'logs', 'kinesis', 'sns', 'sqs', 'dynamodb']
        # get_logs_event() reports 'awslogs' as event_type while the flag is
        # named 'logs'; map one to the other so that ``self.logs`` is set.
        active_flag = 'logs' if self.event_type == 'awslogs' else self.event_type
        for event_type in self.support_event_type:
            setattr(self, event_type, event_type == active_flag)

    def check_event(self):
        """
        Work out which kind of event was received and run its handler.

        The first marker key found in the event decides the type; an event
        without any marker key is left with an empty ``event_type``.

        :return: None
        """
        base_parser = {
            "Records": self.check_records_event,
            "detail-type": self.get_scheduled_event,
            "awslogs": self.get_logs_event,
            "headers": self.get_apig_event,
        }
        for key, handler in base_parser.items():
            if key in self.raw_event:
                handler()
                # An event has a single type: stop at the first match so a
                # later handler cannot overwrite event_type.
                break

    def check_records_event(self):
        """
        Detect the service behind a 'Records' event and parse every record.

        The service is taken from the first record's event source, which may
        be spelled 'eventSource' or 'EventSource'. When the list is empty or
        the source matches no registered service, ``event_type`` stays empty
        and ``records`` stays as it was.

        :return: None
        """
        records = self.raw_event['Records']
        if not records:
            return
        first = records[0]
        source = first.get("eventSource") or first.get("EventSource") or ''
        # Registry keys are matched as substrings of the source string.
        event = next((name for name in records_event_parser if name in source), None)
        if event is None:
            return
        self.event_type = records_event_parser[event]['name']
        parser = records_event_parser[event]['parser']
        self.records += [parser(r) for r in records]

    def get_scheduled_event(self):
        """Mark the event as scheduled and expose its fields as attributes."""
        self.event_type = "scheduled_event"
        self.set_data(self.raw_event)
        # Set after set_data() so a field of the event cannot override the flag.
        self.scheduled_event = True

    def get_logs_event(self):
        """Decode the base64 + gzip payload of a logs event into ``self.data`` (text)."""
        self.event_type = 'awslogs'
        compressed = base64.b64decode(self.raw_event['awslogs']['data'])
        # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
        self.data = zlib.decompress(compressed, 16 + zlib.MAX_WBITS).decode('utf-8')

    @property
    def eval_data(self):
        """
        Decoded log payload parsed as JSON.

        Kept for backward compatibility; it returns the same result as
        ``json_data``.

        :return: the parsed payload
        """
        return self.json_data

    @property
    def json_data(self):
        """
        Decoded log payload parsed as JSON.

        :return: the parsed payload
        """
        return json.loads(self.data)

    def get_apig_event(self):
        """Mark the event as an API Gateway request and expose headers and body."""
        self.event_type = 'apig'
        self.headers = self.raw_event['headers']
        # A request without a 'body' entry is exposed as None instead of raising.
        self.body = self.raw_event.get('body')
sqs 파싱 추가
ast파싱을 json파싱으로 변경
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
좀더 정형화된 구조이면서 추후 확장이 용이하도록 변경하였습니다.
self.support_event_type 에 있는 이벤트 타입에 한해서
handler에서
if event_parser.event_type == "logs": pass
이런 방식과
if event_parser.logs: pass
이런 방식 둘다 사용가능 합니다.
또한 awslogs data가 dict을 단순 str한 경우와 json화한 경우 바로 파싱할 수 있는 property(eval_data, json_data)를 추가했습니다.