Last active
April 12, 2023 03:28
-
-
Save harlanji/8a4cd0adac32d626d052034bea560e08 to your computer and use it in GitHub Desktop.
Python EventSourcing: DataClassAsDict Transcoding
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import weakref
from dataclasses import asdict
from typing import Any, Union

from dacite import from_dict
from eventsourcing.application import Application
from eventsourcing.persistence import Transcoding, Transcoder
class DataClassAsDict(Transcoding):
    """
    An abstract base class to transcode nested data, Entities / Value Objects,
    as dicts using the supplied transcoder's encoder and decoder (eg. JSON).

    Concrete subclasses set two class attributes: ``type``, the dataclass
    being transcoded, and ``name``, the label it is registered under.

    Author: Harlan J. Iverson

    Source:
    https://gist.github.com/harlanji/8a4cd0adac32d626d052034bea560e08
    """

    def __init__(self, transcoder: Transcoder):
        """Remember the transcoder whose encoder and decoder are reused.

        Only a weak reference is stored, so this object does not keep the
        transcoder alive (presumably to avoid a reference cycle once this
        transcoding is registered with that same transcoder).
        """
        self.transcoder = weakref.ref(transcoder)

    def _live_transcoder(self) -> Transcoder:
        """Return the referenced transcoder, failing clearly if it is gone.

        Raises:
            ReferenceError: if the transcoder has been garbage collected.
        """
        transcoder = self.transcoder()
        if transcoder is None:
            raise ReferenceError(
                "the transcoder given to this transcoding no longer exists"
            )
        return transcoder

    # NOTE: the annotations below must not mention ``self.type``: annotations
    # are evaluated when the ``def`` statement runs inside the class body,
    # where ``self`` does not exist, so doing so raises NameError on import.
    def encode(self, o: Any) -> str:
        """Serialise the dataclass instance *o*.

        The instance is converted to a plain dict with ``dataclasses.asdict``
        and that dict is passed to the transcoder's encoder.
        """
        return self._live_transcoder().encoder.encode(asdict(o))

    def decode(self, d: Union[str, dict]) -> Any:
        """Rebuild an instance of ``self.type`` from the encoded string *d*.

        Only ``str`` input is supported, even though the signature also
        admits ``dict``.

        Raises:
            TypeError: if *d* is not a ``str``.
        """
        # Explicit check rather than ``assert`` so it still runs under -O.
        if not isinstance(d, str):
            raise TypeError(
                f"expected encoded data as str, got {type(d).__name__}"
            )
        data = self._live_transcoder().decoder.decode(d)
        return from_dict(data_class=self.type, data=data)
# example value object/entity: | |
class LineItemAsDict(DataClassAsDict):
    """Transcoding for ``Receipt.LineItem``, registered as ``line_item``."""

    name = "line_item"
    type = Receipt.LineItem
# example application: | |
class ReceiptsApplication(Application):
    """Example application that registers the line-item transcoding."""

    # ...
    def register_transcodings(self, transcoder: Transcoder) -> None:
        """Register the base class's transcodings, then ``LineItemAsDict``."""
        super().register_transcodings(transcoder)
        line_item_transcoding = LineItemAsDict(transcoder)
        transcoder.register(line_item_transcoding)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment