Last active
December 1, 2022 15:40
-
-
Save thoroc/2e9fb50c148e364cf06b2a18795b6f93 to your computer and use it in GitHub Desktop.
storing EventSourceMapping values in a dataclass and passing it as argument when calling AWS boto3 lambda.create_event_source_mapping
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# external dependencies: loguru and caseconverter.
# tested on python 3.9
from dataclasses import dataclass, field | |
import datetime | |
from caseconverter import pascalcase | |
from loguru import logger | |
@dataclass
class OnFailure:
    """Failure destination for an event source mapping (AWS ``OnFailure`` shape)."""

    destination: str  # ARN of the destination resource for failed records

    def asdict(self):
        """Serialize to a dict whose keys are PascalCase, as boto3 expects."""
        serialized = {}
        for attr_name, attr_value in self.__dict__.items():
            serialized[pascalcase(attr_name)] = attr_value
        return serialized
@dataclass
class DestinationConfig:
    """Wrapper around the failure destination (AWS ``DestinationConfig`` shape)."""

    on_failure: OnFailure  # nested config; must expose its own asdict()

    def asdict(self):
        """Serialize to a PascalCase-keyed dict, recursing into nested configs."""
        return {
            pascalcase(attr_name): nested.asdict()
            for attr_name, nested in self.__dict__.items()
        }
@dataclass
class EventSourceMapping:
    """Argument container for boto3 ``lambda.create_event_source_mapping``.

    Callers supply the event source ARN, function name, starting position
    and destination config; the remaining fields are fixed defaults that
    ``__post_init__`` fills in.
    """

    event_source_arn: str
    function_name: str
    batch_size: int = field(init=False)
    parallelization_factor: int = field(init=False)
    starting_position: datetime.datetime
    # Derived from starting_position. datetime.timestamp() returns a float,
    # so the annotation is float (the original `str` annotation was wrong).
    starting_position_timestamp: float = field(init=False)
    destination_config: DestinationConfig
    bisect_batch_on_function_error: bool = field(init=False)
    maximum_retry_attempts: int = field(init=False)

    def __post_init__(self):
        """Populate the non-init fields with fixed defaults."""
        self.batch_size = 16
        self.parallelization_factor = 2
        # Treat the (naive) starting_position as UTC before converting to
        # an epoch timestamp.
        self.starting_position_timestamp = self.starting_position.replace(
            tzinfo=datetime.timezone.utc
        ).timestamp()
        self.bisect_batch_on_function_error = True
        self.maximum_retry_attempts = 5

    def asdict(self):
        """Recursively serialize to a PascalCase-keyed dict for boto3.

        Scalars are copied as-is, datetimes are formatted as
        ``YYYY-MM-DD HH:MM:SS`` strings, and anything else is assumed to
        expose its own ``asdict()`` (e.g. DestinationConfig).
        """
        attrs = {}
        for key, value in self.__dict__.items():
            # Single isinstance with a tuple instead of a chained or-check;
            # the log message now also mentions float, which was checked
            # but omitted from the original message.
            if isinstance(value, (str, int, bool, float)):
                logger.info(
                    "{} is a str, an int, a bool or a float {}", key, type(value))
                attrs[pascalcase(key)] = value
            elif isinstance(value, datetime.datetime):
                logger.info("{} is a datetime", key)
                attrs[pascalcase(key)] = value.strftime('%Y-%m-%d %H:%M:%S')
            else:
                logger.info("{} is an {}", key, type(value))
                attrs[pascalcase(key)] = value.asdict()
        return attrs
# Sample mapping instance for the demo below; the non-init fields
# (batch size, retries, timestamp, ...) are filled in by __post_init__.
event = EventSourceMapping(
    event_source_arn='foo',
    function_name='baa',
    starting_position=datetime.datetime.now(),
    destination_config=DestinationConfig(
        on_failure=OnFailure(
            destination='test-destination')
    )
)
def call_me(**kwargs):
    """Log every keyword argument received, to show the dict round-trips via **."""
    logger.debug("kwargs={}", kwargs)
    for key in kwargs:
        logger.debug(f"Key={key} => Value={kwargs[key]}")
def call_me_with_kwargs(
    EventSourceArn: str,
    FunctionName: str,
    StartingPosition: str,
    StartingPositionTimestamp: str,
    DestinationConfig: dict,
    BatchSize: int,
    ParallelizationFactor: int,
    BisectBatchOnFunctionError: bool,
    MaximumRetryAttempts: int
):
    """Mimic an explicit PascalCase keyword signature (like boto3's
    create_event_source_mapping) to prove the asdict() output unpacks
    cleanly into named parameters, not just **kwargs.
    """
    logger.debug("EventSourceArn={}", EventSourceArn)
    logger.debug("FunctionName={}", FunctionName)
    logger.debug("StartingPosition={}", StartingPosition)
    logger.debug("StartingPositionTimestamp={}", StartingPositionTimestamp)
    logger.debug("DestinationConfig={}", DestinationConfig)
    logger.debug("BatchSize={}", BatchSize)
    logger.debug("ParallelizationFactor={}", ParallelizationFactor)
    logger.debug("BisectBatchOnFunctionError={}", BisectBatchOnFunctionError)
    logger.debug("MaximumRetryAttempts={}", MaximumRetryAttempts)
def test(event):
    """Log the mapping in repr and dict form, then feed it to both callers.

    Note: asdict() is intentionally called per use; it also emits log
    lines as a side effect.
    """
    logger.info("Object __repr__: {} type={}", event, type(event))
    logger.info("Object as dict: {} type={}",
                event.asdict(), type(event.asdict()))
    # The PascalCase dict unpacks into both a **kwargs catch-all and an
    # explicit keyword signature.
    call_me(**event.asdict())
    call_me_with_kwargs(**event.asdict())
if __name__ == '__main__':
    # Run the demo with the module-level sample mapping.
    test(event)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# external dependencies: dataclasses_json and loguru.
# tested on python 3.9
from dataclasses import dataclass, field | |
from dataclasses_json import dataclass_json, LetterCase | |
import datetime | |
from loguru import logger | |
# The sibling dataclasses use LetterCase.PASCAL; without it here the field
# serializes as {"destination": ...} while boto3 expects {"Destination": ...}.
@dataclass_json(letter_case=LetterCase.PASCAL)
@dataclass
class OnFailure:
    """Failure destination (serializes to the AWS ``OnFailure`` shape)."""

    destination: str  # ARN of the destination resource for failed records
@dataclass_json(letter_case=LetterCase.PASCAL)
@dataclass
class DestinationConfig:
    """Wrapper around the failure destination (AWS ``DestinationConfig`` shape)."""

    # Nested config; dataclasses_json recurses into it on to_dict().
    on_failure: OnFailure
@dataclass_json(letter_case=LetterCase.PASCAL)
@dataclass
class EventSourceMapping:
    """Argument container for boto3 ``lambda.create_event_source_mapping``;
    dataclasses_json supplies PascalCase serialization via ``to_dict()``.
    """

    event_source_arn: str
    function_name: str
    batch_size: int = field(init=False, default=16)
    parallelization_factor: int = field(init=False, default=2)
    starting_position: datetime.datetime
    # Derived from starting_position. datetime.timestamp() returns a float,
    # so the annotation is float (the original `str` annotation was wrong).
    starting_position_timestamp: float = field(init=False)
    destination_config: DestinationConfig
    bisect_batch_on_function_error: bool = field(init=False, default=True)
    maximum_retry_attempts: int = field(init=False, default=5)

    def __post_init__(self):
        """Derive the epoch timestamp, treating the naive datetime as UTC."""
        self.starting_position_timestamp = self.starting_position.replace(
            tzinfo=datetime.timezone.utc
        ).timestamp()
# Sample mapping instance for the demo below; defaulted fields come from
# field(default=...) and the timestamp from __post_init__.
event = EventSourceMapping(
    event_source_arn='foo',
    function_name='baa',
    starting_position=datetime.datetime.now(),
    destination_config=DestinationConfig(
        on_failure=OnFailure(
            destination='test-destination')
    )
)
def call_me(**kwargs):
    """Log every keyword argument received, to show the dict round-trips via **."""
    logger.debug("kwargs={}", kwargs)
    for key in kwargs:
        value = kwargs[key]
        logger.debug(f"Key={key} => Value={value}")
def call_me_with_kwargs(
    EventSourceArn: str,
    FunctionName: str,
    StartingPosition: str,
    StartingPositionTimestamp: str,
    DestinationConfig: dict,
    BatchSize: int,
    ParallelizationFactor: int,
    BisectBatchOnFunctionError: bool,
    MaximumRetryAttempts: int
):
    """Mimic an explicit PascalCase keyword signature (like boto3's
    create_event_source_mapping) to prove the to_dict() output unpacks
    cleanly into named parameters, not just **kwargs.
    """
    logger.debug("EventSourceArn={}", EventSourceArn)
    logger.debug("FunctionName={}", FunctionName)
    logger.debug("StartingPosition={}", StartingPosition)
    logger.debug("StartingPositionTimestamp={}", StartingPositionTimestamp)
    logger.debug("DestinationConfig={}", DestinationConfig)
    logger.debug("BatchSize={}", BatchSize)
    logger.debug("ParallelizationFactor={}", ParallelizationFactor)
    logger.debug("BisectBatchOnFunctionError={}", BisectBatchOnFunctionError)
    logger.debug("MaximumRetryAttempts={}", MaximumRetryAttempts)
def test(event):
    """Log the mapping in repr and dict form, then feed it to both callers."""
    logger.info("Object __repr__: {} type={}", event, type(event))
    # dataclasses_json provides to_dict(), not asdict(); the original
    # asdict() call on this line raised AttributeError. The call_me*
    # invocations below had already been migrated to to_dict().
    logger.info("Object as dict: {} type={}",
                event.to_dict(), type(event.to_dict()))
    call_me(**event.to_dict())
    call_me_with_kwargs(**event.to_dict())
if __name__ == '__main__':
    # Run the demo with the module-level sample mapping.
    test(event)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment