Skip to content

Instantly share code, notes, and snippets.

@walmsles
Created December 28, 2022 13:01
Show Gist options
  • Save walmsles/3274fd25a1e0bcd62589ded8dad7d2a8 to your computer and use it in GitHub Desktop.
Save walmsles/3274fd25a1e0bcd62589ded8dad7d2a8 to your computer and use it in GitHub Desktop.
serverless-testing-part-3
import json
from http import HTTPStatus
from typing import Any, Dict
import aws_lambda_powertools.event_handler.content_types as content_types
from adapters.service import EventService
from aws_lambda_powertools.event_handler import APIGatewayRestResolver, Response
from aws_lambda_powertools.event_handler.exceptions import InternalServerError
from aws_lambda_powertools.logging import Logger
# Module-level collaborators, created once when this module is imported
# rather than on every call to lambda_handler.
# Resolver that routes are registered on (see @app.post) and that dispatches
# each incoming event to the matching route function.
app = APIGatewayRestResolver()
# Logger used for the handler decorator and for logging inside the routes.
logger = Logger()
# Service object whose process_event() is called by the POST /events route.
event_service = EventService()
#
# The Lambda handler is kept small and concise
# by using AWS Lambda Powertools for Python
#
@logger.inject_lambda_context(log_event=True)
def lambda_handler(event, context) -> Dict[str, Any]:
    """Lambda entry point: hand the invocation over to the route resolver.

    Args:
        event: Invocation payload, passed through to ``app.resolve``.
        context: Lambda context object, passed through to ``app.resolve``.

    Returns:
        Whatever ``app.resolve`` produces for the matched route.
    """
    resolved = app.resolve(event, context)
    return resolved
#
# The real work is handled by the post_event route function.
# It is responsible for mapping the input from the API
# request body to the event_service handler class.
#
@app.post("/events")
def post_event():
    """Handle ``POST /events`` by forwarding the body to the event service.

    The API Gateway request id is used as the transaction id. It is passed to
    ``event_service.process_event`` together with the parsed JSON body and is
    echoed back to the caller on success.

    Returns:
        Response: HTTP 200 with the JSON body
        ``{"transaction_id": <request id>}`` when ``process_event`` returns a
        truthy value.

    Raises:
        BadRequestError: The request body is not valid JSON.
        InternalServerError: ``process_event`` returned a falsy value.
    """
    # Imported locally so this route does not depend on the module import list.
    from aws_lambda_powertools.event_handler.exceptions import BadRequestError

    transaction_id = app.current_event.request_context.request_id

    # A malformed body is the caller's mistake: report it as a 400 instead of
    # letting the decode error escape the route as an unhandled exception.
    try:
        payload = app.current_event.json_body
    except ValueError as exc:  # json.JSONDecodeError subclasses ValueError
        raise BadRequestError("Request body is not valid JSON") from exc

    outcome = event_service.process_event(transaction_id, payload)
    logger.info(outcome)

    # Guard clause: a falsy result from the service means processing failed.
    if not outcome:
        raise InternalServerError("Processing failure")

    return Response(
        status_code=HTTPStatus.OK.value,
        body=json.dumps({"transaction_id": transaction_id}),
        content_type=content_types.APPLICATION_JSON,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment