Skip to content

Instantly share code, notes, and snippets.

@wesky93
Last active October 4, 2018 03:00
Show Gist options
  • Save wesky93/d48a6e92e5aaa66def254108b03e2c0b to your computer and use it in GitHub Desktop.
Save wesky93/d48a6e92e5aaa66def254108b03e2c0b to your computer and use it in GitHub Desktop.
AWS Lambda event parser
import base64
import json
import zlib
from urllib.parse import unquote_plus

import boto3
# Module-level boto3 S3 resource, created once when the module is imported.
# The S3 record parser in this file uses it to build an Object handle per record.
s3 = boto3.resource('s3')
def get_value_type(dic):
    """
    Return the type tag of a DynamoDB ``{type: value}`` attribute.

    :param dic: mapping such as ``{'S': 'text'}``; only its first key is used
    :return: the first key of ``dic`` (for example ``'S'``)
    :raises IndexError: if ``dic`` has no keys
    """
    type_tags = tuple(dic.keys())
    return type_tags[0]
def get_value(dic):
    """
    Return the payload of a DynamoDB ``{type: value}`` attribute.

    :param dic: mapping such as ``{'S': 'text'}``
    :return: the value stored under the attribute's type tag
    """
    type_tag = get_value_type(dic)
    return dic[type_tag]
def get_map_value(data):
    """
    Unwrap a DynamoDB map attribute into a plain dict.

    Each entry of the map is unwrapped one level with ``get_value``; deeper
    nesting inside an entry is returned as received.

    The result is a new dict. The input is left untouched so the caller's raw
    record is not rewritten and the same record can be parsed more than once.

    :param data: typed map attribute, e.g. ``{'M': {'k': {'S': 'v'}}}``
    :return: new dict mapping each key to its unwrapped value, e.g. ``{'k': 'v'}``
    """
    typed_entries = get_value(data)
    return {key: get_value(typed) for key, typed in typed_entries.items()}
class BaseParser:
    """Base class that can copy the entries of a mapping onto the instance."""

    def set_data(self, data):
        """
        Expose every ``key: value`` pair of ``data`` as an instance attribute.

        Keys that are not valid identifiers (e.g. ``'detail-type'``) are still
        stored and can be read back with ``getattr``.

        :param data: mapping whose keys become attribute names
        :return: None
        """
        # Plain loop: the calls are made purely for their side effect, so
        # building a throwaway list of None values would only obscure that.
        for key, value in data.items():
            setattr(self, key, value)
class DynamoStream:
    """
    Parsed view of a single DynamoDB Streams record.

    Attributes set by ``__init__``:
        raw      -- the record exactly as received; parsing never modifies it
        action   -- value of the record's ``eventName`` key (None if absent)
        new_img  -- ``NewImage`` converted to plain values, or None when the
                    image is missing or empty
        old_img  -- same for ``OldImage``
    """

    def __init__(self, record):
        """
        :param record: one stream record; must contain a ``'dynamodb'`` key
        :raises KeyError: if ``record`` has no ``'dynamodb'`` entry
        """
        self.raw = record
        self.action = self.raw.get('eventName')
        new_img = self.raw['dynamodb'].get('NewImage')
        old_img = self.raw['dynamodb'].get('OldImage')
        self.new_img = self.get_img(new_img) if new_img else None
        self.old_img = self.get_img(old_img) if old_img else None

    def get_img(self, data):
        """
        Convert ``{key: {type: value}}`` into ``{key: value}``.

        Lists (``'L'``) and maps (``'M'``) are converted recursively at any
        depth; every other type tag yields its payload unchanged. The input
        is not modified.

        :param data: typed image mapping
        :return: new dict of plain values
        """
        return {key: self._unwrap(typed) for key, typed in data.items()}

    @classmethod
    def _unwrap(cls, typed):
        """Return the plain value of one ``{type: value}`` attribute, recursing into L/M."""
        type_tag = list(typed.keys())[0]
        payload = typed[type_tag]
        if type_tag == 'L':
            return [cls._unwrap(item) for item in payload]
        if type_tag == 'M':
            return {key: cls._unwrap(item) for key, item in payload.items()}
        # Scalars and set types carry untyped payloads: hand them back as-is.
        return payload
# Lambda event parsing classes
class SnsEvent:
    """
    Parsed view of one SNS record.

    Exposes the subscription ARN, the ``Sns`` payload (``data``), the topic
    ARN, the JSON-decoded message (``msg``) and the message attributes, each
    of which is also set as an instance attribute holding its ``Value``.
    """

    def __init__(self, record):
        self.raw = record
        self.EventSubscriptionArn = record.get('EventSubscriptionArn')
        sns_payload = record['Sns']
        self.data = sns_payload
        self.TopicArn = sns_payload.get('TopicArn')
        message_text = sns_payload['Message']
        self.msg = json.loads(message_text)
        self.get_msg_atrs()

    def get_msg_atrs(self):
        """Store ``MessageAttributes`` and mirror each attribute's ``Value`` onto the instance."""
        self.msg_atrs = self.data.get('MessageAttributes')
        if not self.msg_atrs:
            # No attributes on this message: nothing to mirror.
            return
        for attr_name in self.msg_atrs:
            attr_value = self.msg_atrs[attr_name]['Value']
            setattr(self, attr_name, attr_value)
# S3 parsing class
class S3:
    """
    Parsed view of one S3 notification record.

    Attributes set by ``__init__``:
        raw          -- the record as received
        bucket       -- the record's ``s3.bucket`` mapping
        object_key   -- the object key, URL-decoded
        object_size  -- the object's ``size``, or None when the record has none
        bucket_arn   -- ``bucket['arn']``
        bucket_name  -- ``bucket['name']``
        region       -- the record's ``awsRegion``
        object       -- handle built with the module-level ``s3`` resource for
                        ``bucket_name`` / ``object_key``
    """

    def __init__(self, record):
        """
        :param record: one S3 notification record
        :raises KeyError: if a required field (other than ``size``) is missing
        """
        self.raw = record
        self.bucket = self.raw['s3']['bucket']
        self._object = self.raw['s3']['object']
        # S3 URL-encodes the key in notifications (a space arrives as '+',
        # other characters as %XX). Decode it so the key matches the stored
        # object; the undecoded form stays available in self._object['key'].
        self.object_key = unquote_plus(self._object['key'])
        # NOTE(review): records without a 'size' entry (believed to include
        # object-removal notifications) previously raised KeyError here.
        self.object_size = self._object.get('size')
        self.bucket_arn = self.bucket['arn']
        self.bucket_name = self.bucket['name']
        self.region = self.raw['awsRegion']
        self.object = s3.Object(self.bucket_name, self.object_key)
class Sqs:
    """
    Parsed view of one SQS message record.

    Copies the message id, body, attributes, body MD5 and region from the
    record onto the instance and keeps the record itself in ``raw``.
    """

    def __init__(self, record):
        self.raw = record
        # (attribute name on the instance, key in the record), in read order
        field_map = (
            ('messageId', 'messageId'),
            ('body', 'body'),
            ('attributes', 'attributes'),
            ('md5OfBody', 'md5OfBody'),
            ('region', 'awsRegion'),
        )
        for attr_name, record_key in field_map:
            setattr(self, attr_name, record[record_key])

    @property
    def json(self):
        """Return ``body`` decoded from JSON; decoding happens on every access."""
        decoded = json.loads(self.body)
        return decoded
class Kinesis(BaseParser):
    """
    Parsed view of one Kinesis record.

    Every top-level key of the record except ``'kinesis'`` becomes an
    instance attribute, followed by every key of the nested ``'kinesis'``
    mapping (which therefore wins on a name clash).
    """

    def __init__(self, record):
        """
        :param record: one Kinesis record; must contain a ``'kinesis'`` key
        :raises KeyError: if ``record`` has no ``'kinesis'`` entry
        """
        self.raw = record
        # Pop from a shallow copy, not from the caller's record: popping the
        # original stripped 'kinesis' out of the event (and out of self.raw)
        # and made parsing the same record a second time raise KeyError.
        envelope = dict(record)
        kinesis = envelope.pop('kinesis')
        self.set_data(envelope)
        self.set_data(kinesis)
# Registry of the record sources that arrive under a "Records" key.
# Each entry maps a source name to {'name': <event type>, 'parser': <class>};
# the name is matched as a substring of a record's event source string.
records_event_parser = {
    source: {'name': source, 'parser': parser_cls}
    for source, parser_cls in (
        ("sns", SnsEvent),
        ("dynamodb", DynamoStream),
        ("s3", S3),
        ("kinesis", Kinesis),
        ("sqs", Sqs),
    )
}
class EVENT_PARSER(BaseParser):
    """
    Classify a raw Lambda ``event`` and expose its contents.

    After construction:
        raw_event   -- the event as received
        event_type  -- '' when nothing matched, otherwise one of the names in
                       ``records_event_parser``, 'scheduled_event', 'awslogs'
                       or 'apig'
        records     -- parsed records for "Records"-style events, else []
        one boolean flag per name in ``support_event_type`` plus
        ``scheduled_event``; at most one of them is True.

    The flag for CloudWatch Logs events is named ``logs`` even though
    ``event_type`` is 'awslogs' for them.
    """

    # event_type values whose boolean flag attribute carries a different name
    _FLAG_ALIASES = {'awslogs': 'logs'}

    def __init__(self, event):
        """
        :param event: the event mapping handed to the Lambda handler
        """
        self.raw_event = event
        self.event_type = ''
        self.records = []
        self.apig = False
        self.scheduled_event = False
        self.check_event()
        self.support_event_type = ['apig', 's3', 'logs', 'kinesis', 'sns', 'sqs', 'dynamodb']
        # Translate the detected type to its flag name first; comparing the
        # raw 'awslogs' type against 'logs' left that flag permanently False.
        flag_name = self._FLAG_ALIASES.get(self.event_type, self.event_type)
        for event_type in self.support_event_type:
            setattr(self, event_type, event_type == flag_name)
        # Not part of support_event_type, so it has to be set explicitly;
        # previously it stayed False even for scheduled events.
        self.scheduled_event = self.event_type == 'scheduled_event'

    def check_event(self):
        """
        Detect the kind of event from its top-level keys and run the handler
        for every marker key that is present.

        :return: None
        """
        base_parser = {
            "Records": self.check_records_event,
            "detail-type": self.get_scheduled_event,
            "awslogs": self.get_logs_event,
            "headers": self.get_apig_event,
        }
        for key, handler in base_parser.items():
            if key in self.raw_event.keys():
                handler()

    def check_records_event(self):
        """
        Identify the service behind a "Records" event and parse every record.

        The service is taken from the first record's ``eventSource`` (or
        ``EventSource``) string; all records are parsed with that parser.

        :raises IndexError: if ``Records`` is empty or the source is not one
            of the names in ``records_event_parser``
        """
        # detect service
        records = self.raw_event['Records']
        record = records[0]
        source = record.get("eventSource") or record["EventSource"]
        event = next((name for name in records_event_parser if name in source), None)
        if event is None:
            # Same exception type as before, but with a message that says why.
            raise IndexError("unsupported record event source: {!r}".format(source))
        self.event_type = records_event_parser[event]['name']
        parser = records_event_parser[event]['parser']
        self.records += [parser(r) for r in records]

    def get_scheduled_event(self):
        """Mark the event as scheduled and copy its top-level keys onto the instance."""
        self.event_type = "scheduled_event"
        self.set_data(self.raw_event)

    def get_logs_event(self):
        """Decode the base64 + gzip ``awslogs.data`` payload into the text attribute ``data``."""
        self.event_type = 'awslogs'
        compressed = base64.b64decode(self.raw_event['awslogs']['data'])
        # 16 + MAX_WBITS tells zlib to expect a gzip header and trailer.
        self.data = zlib.decompress(compressed, 16 + zlib.MAX_WBITS).decode('utf-8')

    @property
    def eval_data(self):
        """
        Return the decoded logs payload parsed as JSON.

        Alias of ``json_data`` (both parse with ``json.loads``), kept so that
        existing callers continue to work.

        :return: the parsed payload
        """
        return self.json_data

    @property
    def json_data(self):
        """
        Return the decoded logs payload parsed as JSON.

        :return: the parsed payload
        :raises AttributeError: if the event was not a logs event (no ``data``)
        """
        return json.loads(self.data)

    def get_apig_event(self):
        """Mark the event as 'apig' and expose its ``headers`` and ``body``."""
        self.event_type = 'apig'
        self.headers = self.raw_event['headers']
        self.body = self.raw_event['body']
@wesky93
Copy link
Author

wesky93 commented Jun 29, 2018

좀더 정형화된 구조이면서 추후 확장이 용이하도록 변경하였습니다.
self.support_event_type 에 있는 이벤트 타입에 한해서
handler에서
if event_parser.event_type == "logs": pass
이런 방식과
if event_parser.logs: pass
이런 방식 둘다 사용가능 합니다.

또한 awslogs data가 dict을 단순 str한 경우와 json화 한 경우 바로 파싱할 수 있는 property(eval_data, json_data)를 추가했습니다

@wesky93
Copy link
Author

wesky93 commented Aug 6, 2018

sqs 파싱 추가

@wesky93
Copy link
Author

wesky93 commented Oct 4, 2018

ast파싱을 json파싱으로 변경

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment