Created
August 6, 2021 07:23
-
-
Save mycroft/1f72460edd1ec4ce299b3544080e28ff to your computer and use it in GitHub Desktop.
test_retention.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import re | |
import time | |
class InvalidArgumentError(ValueError):
    """Raised when a stage or retention specification is invalid."""


class Stage(object):
    """One of the elements of a retention policy.

    A stage means "keep that many points with that precision".
    Precision is the amount of time a point covers, measured in seconds.
    stage0 means that it's the first stage of a retention policy, which
    is special because it doesn't contain aggregated values.
    """

    __slots__ = ("duration", "points", "precision", "stage0")

    # Parses the output of as_string / as_full_string into the points and
    # precision groups plus an optional "_0" / "_aggr" type suffix.
    _STR_RE = re.compile(
        r"^(?P<points>[\d]+)\*(?P<precision>[\d]+)s(?P<type>(_0|_aggr))?$"
    )

    def __init__(self, points, precision, stage0=False):
        """Set attributes.

        Args:
          points: number of points kept by this stage (coerced with int()).
          precision: seconds covered by one point (coerced with int()).
          stage0: True if this is the first, non-aggregated stage.

        Raises:
          InvalidArgumentError: if points or precision is not positive. Such a
            stage has a zero duration or precision, and epoch()/step() would
            divide by zero.
        """
        self.points = int(points)
        self.precision = int(precision)
        if self.points <= 0 or self.precision <= 0:
            raise InvalidArgumentError(
                "points and precision must be positive: %d*%ds"
                % (self.points, self.precision)
            )
        self.duration = self.points * self.precision
        self.stage0 = stage0

    def __str__(self):
        return self.as_string

    def __repr__(self):
        return "<{0}.{1}({3}) object at {2}>".format(
            self.__module__, type(self).__name__, hex(id(self)), self.as_string
        )

    def __eq__(self, other):
        if not isinstance(other, Stage):
            return False
        return (
            self.points == other.points
            and self.precision == other.precision
            and self.stage0 == other.stage0
        )

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        return hash((self.points, self.precision, self.stage0))

    @property
    def as_string(self):
        """A string like "${POINTS}*${PRECISION}s"."""
        return "{}*{}s".format(self.points, self.precision)

    @property
    def as_full_string(self):
        """A string like "${POINTS}*${PRECISION}s_0" or "${POINTS}*${PRECISION}s_aggr".

        The "_0" suffix marks stage0 (raw values); "_aggr" marks an
        aggregated stage.
        """
        suffix = "_0" if self.stage0 else "_aggr"
        return self.as_string + suffix

    @property
    def duration_ms(self):
        """The duration in milliseconds."""
        return self.duration * 1000

    def epoch(self, timestamp):
        """Return time elapsed since Unix epoch in count of self.duration.

        A "stage epoch" is a range of timestamps: [N*stage_duration, (N+1)*stage_duration[
        This function returns N.

        Args:
          timestamp: A timestamp in seconds.
        """
        return int(timestamp // self.duration)

    @classmethod
    def from_string(cls, s):
        """Parse the output of as_string or as_full_string into an instance.

        A missing suffix, or "_aggr", yields stage0=False; "_0" yields
        stage0=True.

        Raises:
          InvalidArgumentError: if `s` does not match the expected format.
        """
        match = cls._STR_RE.match(s)
        if not match:
            raise InvalidArgumentError("Invalid retention: '%s'" % s)
        groups = match.groupdict()
        return cls(
            points=int(groups["points"]),
            precision=int(groups["precision"]),
            stage0=groups["type"] == "_0",
        )

    @property
    def precision_ms(self):
        """The precision of this stage in milliseconds."""
        return self.precision * 1000

    def round_down(self, timestamp):
        """Round down a timestamp to a multiple of the precision.

        The original code pointed at bg_utils.round_down, which this script
        does not import, so the rounding is done inline. Returns an int.
        """
        return int(timestamp // self.precision) * self.precision

    def round_up(self, timestamp):
        """Round up a timestamp to a multiple of the precision.

        Returns an int; a timestamp already on a boundary is returned as-is.
        """
        # Ceiling division expressed as negated floor division, which works
        # for both int and float timestamps.
        return int(-(-timestamp // self.precision)) * self.precision

    def step(self, timestamp):
        """Return time elapsed since Unix epoch in count of self.precision.

        A "stage step" is a range of timestamps: [N*stage_precision, (N+1)*stage_precision[
        This function returns N.

        Args:
          timestamp: A timestamp in seconds.
        """
        return int(timestamp // self.precision)

    def step_ms(self, timestamp_ms):
        """Return time elapsed since Unix epoch in count of self.precision_ms.

        A "stage step" is a range of timestamps: [N*stage_precision, (N+1)*stage_precision[
        This function returns N.

        Args:
          timestamp_ms: A timestamp in milliseconds.
        """
        return int(timestamp_ms // self.precision_ms)

    def aggregated(self):
        """Return True if this stage contains aggregated values.

        Aggregated values are in the form (sum, count) instead of just (value).
        Only the first stage of a retention policy isn't aggregated (called stage0).

        Returns:
          bool, True if this is not the first stage.
        """
        return not self.stage0
class Retention(object):
    """A retention policy, made of 1 or more Stages.

    Stages go from the most precise to the least precise: each stage's
    precision must be a multiple of the previous one, and each stage's
    duration must be strictly greater than the previous one. Only the first
    stage is flagged stage0 (non-aggregated).
    """

    __slots__ = ("stages",)

    def __init__(self, stages):
        """Validate and store the stages.

        Args:
          stages: an iterable of Stage, most precise first.

        Raises:
          InvalidArgumentError: if there are no stages, if a precision is not a
            multiple of the previous one, or if durations do not strictly
            increase.
        """
        # Materialize first: a generator would otherwise be consumed by the
        # validation loop and leave nothing to store.
        stages = tuple(stages)
        if not stages:
            raise InvalidArgumentError("there must be at least one stage")
        for prev, s in zip(stages, stages[1:]):
            if s.precision % prev.precision:
                raise InvalidArgumentError(
                    "precision of %s must be a multiple of %s" % (s, prev)
                )
            if prev.duration >= s.duration:
                raise InvalidArgumentError(
                    "duration of %s must be greater than %s" % (s, prev)
                )
        # Store fresh copies instead of flipping stage0 on the caller's
        # objects. Stage hashes on stage0, so mutating an instance that lives
        # in a set/dict elsewhere would corrupt that container. Only the first
        # stage is stage0; every later stage holds aggregated values.
        self.stages = tuple(
            Stage(points=s.points, precision=s.precision, stage0=(i == 0))
            for i, s in enumerate(stages)
        )

    def __getitem__(self, n):
        """Return the n-th stage."""
        return self.stages[n]

    def __hash__(self):
        return hash(self.stages)

    def __eq__(self, other):
        if not isinstance(other, Retention):
            return False
        return self.stages == other.stages

    def __ne__(self, other):
        return not (self == other)

    @property
    def as_string(self):
        """Return strings like "60*60s:24*3600s"."""
        return ":".join(s.as_string for s in self.stages)

    @classmethod
    def from_string(cls, string):
        """Parse results of as_string into an instance.

        Args:
          string: A string like "60*60s:24*3600s"

        Raises:
          InvalidArgumentError: if `string` is empty or a stage is malformed.
        """
        if string:
            stages = [Stage.from_string(s) for s in string.split(":")]
        else:
            stages = []
        return cls(stages=stages)

    @property
    def duration(self):
        """Return the maximum duration of all stages (that of the last one)."""
        # Defensive: the constructor guarantees at least one stage, but an
        # instance built via __new__ might have an empty tuple.
        if not self.stages:
            return 0
        return self.stages[-1].duration

    @property
    def stage0(self):
        """An alias for stage[0]."""
        return self.stages[0]

    @property
    def downsampled_stages(self):
        """An alias for stages[1:]."""
        return self.stages[1:]

    @classmethod
    def from_carbon(cls, l):
        """Make new instance from list of (precision, points).

        Note that precision is first, unlike in Stage.__init__
        """
        stages = [Stage(points=points, precision=precision) for precision, points in l]
        return cls(stages)

    def find_stage_for_ts(self, searched, now):
        """Return the most precise stage that contains "searched".

        Args:
          searched: A timestamp to search, in seconds.
          now: The current timestamp, in seconds.

        Returns:
          The first stage whose window (now - duration, now] holds `searched`,
          or the last stage if none does.
        """
        for stage in self.stages:
            if searched > now - stage.duration:
                return stage
        # There is always at least one stage.
        return self.stages[-1]

    def align_time_window(self, start_time, end_time, now, shift=False):
        """Constrain the provided range in an aligned interval within retention.

        Args:
          start_time: requested start timestamp, in seconds.
          end_time: requested end timestamp, in seconds.
          now: the current timestamp, in seconds.
          shift: if True, also clamp start_time so it is no older than the
            selected stage can hold.

        Returns:
          (start_time, end_time, stage), with both times clamped to `now`,
          rounded to the stage precision, and end_time >= start_time.
        """
        stage = self.find_stage_for_ts(searched=start_time, now=now)
        now = stage.round_up(now)
        if shift:
            oldest_timestamp = now - stage.duration
            start_time = max(start_time, oldest_timestamp)
        start_time = min(now, start_time)
        start_time = stage.round_down(start_time)
        end_time = min(now, end_time)
        end_time = stage.round_up(end_time)
        if end_time < start_time:
            end_time = start_time
        return start_time, end_time, stage

    @property
    def points(self):
        """Return the total number of points for this retention.

        NOTE: as a debugging aid, reading this property also prints, for every
        stage, how the current time maps onto that stage's epochs and steps and
        onto the row layout computed by _row_size_ms(). All stages share the
        same snapshot of the current time.
        """
        now = time.time()
        for stage in self.stages:
            self._print_stage_layout(stage, now)
        return sum(stage.points for stage in self.stages)

    @staticmethod
    def _print_stage_layout(stage, now):
        """Print how timestamp `now` (seconds) falls into `stage`'s epochs, steps and rows."""
        print("ts\t\t", int(now))
        print("stage\t\t", stage)
        print("points\t\t", stage.points)
        print("precision\t", stage.precision)
        print("duration\t", stage.duration, "\t(points * precision)")
        print()
        print("epoch\t\t", stage.epoch(now), "\t\t(ts / duration)")
        print("current step\t", stage.step(now), "\t(ts / precision)")
        print()
        epoch_start = stage.epoch(now) * stage.duration
        diff = int(now) - epoch_start
        row_size_ms = _row_size_ms(stage)
        print()
        print("row_size_ms\t", row_size_ms)
        timestamp_ms = now * 1000
        time_offset_ms = timestamp_ms % row_size_ms
        time_start_ms = timestamp_ms - time_offset_ms
        offset = stage.step_ms(time_offset_ms)
        print("time_offset_ms\t", time_offset_ms)
        print("time_start_ms\t", time_start_ms)
        print("offset\t\t", offset, "\t\t(time_offset_ms/precision)")
        print("epoch start\t", epoch_start, "\t(duration * epoch)")
        print("epoch start ms\t", epoch_start * 1000, "\t(duration * epoch * 1000)")
        print("diff\t\t", diff, "\t(ts - epoch_start)")
        print()
        print()
        print()
def _row_size_ms(stage): | |
"""Number of milliseconds to put in one Cassandra row. | |
Args: | |
stage: The stage the table stores. | |
Returns: | |
An integer, the duration in milliseconds. | |
""" | |
HOUR = 3600 | |
_MAX_PARTITION_SIZE = 25000 | |
_EXPECTED_POINTS_PER_READ = 2000 | |
_MIN_PARTITION_SIZE_MS = 6 * HOUR | |
row_size_ms = min( | |
stage.precision_ms * _MAX_PARTITION_SIZE, | |
max(stage.precision_ms * _EXPECTED_POINTS_PER_READ, _MIN_PARTITION_SIZE_MS), | |
) | |
return row_size_ms | |
if __name__ == "__main__":
    # Guarded so that importing this module (e.g. pytest collecting
    # test_retention.py) does not trigger the debug printing below.
    #
    # Other retention policies worth inspecting:
    #   Retention.from_string("10080*60s")   # 60s:7d
    #   Retention.from_string("720*3600s")   # 1h:30d
    #   Retention.from_string("730*86400s")  # 1d:2y
    z = Retention.from_string("11520*60s")  # 60s:8d
    # Reading .points prints the per-stage layout as a side effect; the
    # returned total is kept in `ret` for interactive inspection.
    ret = z.points
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment