-
-
Save walmsles/3274fd25a1e0bcd62589ded8dad7d2a8 to your computer and use it in GitHub Desktop.
serverless-testing-part-3
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
from http import HTTPStatus | |
from typing import Any, Dict | |
import aws_lambda_powertools.event_handler.content_types as content_types | |
from adapters.service import EventService | |
from aws_lambda_powertools.event_handler import APIGatewayRestResolver, Response | |
from aws_lambda_powertools.event_handler.exceptions import InternalServerError | |
from aws_lambda_powertools.logging import Logger | |
# Module-level singletons shared by the handler and the route function below.
# `app` registers routes (see @app.post) and resolves incoming events.
app = APIGatewayRestResolver()
# Powertools logger; also supplies the decorator wrapping lambda_handler.
logger = Logger()
# Service object that post_event delegates the request body to.
event_service = EventService()
#
# Lambda entry point. Deliberately thin: all routing is delegated to the
# AWS Lambda Powertools for Python resolver held in `app`.
#
@logger.inject_lambda_context(log_event=True)
def lambda_handler(event, context) -> Dict[str, Any]:
    """Resolve the incoming event through ``app`` and return its result.

    Args:
        event: The raw event passed to the Lambda function.
        context: The Lambda context object passed alongside the event.

    Returns:
        Whatever ``app.resolve(event, context)`` produces for the matched
        route.
    """
    resolved = app.resolve(event, context)
    return resolved
#
# Route function that does the real work: it maps the input from the API
# request body onto the event_service handler class.
#
@app.post("/events")
def post_event():
    """Handle ``POST /events``.

    Uses the request id from the current event's request context as the
    transaction id, passes it with the parsed JSON body to
    ``event_service.process_event``, and logs the value that call returns.

    Returns:
        A ``Response`` with status 200 and a JSON body of the form
        ``{"transaction_id": <request id>}`` when the service result is
        truthy.

    Raises:
        InternalServerError: When the service result is falsy.
    """
    current = app.current_event
    transaction_id = current.request_context.request_id
    outcome = event_service.process_event(transaction_id, current.json_body)
    logger.info(outcome)

    # Guard clause: a falsy result from the service is reported as a failure.
    if not outcome:
        raise InternalServerError("Processing failure")

    payload = json.dumps({"transaction_id": transaction_id})
    return Response(
        status_code=HTTPStatus.OK.value,
        body=payload,
        content_type=content_types.APPLICATION_JSON,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment