-
-
Save nicoolas25/ff5df0701fddb6ab3c463debdeaaee91 to your computer and use it in GitHub Desktop.
Modeling evolving values
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
from uuid import uuid4 | |
from dateutil.relativedelta import relativedelta | |
from .effective_period import EffectiveDayPeriod | |
from .employment import Employment, EmploymentSnapshot, MoneyAmount | |
from .timeline import Timeline, TimelineError | |
def test_salary_timeline():
    """
    Check salary lookups along both axes of an Employment:
    the time the value applies to (at_time) and the time we learned about it (known_at).
    """
    hired_on = datetime(2020, 1, 1)
    employment_period = EffectiveDayPeriod(start_at=hired_on)
    employment = _build_employment(effective_period=employment_period)

    def salary_at(at_time, known_at=None):
        # Small shortcut: take a snapshot and only keep the salary out of it.
        snapshot: EmploymentSnapshot = employment.get_snapshot(
            at_time=at_time,
            known_at=known_at,
        )
        return snapshot.salary

    # On the first day, only the initial salary exists.
    assert salary_at(hired_on) == 80_000_00

    # Record a raise taking effect one month after the employment started 🎉
    raise_on = employment_period.start_at + relativedelta(months=1)
    raise_period = EffectiveDayPeriod(start_at=raise_on)
    employment.set_salary(effective_period=raise_period, salary=85_000_00)

    # From the raise date on, the new salary is returned...
    assert salary_at(raise_on) == 85_000_00

    # ... while the day before still yields the previous one (history is preserved).
    day_before_raise = raise_on - relativedelta(days=1)
    assert salary_at(day_before_raise) == 80_000_00

    # Same date as the raise, BUT seen from a moment where the raise was not recorded yet 🙃
    before_raise_was_known = datetime.now() - relativedelta(minutes=1)
    assert salary_at(raise_on, known_at=before_raise_was_known) == 80_000_00

    # Before the employment starts there is nothing to report: the salary is None.
    day_before_hiring = hired_on - relativedelta(days=1)
    assert salary_at(day_before_hiring) is None
def _build_employment(effective_period: EffectiveDayPeriod) -> Employment:
    """
    Build an Employment over effective_period whose salary and job title
    are already recorded, as if they were known since the period started.
    """
    # NOTE: The lower level Timeline.record API is used here because it accepts
    # a _known_at value. That argument is only meant for tests, hence its
    # underscore prefix.
    known_since = effective_period.start_at

    salary_timeline = Timeline[EffectiveDayPeriod, MoneyAmount](name="salary")
    salary_timeline.record(
        effective_period=effective_period,
        value=80_000_00,
        _known_at=known_since,
    )

    job_title_timeline = Timeline[EffectiveDayPeriod, str](name="job_title")
    job_title_timeline.record(
        effective_period=effective_period,
        value="Software engineer",
        _known_at=known_since,
    )

    return Employment(
        effective_period=effective_period,
        internal_id=uuid4(),
        user_id=uuid4(),
        company_id=uuid4(),
        salary_timeline=salary_timeline,
        job_title_timeline=job_title_timeline,
    )
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import dataclass, field | |
from datetime import datetime | |
from typing import TypeVar, Union | |
# Type variable for code that is generic over the kind of period it handles.
P = TypeVar("P", bound="EffectivePeriod")


@dataclass(frozen=True)
class EffectivePeriod:
    """
    Immutable time range going from start_at to end_at, both bounds included.

    When end_at is not given it defaults to datetime.max, which makes the
    period open-ended.
    """

    start_at: datetime
    end_at: datetime = datetime.max

    def __post_init__(self):
        # A period going backward in time is rejected at construction.
        if self.start_at > self.end_at:
            raise ValueError("Invalid EffectivePeriod: start_at is after end_at")

    def __contains__(self, d: Union[datetime, "EffectivePeriod"]) -> bool:
        """
        Support the `in` operator for datetimes and for other periods.

        A datetime is contained when it lies between the two bounds (inclusive).
        A period is contained when it does not extend past either bound.
        Any other operand raises a TypeError.
        """
        if isinstance(d, datetime):
            return self.start_at <= d <= self.end_at
        if isinstance(d, EffectivePeriod):
            return self.start_at <= d.start_at and d.end_at <= self.end_at
        raise TypeError("Contains is only implemented for date and EffectivePeriod")
@dataclass(frozen=True)
class EffectiveDayPeriod(EffectivePeriod):
    """
    Same as EffectivePeriod but aligned on whole days:
    1. start_at is rounded down to the first instant of its day (00:00:00.000000), and
    2. end_at is rounded up to the last instant of its day (23:59:59.999999).

    Rounding end_at up to the last microsecond (rather than to 23:59:59.000000)
    ensures that every instant of the final day is contained in the period.
    """

    def __post_init__(self):
        # The dataclass is frozen, so regular attribute assignment is disabled:
        # the normalized bounds are written through object.__setattr__.
        object.__setattr__(
            self,
            "start_at",
            self.start_at.replace(hour=0, minute=0, second=0, microsecond=0),
        )
        # NOTE: datetime.max is 9999-12-31 23:59:59.999999, so the open-ended
        # default goes through this rounding unchanged and needs no special case.
        object.__setattr__(
            self,
            "end_at",
            self.end_at.replace(hour=23, minute=59, second=59, microsecond=999999),
        )
        # Validate start_at <= end_at on the normalized values.
        super().__post_init__()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import dataclass | |
from datetime import datetime | |
from typing import TypeVar | |
from uuid import UUID | |
from .effective_period import EffectiveDayPeriod | |
from .timeline import Timeline, MissingValueError | |
# Money is handled as a plain integer (tests use values such as 80_000_00).
# NOTE(review): presumably expressed in minor units (cents) — confirm.
MoneyAmount = int

# Type of the values held by a Timeline.
T = TypeVar("T")


@dataclass
class Employment:
    """
    An employment whose salary and job title evolve over time.

    Identifiers and the overall effective_period are plain attributes, while
    the evolving properties are each backed by their own Timeline.
    """

    effective_period: EffectiveDayPeriod
    internal_id: UUID
    user_id: UUID
    company_id: UUID
    salary_timeline: Timeline[EffectiveDayPeriod, MoneyAmount]
    job_title_timeline: Timeline[EffectiveDayPeriod, str]

    def set_salary(self, effective_period: EffectiveDayPeriod, salary: MoneyAmount):
        """Record salary as the salary applying over effective_period."""
        self.salary_timeline.record(value=salary, effective_period=effective_period)

    def set_job_title(self, effective_period: EffectiveDayPeriod, job_title: str):
        """Record job_title as the job title applying over effective_period."""
        self.job_title_timeline.record(value=job_title, effective_period=effective_period)

    def get_snapshot(
        self,
        at_time: datetime,
        known_at: datetime | None = None,
    ) -> "EmploymentSnapshot":
        """
        Return the state of this employment at at_time, as it was known at known_at.

        When known_at is omitted, the current time is used. Properties without
        any known value at at_time are set to None in the snapshot.
        """
        knowledge_reference = datetime.now() if known_at is None else known_at

        def value_or_none(timeline: Timeline[EffectiveDayPeriod, T]) -> T | None:
            # A missing value is not an error for a snapshot: it is reported as None.
            try:
                return timeline.fetch(at_time=at_time, known_at=knowledge_reference)
            except MissingValueError:
                return None

        salary = value_or_none(self.salary_timeline)
        job_title = value_or_none(self.job_title_timeline)

        return EmploymentSnapshot(
            effective_period=self.effective_period,
            internal_id=self.internal_id,
            company_id=self.company_id,
            user_id=self.user_id,
            salary=salary,
            job_title=job_title,
            snapshot_subject=self,
            snapshot_at=at_time,
            snapshot_known_at=knowledge_reference,
        )
@dataclass(frozen=True)
class EmploymentSnapshot:
    """
    Read-only view of an Employment at a given time (snapshot_at),
    based on what was known at another given time (snapshot_known_at).
    """

    # NOTE: this class and Employment.get_snapshot mirror each other; both
    # could be generated with a bit of introspection to avoid the repetition.

    # Properties copied as-is from the Employment
    effective_period: EffectiveDayPeriod
    internal_id: UUID
    company_id: UUID
    user_id: UUID

    # Properties read from the Employment's Timelines,
    # None when no value is found at snapshot_at
    salary: MoneyAmount | None
    job_title: str | None

    # Where the snapshot comes from: its subject and both time references
    snapshot_subject: Employment
    snapshot_at: datetime
    snapshot_known_at: datetime
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from dataclasses import dataclass | |
from datetime import datetime | |
from typing import Generic, Sequence, TypeVar | |
from .effective_period import EffectivePeriod | |
class TimelineError(Exception):
    """Base class of the errors raised by a Timeline."""


class MissingValueError(TimelineError):
    """Raised when a Timeline has no known value for the requested time."""
# Kind of period the entries of a Timeline apply to.
P = TypeVar("P", bound=EffectivePeriod)
# Kind of value the entries of a Timeline hold.
T = TypeVar("T")


@dataclass(frozen=True)
class _Entry(Generic[P, T]):
    """Immutable record of one value of a Timeline."""

    # Period over which the value applies
    effective_period: P
    # The recorded value itself
    value: T
    # When this piece of knowledge was recorded
    known_at: datetime
    # Sequence number of the entry within its Timeline
    version: int
class Timeline(Generic[P, T]):
    """
    Append-only history of the values taken by a single property.

    Each entry states that a value applies over an effective period, and keeps
    the time at which that statement was recorded (known_at). Entries are kept
    ordered by known_at: both the constructor and record() enforce it, and
    fetch() relies on it.
    """

    _entries: list[_Entry[P, T]]
    _name: str

    def __init__(
        self,
        name: str,
        entries: Sequence[_Entry[P, T]] = tuple(),
    ) -> None:
        """
        Build a Timeline called name, optionally pre-filled with entries.

        The given entries are copied and their version is re-assigned from
        their position in the sequence. A TimelineError is raised when they are
        not ordered by known_at.
        """
        super().__init__()
        self._name = name
        self._entries = [
            _Entry(
                effective_period=entry.effective_period,
                value=entry.value,
                known_at=entry.known_at,
                version=i,
            )
            for i, entry in enumerate(entries)
        ]
        # Trust entries' order... but verify its consistency.
        for previous_entry, next_entry in zip(self._entries, self._entries[1:]):
            if previous_entry.known_at > next_entry.known_at:
                raise TimelineError(
                    "Entries are expected be ordered according to our knowledge timeline"
                )

    def record(
        self,
        effective_period: P,
        value: T,
        _known_at: datetime | None = None,
    ) -> None:
        """
        Record the value of the Timeline's property over effective_period.

        The entry is stamped with the current time unless _known_at is given.
        A TimelineError is raised when that time is older than the latest
        entry's known_at, since appending it would break the ordering that
        fetch() depends on.
        """
        known_at = datetime.now() if _known_at is None else _known_at

        if self._entries:
            latest_entry = self._entries[-1]
            # Same invariant as the one verified by the constructor.
            if known_at < latest_entry.known_at:
                raise TimelineError(
                    f"Cannot record a value known before the latest entry of {self._name}'s Timeline"
                )
            next_version = latest_entry.version + 1
        else:
            next_version = 0

        self._entries.append(
            _Entry(
                effective_period=effective_period,
                value=value,
                known_at=known_at,
                version=next_version,
            ),
        )

    def fetch(
        self,
        at_time: datetime,
        known_at: datetime | None = None,
    ) -> T:
        """
        Return the latest known (using known_at as reference) recorded value at at_time.

        When known_at is None, every entry is taken into account.
        If no knowledge is found at that time, then a MissingValueError is raised.
        """
        # NOTE: We look at the latest piece of knowledge we have on the property for that point in time.
        # Since the entries are ordered, we can stop looking at the first matching entry.
        for entry in reversed(self._entries):
            # Skip what was not known yet at the reference time.
            if known_at is not None and entry.known_at > known_at:
                continue
            if at_time in entry.effective_period:
                return entry.value
        raise MissingValueError(
            f"No known value at the given time in {self._name}'s Timeline"
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment