Last active
April 21, 2021 08:41
-
-
Save shreyasms17/6fa357a1e3aced0be2a0970cfa6a4415 to your computer and use it in GitHub Desktop.
Lambda forwarder
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import boto3 | |
import traceback | |
import json | |
import os | |
import uuid | |
# Kinesis client created once at import time; push_to_kinesis() uses it for every put_records call.
kinesis_client = boto3.client('kinesis')
# Path of the JSON file that get_sqs_to_kinesis_mapping() opens and parses.
config_file_path = 'sqs_to_kinesis_mapping.json'
# Stream name that lambda_handler() assigns to the 'non_conformed' bucket.
dlq_stream_name = 'DLQStream'
def get_sqs_to_kinesis_mapping():
    '''
    Description:
    Load the routing table that tells the forwarder which Kinesis Data Stream each SQS queue feeds.
    The table is read from the JSON file named by the module-level config_file_path.
    :return sqs_to_kinesis_mapping: [type: dict] parsed content of the JSON file; lambda_handler uses its keys
    as SQS queue ARNs and its values as Kinesis Data Stream names
    '''
    with open(config_file_path) as config_handle:
        # Parse and hand back the mapping in one step; the file is closed when the with-block exits.
        return json.load(config_handle)
def extract_data(record):
    '''
    Description:
    Pull the payload out of a single record: the record's 'body' string is parsed as JSON and the value
    stored under its 'Message' key is returned untouched.
    :param record: [type: dict] one message from the list of messages read by lambda; must contain a 'body' key
    :return data: [type: str] value found under 'Message' (add_record parses it as JSON text, so a str is expected)
    '''
    return json.loads(record['body'])['Message']
def add_record(record, kinesis_records_all):
    '''
    Description:
    This function extracts the required data from the record dictionary and adds it to the list of records
    to be pushed to the respective mapped Kinesis Data Stream based on the eventSourceARN.
    If anything in that path raises (missing eventSourceARN, data extraction failure, payload that is not JSON
    or has no "eventId", source ARN without a bucket in kinesis_records_all) then the whole record dictionary
    is dumped as a string to the "non_conformed" key in kinesis_records_all; eventually to be dumped in DLQStream.
    A record is added to exactly one list: either its mapped stream or "non_conformed", never both.
    :param record: [type: dict] corresponds to a message from a list of messages read by lambda
    :param kinesis_records_all: [type: dict] contains sqs queue arns as keys mapped to a dict containing list of records
    to be pushed and the name of the destination Kinesis Data Stream
    :return kinesis_records_all: [type: dict] updated dict kinesis_records_all (same object, mutated in place)
    '''
    try:
        # source arn
        sqs_source_arn = record['eventSourceARN']
        # Retrieving required data from record for 'Data' key. Value for 'Data' key can be as per your requirement.
        data = extract_data(record)
        # Validate the payload BEFORE queuing it: if the eventId lookup failed after the append, the record
        # would end up both in its mapped stream and in non_conformed.
        event_id = json.loads(data)["eventId"]
        # get sqs name
        sqs_name = sqs_source_arn.split(":")[-1]
        # appending to respective sqs_key in kinesis_records_all dictionary
        kinesis_records_all[sqs_source_arn]['Records'].append({
            'Data': data,
            'PartitionKey': str(uuid.uuid1()),
        })
        print(f"Successfully processed eventId : {event_id} from SQS : {sqs_name}")
    except Exception as e:
        # Deliberately broad: any failure above means the record is routed to the DLQ bucket instead.
        # A fresh dict is built here so nothing queued elsewhere can be mutated by the fallback.
        kinesis_records_all['non_conformed']['Records'].append({
            'Data': json.dumps(record),
            'PartitionKey': str(uuid.uuid1()),
        })
        # log exceptions; format_exc() returns the traceback text (print_exc() returns None)
        print('\n'.join([str(e), traceback.format_exc()]))
    # Plain return (not inside a finally) so a failure of the fallback itself is not silently swallowed.
    return kinesis_records_all
def push_to_kinesis(source, source_dict):
    '''
    Description:
    This function pushes the list of extracted records from a queue / non_conformed records to a particular
    Kinesis Data Stream. Records are sent in batches of at most 500 (the PutRecords per-request limit) and
    the FailedRecordCount of every response is checked, because PutRecords can answer HTTP 200 while
    rejecting part of the batch. Nothing is sent when the record list is empty.
    :param source: [type: str] SQS queue ARN, or 'non_conformed' for the DLQ bucket
    :param source_dict: [type: dict] contains a list of records and the stream name where the records are to be pushed to
    :raises RuntimeError: after all batches were attempted, if Kinesis rejected one or more records; raising
    lets the invocation fail instead of silently losing the rejected records
    '''
    records = source_dict['Records']
    if not records:
        return

    stream_name = source_dict['StreamName']
    max_batch_size = 500  # PutRecords accepts at most 500 records per request
    failed_total = 0

    for start in range(0, len(records), max_batch_size):
        batch = records[start:start + max_batch_size]
        response = kinesis_client.put_records(Records=batch, StreamName=stream_name)
        status_code = response["ResponseMetadata"]["HTTPStatusCode"]
        # A 200 response may still carry per-record failures (throttling, internal errors).
        failed_count = response.get('FailedRecordCount') or 0
        failed_total += failed_count

        records_pushed_message = ''
        if status_code == 200:
            # Report only the records Kinesis actually accepted.
            pushed_count = ' '.join(['Pushed', str(len(batch) - failed_count), 'records'])
            from_details = ' '.join(['from', source])
            to_details = ' '.join(['to', stream_name, 'stream'])
            records_pushed_message = ' '.join([pushed_count, to_details]) if source == 'non_conformed' else ' '.join([pushed_count, from_details, to_details])
            if failed_count:
                records_pushed_message = ' '.join([records_pushed_message, '-', str(failed_count), 'records were rejected'])
        message = ' '.join(['\nResponse received from', stream_name, 'with HTTPStatusCode', str(status_code), ".", records_pushed_message])
        print(message)

    if failed_total:
        raise RuntimeError(
            ' '.join([str(failed_total), 'of', str(len(records)), 'records from', source,
                      'were rejected by', stream_name, 'stream'])
        )
def lambda_handler(event, context):
    '''
    Description:
    Entry point of the lambda. Builds one bucket per SQS queue ARN found in the mapping file plus a
    'non_conformed' bucket bound to dlq_stream_name, sorts every record of the event into a bucket via
    add_record, then pushes each bucket to its stream via push_to_kinesis.
    :param event: [type: dict] invocation payload; its 'Records' key holds the list of messages to forward
    :param context: lambda context object (not used)
    '''
    queue_to_stream = get_sqs_to_kinesis_mapping()

    # one empty bucket per configured queue, each remembering its destination stream
    kinesis_records_all = {
        queue_arn: {'Records': [], 'StreamName': stream_name}
        for queue_arn, stream_name in queue_to_stream.items()
    }
    # creating a key for non-conformed records
    kinesis_records_all['non_conformed'] = {'Records': [], 'StreamName': dlq_stream_name}

    for sqs_message in event['Records']:
        kinesis_records_all = add_record(sqs_message, kinesis_records_all)

    # pushing records retrieved from each sqs source to respective streams & non-conformed records to DLQ Stream
    for source, bucket in kinesis_records_all.items():
        push_to_kinesis(source, bucket)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment