Skip to content

Instantly share code, notes, and snippets.

@thoroc
Last active December 1, 2022 15:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thoroc/2e9fb50c148e364cf06b2a18795b6f93 to your computer and use it in GitHub Desktop.
Save thoroc/2e9fb50c148e364cf06b2a18795b6f93 to your computer and use it in GitHub Desktop.
Storing EventSourceMapping values in a dataclass and passing them as keyword arguments when calling AWS boto3 lambda.create_event_source_mapping
# External dependencies: loguru and caseconverter.
# Tested on Python 3.9.
from dataclasses import dataclass, field
import datetime
from caseconverter import pascalcase
from loguru import logger
@dataclass
class OnFailure:
    """Holds the failure destination of an event source mapping."""

    destination: str

    def asdict(self):
        """Return the instance attributes with every key run through ``pascalcase``."""
        converted = {}
        for name, val in vars(self).items():
            converted[pascalcase(name)] = val
        return converted
@dataclass
class DestinationConfig:
    """Groups the nested destination settings of an event source mapping."""

    on_failure: OnFailure

    def asdict(self):
        """Return a dict keyed by ``pascalcase`` names, delegating to each member's ``asdict``."""
        return {
            pascalcase(name): member.asdict()
            for name, member in vars(self).items()
        }
@dataclass
class EventSourceMapping:
    """Values for an event source mapping, convertible to PascalCase kwargs.

    The constructor accepts ``event_source_arn``, ``function_name``,
    ``starting_position`` and ``destination_config``; every other field is
    declared with ``init=False`` and filled in by ``__post_init__``.
    """

    event_source_arn: str
    function_name: str
    batch_size: int = field(init=False)
    parallelization_factor: int = field(init=False)
    starting_position: datetime.datetime
    # Epoch seconds derived from ``starting_position``; ``timestamp()``
    # returns a float, so the field is annotated accordingly.
    starting_position_timestamp: float = field(init=False)
    # Quoted so the annotation is not evaluated while the class is created.
    destination_config: "DestinationConfig"
    bisect_batch_on_function_error: bool = field(init=False)
    maximum_retry_attempts: int = field(init=False)

    def __post_init__(self):
        """Populate the non-init fields with their fixed values."""
        self.batch_size = 16
        self.parallelization_factor = 2
        moment = self.starting_position
        if moment.utcoffset() is None:
            # Naive datetimes are interpreted as UTC (the historical
            # behaviour of this class).
            moment = moment.replace(tzinfo=datetime.timezone.utc)
        # Aware datetimes already identify an exact instant; overwriting
        # their tzinfo would shift that instant, so they are used as given.
        self.starting_position_timestamp = moment.timestamp()
        self.bisect_batch_on_function_error = True
        self.maximum_retry_attempts = 5

    def asdict(self):
        """Return the instance attributes as a dict with ``pascalcase`` keys.

        Scalars (str, int, bool, float) are copied unchanged, datetimes are
        rendered as ``'%Y-%m-%d %H:%M:%S'`` strings, and any other value is
        expected to provide its own ``asdict()`` method.
        """
        attrs = {}
        for key, value in self.__dict__.items():
            if isinstance(value, (str, int, bool, float)):
                logger.info(
                    "{} is either a str, an int or a bool {}", key, type(value))
                attrs[pascalcase(key)] = value
            elif isinstance(value, datetime.datetime):
                logger.info("{} is a datetime", key)
                attrs[pascalcase(key)] = value.strftime('%Y-%m-%d %H:%M:%S')
            else:
                logger.info("{} is an {}", key, type(value))
                attrs[pascalcase(key)] = value.asdict()
        return attrs
# Sample mapping used by the demo below. The start position is taken as a
# timezone-aware UTC "now": a naive local time would later be labelled as UTC
# and yield a timestamp skewed by the local UTC offset.
event = EventSourceMapping(
    event_source_arn='foo',
    function_name='baa',
    starting_position=datetime.datetime.now(datetime.timezone.utc),
    destination_config=DestinationConfig(
        on_failure=OnFailure(
            destination='test-destination')
    )
)
def call_me(**kwargs):
    """Log the received keyword arguments at debug level.

    The whole mapping is logged first, followed by one line per entry.

    Args:
        **kwargs: Arbitrary keyword arguments to report.
    """
    logger.debug("kwargs={}", kwargs)
    for key, value in kwargs.items():
        # Brace placeholders instead of an f-string: matches every other
        # logger call in this file and leaves the formatting to the logger.
        logger.debug("Key={} => Value={}", key, value)
def call_me_with_kwargs(
    EventSourceArn: str,
    FunctionName: str,
    StartingPosition: str,
    StartingPositionTimestamp: str,
    DestinationConfig: dict,
    BatchSize: int,
    ParallelizationFactor: int,
    BisectBatchOnFunctionError: bool,
    MaximumRetryAttempts: int
):
    """Log every explicitly named argument at debug level, one line each.

    The PascalCase parameter names match the keys produced by the mapping's
    dict form, so that dict can be splatted straight into this function.
    """
    # (message template, value) pairs, in the order they are reported.
    entries = (
        ("EventSourceArn={}", EventSourceArn),
        ("FunctionName={}", FunctionName),
        ("StartingPosition={}", StartingPosition),
        ("StartingPositionTimestamp={}", StartingPositionTimestamp),
        ("DestinationConfig={}", DestinationConfig),
        ("BatchSize={}", BatchSize),
        ("ParallelizationFactor={}", ParallelizationFactor),
        ("BisectBatchOnFunctionError={}", BisectBatchOnFunctionError),
        ("MaximumRetryAttempts={}", MaximumRetryAttempts),
    )
    for template, argument in entries:
        logger.debug(template, argument)
def test(event):
    """Log the mapping and its dict form, then splat that dict into both callees.

    ``asdict()`` is deliberately invoked afresh for every use, as each call
    emits its own log lines.
    """
    logger.info("Object __repr__: {} type={}", event, type(event))
    dict_form = event.asdict()
    dict_form_type = type(event.asdict())
    logger.info("Object as dict: {} type={}", dict_form, dict_form_type)
    call_me(**event.asdict())
    call_me_with_kwargs(**event.asdict())
# Run the demonstration only when this file is executed as a script.
if __name__ == '__main__':
    test(event)
# External dependencies: dataclasses_json and loguru.
# Tested on Python 3.9.
from dataclasses import dataclass, field
from dataclasses_json import dataclass_json, LetterCase
import datetime
from loguru import logger
# PascalCase letter casing is requested here as on the sibling dataclasses,
# so the nested key is serialized in the same casing as the keys around it
# instead of staying lowercase.
@dataclass_json(letter_case=LetterCase.PASCAL)
@dataclass
class OnFailure:
    """Holds the failure destination of an event source mapping."""

    destination: str
@dataclass_json(letter_case=LetterCase.PASCAL)
@dataclass
class DestinationConfig:
    """Groups the nested destination settings of an event source mapping."""

    on_failure: OnFailure
@dataclass_json(letter_case=LetterCase.PASCAL)
@dataclass
class EventSourceMapping:
    """Values for an event source mapping, serialized with PascalCase casing.

    The constructor accepts ``event_source_arn``, ``function_name``,
    ``starting_position`` and ``destination_config``. The remaining fields
    are declared with ``init=False``: four carry fixed defaults and
    ``starting_position_timestamp`` is computed in ``__post_init__``.
    """

    event_source_arn: str
    function_name: str
    batch_size: int = field(init=False, default=16)
    parallelization_factor: int = field(init=False, default=2)
    starting_position: datetime.datetime
    # Epoch seconds derived from ``starting_position``; ``timestamp()``
    # returns a float, so the field is annotated accordingly.
    starting_position_timestamp: float = field(init=False)
    destination_config: DestinationConfig
    bisect_batch_on_function_error: bool = field(init=False, default=True)
    maximum_retry_attempts: int = field(init=False, default=5)

    def __post_init__(self):
        """Derive ``starting_position_timestamp`` from ``starting_position``."""
        moment = self.starting_position
        if moment.utcoffset() is None:
            # Naive datetimes are interpreted as UTC (the historical
            # behaviour of this class).
            moment = moment.replace(tzinfo=datetime.timezone.utc)
        # Aware datetimes already identify an exact instant; overwriting
        # their tzinfo would shift that instant, so they are used as given.
        self.starting_position_timestamp = moment.timestamp()
# Sample mapping used by the demo below. The start position is taken as a
# timezone-aware UTC "now": a naive local time would later be labelled as UTC
# and yield a timestamp skewed by the local UTC offset.
event = EventSourceMapping(
    event_source_arn='foo',
    function_name='baa',
    starting_position=datetime.datetime.now(datetime.timezone.utc),
    destination_config=DestinationConfig(
        on_failure=OnFailure(
            destination='test-destination')
    )
)
def call_me(**kwargs):
    """Log the received keyword arguments at debug level.

    The whole mapping is logged first, followed by one line per entry.

    Args:
        **kwargs: Arbitrary keyword arguments to report.
    """
    logger.debug("kwargs={}", kwargs)
    for key, value in kwargs.items():
        # Brace placeholders instead of an f-string: matches every other
        # logger call in this file and leaves the formatting to the logger.
        logger.debug("Key={} => Value={}", key, value)
def call_me_with_kwargs(
    EventSourceArn: str,
    FunctionName: str,
    StartingPosition: str,
    StartingPositionTimestamp: str,
    DestinationConfig: dict,
    BatchSize: int,
    ParallelizationFactor: int,
    BisectBatchOnFunctionError: bool,
    MaximumRetryAttempts: int
):
    """Log every explicitly named argument at debug level, one line each.

    The PascalCase parameter names match the keys of the mapping's dict
    form, so that dict can be splatted straight into this function.
    """
    # (message template, value) pairs, in the order they are reported.
    entries = (
        ("EventSourceArn={}", EventSourceArn),
        ("FunctionName={}", FunctionName),
        ("StartingPosition={}", StartingPosition),
        ("StartingPositionTimestamp={}", StartingPositionTimestamp),
        ("DestinationConfig={}", DestinationConfig),
        ("BatchSize={}", BatchSize),
        ("ParallelizationFactor={}", ParallelizationFactor),
        ("BisectBatchOnFunctionError={}", BisectBatchOnFunctionError),
        ("MaximumRetryAttempts={}", MaximumRetryAttempts),
    )
    for template, argument in entries:
        logger.debug(template, argument)
def test(event):
    """Log the mapping and its dict form, then splat that dict into both callees.

    Args:
        event: Object exposing ``to_dict()``; the resulting keys must match
            the parameter names of ``call_me_with_kwargs``.
    """
    logger.info("Object __repr__: {} type={}", event, type(event))
    # This version of the class defines no ``asdict()`` method; ``to_dict()``
    # is the conversion used for it, so call that and reuse the result.
    payload = event.to_dict()
    logger.info("Object as dict: {} type={}", payload, type(payload))
    call_me(**payload)
    call_me_with_kwargs(**payload)
# Run the demonstration only when this file is executed as a script.
if __name__ == '__main__':
    test(event)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment