Skip to content

Instantly share code, notes, and snippets.

@mycroft
Created August 6, 2021 07:23
Show Gist options
  • Save mycroft/1f72460edd1ec4ce299b3544080e28ff to your computer and use it in GitHub Desktop.
Save mycroft/1f72460edd1ec4ce299b3544080e28ff to your computer and use it in GitHub Desktop.
test_retention.py
import re
import time
class InvalidArgumentError(ValueError):
    """Raised when a Stage or a Retention is given invalid arguments."""

    # Stage and Retention raise this error, but nothing else in this file
    # defined or imported it, so invalid input used to end in a NameError.


class Stage(object):
    """One of the elements of a retention policy.

    A stage means "keep that many points with that precision".
    Precision is the amount of time a point covers, measured in seconds.
    stage0 means that it's the first stage of a retention policy, which
    is special because it doesn't contain aggregated values.
    """

    __slots__ = ("duration", "points", "precision", "stage0")

    # Parses as_string / as_full_string back into points, precision and an
    # optional "_0" (stage0) or "_aggr" type suffix.
    _STR_RE = re.compile(
        r"^(?P<points>[\d]+)\*(?P<precision>[\d]+)s(?P<type>(_0|_aggr))?$"
    )

    def __init__(self, points, precision, stage0=False):
        """Set attributes.

        Args:
          points: Number of points to keep (converted with int()).
          precision: Seconds covered by one point (converted with int()).
          stage0: True if this is the first, non-aggregated stage.
        """
        self.points = int(points)
        self.precision = int(precision)
        self.duration = self.points * self.precision
        self.stage0 = stage0

    def __str__(self):
        return self.as_string

    def __repr__(self):
        return "<{0}.{1}({3}) object at {2}>".format(
            self.__module__, type(self).__name__, hex(id(self)), self.as_string
        )

    def __eq__(self, other):
        if not isinstance(other, Stage):
            return False
        return (
            self.points == other.points
            and self.precision == other.precision
            and self.stage0 == other.stage0
        )

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self):
        # stage0 takes part in equality, so it takes part in the hash too.
        return hash((self.points, self.precision, self.stage0))

    @property
    def as_string(self):
        """A string like "${POINTS}*${PRECISION}s" (no type suffix)."""
        return "{}*{}s".format(self.points, self.precision)

    @property
    def as_full_string(self):
        """A string like "${POINTS}*${PRECISION}s_0" or "..._aggr".

        The "_0" suffix marks stage0; every other stage gets "_aggr".
        """
        suffix = "_0" if self.stage0 else "_aggr"
        return self.as_string + suffix

    @property
    def duration_ms(self):
        """The duration in milliseconds."""
        return self.duration * 1000

    def epoch(self, timestamp):
        """Return time elapsed since Unix epoch in count of self.duration.

        A "stage epoch" is a range of timestamps: [N*stage_duration, (N+1)*stage_duration[
        This function returns N.

        Args:
          timestamp: A timestamp in seconds.
        """
        return int(timestamp // self.duration)

    @classmethod
    def from_string(cls, s):
        """Parse results of as_string / as_full_string into an instance.

        Raises:
          InvalidArgumentError: if ``s`` does not match the expected format.
        """
        match = cls._STR_RE.match(s)
        if not match:
            raise InvalidArgumentError("Invalid retention: '%s'" % s)
        groups = match.groupdict()
        return cls(
            points=int(groups["points"]),
            precision=int(groups["precision"]),
            stage0=groups["type"] == "_0",
        )

    @property
    def precision_ms(self):
        """The precision of this stage in milliseconds."""
        return self.precision * 1000

    def round_down(self, timestamp):
        """Round a timestamp down to a multiple of the precision.

        Args:
          timestamp: A timestamp in seconds (int or float).

        Returns:
          The largest multiple of self.precision <= timestamp, as an int.
        """
        return int(timestamp // self.precision) * self.precision

    def round_up(self, timestamp):
        """Round a timestamp up to a multiple of the precision.

        Args:
          timestamp: A timestamp in seconds (int or float).

        Returns:
          The smallest multiple of self.precision >= timestamp, as an int.
        """
        # -(-a // b) is ceil(a / b) using floor division only, so it is also
        # correct for float timestamps such as 120.5.
        return int(-(-timestamp // self.precision)) * self.precision

    def step(self, timestamp):
        """Return time elapsed since Unix epoch in count of self.precision.

        A "stage step" is a range of timestamps: [N*stage_precision, (N+1)*stage_precision[
        This function returns N.

        Args:
          timestamp: A timestamp in seconds.
        """
        return int(timestamp // self.precision)

    def step_ms(self, timestamp_ms):
        """Return time elapsed since Unix epoch in count of self.precision_ms.

        A "stage step" is a range of timestamps: [N*stage_precision, (N+1)*stage_precision[
        This function returns N.

        Args:
          timestamp_ms: A timestamp in milliseconds.
        """
        return int(timestamp_ms // self.precision_ms)

    def aggregated(self):
        """Return True if this stage contains aggregated values.

        Aggregated values are in the form (sum, count) instead of just (value).
        Only the first stage of a retention policy isn't aggregated (called stage0).

        Returns:
          bool, True if this is not the first stage.
        """
        return not self.stage0
class Retention(object):
    """A retention policy, made of one or more Stages.

    Stages are ordered from the most precise to the least precise; the first
    one is always flagged as stage0 (it holds non-aggregated values).
    """

    __slots__ = ("stages",)

    def __init__(self, stages):
        """Validate and store the stages.

        Args:
          stages: An iterable of Stage, ordered from the most precise to the
            least precise. The caller's Stage objects are not modified.

        Raises:
          InvalidArgumentError: if there is no stage, if a stage's precision
            is not a multiple of the previous one, or if a stage's duration
            is not strictly greater than the previous one.
        """
        # Materialize once: the stages are walked several times below, which
        # would silently break for a one-shot iterator such as a generator.
        stages = tuple(stages or ())
        if not stages:
            raise InvalidArgumentError("there must be at least one stage")
        for prev, s in zip(stages, stages[1:]):
            if s.precision % prev.precision:
                raise InvalidArgumentError(
                    "precision of %s must be a multiple of %s" % (s, prev)
                )
            if prev.duration >= s.duration:
                raise InvalidArgumentError(
                    "duration of %s must be greater than %s" % (s, prev)
                )
        first = stages[0]
        if not first.stage0:
            # Build a flagged copy instead of mutating the caller's Stage: it
            # may be shared with another Retention, and stage0 is part of
            # Stage.__eq__/__hash__, so flipping it in place would corrupt any
            # set or dict already holding that Stage.
            first = Stage(points=first.points, precision=first.precision, stage0=True)
        self.stages = (first,) + stages[1:]

    def __getitem__(self, n):
        """Return the n-th stage."""
        return self.stages[n]

    def __hash__(self):
        return hash(self.stages)

    def __eq__(self, other):
        if not isinstance(other, Retention):
            return False
        return self.stages == other.stages

    def __ne__(self, other):
        return not (self == other)

    @property
    def as_string(self):
        """Return strings like "60*60s:24*3600s" (stages without type suffix)."""
        return ":".join(s.as_string for s in self.stages)

    @classmethod
    def from_string(cls, string):
        """Parse results of as_string into an instance.

        Args:
          string: A string like "60*60s:24*3600s".

        Raises:
          InvalidArgumentError: if the string is empty or a stage is malformed.
        """
        if string:
            stages = [Stage.from_string(s) for s in string.split(":")]
        else:
            stages = []
        return cls(stages=stages)

    @property
    def duration(self):
        """Return the maximum duration of all stages (that of the last one)."""
        if not self.stages:
            return 0
        return self.stages[-1].duration

    @property
    def stage0(self):
        """An alias for stage[0]."""
        return self.stages[0]

    @property
    def downsampled_stages(self):
        """An alias for stages[1:]."""
        return self.stages[1:]

    @classmethod
    def from_carbon(cls, l):
        """Make new instance from list of (precision, points).

        Note that precision is first, unlike in Stage.__init__

        Args:
          l: An iterable of (precision, points) pairs.
        """
        stages = [Stage(points=points, precision=precision) for precision, points in l]
        return cls(stages)

    def find_stage_for_ts(self, searched, now):
        """Return the most precise stage that contains "searched".

        Falls back to the last (least precise) stage when no stage reaches
        back far enough.

        Args:
          searched: A timestamp to search, in seconds.
          now: The current timestamp, in seconds.
        """
        for stage in self.stages:
            if searched > now - stage.duration:
                return stage
        # There is always at least one stage.
        return self.stages[-1]

    def align_time_window(self, start_time, end_time, now, shift=False):
        """Constrain the provided range in an aligned interval within retention.

        Args:
          start_time: Start of the requested range, in seconds.
          end_time: End of the requested range, in seconds.
          now: The current timestamp, in seconds.
          shift: If True, also move start_time forward to the oldest
            timestamp still covered by the selected stage.

        Returns:
          (start_time, end_time, stage): both bounds are capped at ``now``,
          passed through the stage's round_down/round_up, and end_time is
          never before start_time. ``stage`` is the stage selected for
          start_time.
        """
        stage = self.find_stage_for_ts(searched=start_time, now=now)
        now = stage.round_up(now)
        if shift:
            oldest_timestamp = now - stage.duration
            start_time = max(start_time, oldest_timestamp)
        start_time = min(now, start_time)
        start_time = stage.round_down(start_time)
        end_time = min(now, end_time)
        end_time = stage.round_up(end_time)
        if end_time < start_time:
            end_time = start_time
        return start_time, end_time, stage

    @property
    def points(self):
        """Return the total number of points for this retention.

        NOTE: as a debugging aid, reading this property also prints, for each
        stage, how the current time maps onto its epochs, steps and rows.
        """
        # One snapshot for every stage so the printed figures are consistent.
        now = time.time()
        for stage in self.stages:
            self._print_stage_debug(stage, now)
        return sum(stage.points for stage in self.stages)

    @staticmethod
    def _print_stage_debug(stage, now):
        """Print how timestamp ``now`` (seconds) falls into ``stage``'s epochs, steps and rows."""
        epoch = stage.epoch(now)
        print("ts\t\t", int(now))
        print("stage\t\t", stage)
        print("points\t\t", stage.points)
        print("precision\t", stage.precision)
        print("duration\t", stage.duration, "\t(points * precision)")
        print()
        print("epoch\t\t", epoch, "\t\t(ts / duration)")
        print("current step\t", stage.step(now), "\t(ts / precision)")
        print()
        epoch_start = epoch * stage.duration
        diff = int(now) - epoch_start
        row_size_ms = _row_size_ms(stage)
        print()
        print("row_size_ms\t", row_size_ms)
        # Position of "now" inside its row, mirroring the row layout that
        # _row_size_ms describes.
        timestamp_ms = now * 1000
        time_offset_ms = timestamp_ms % row_size_ms
        time_start_ms = timestamp_ms - time_offset_ms
        offset = stage.step_ms(time_offset_ms)
        print("time_offset_ms\t", time_offset_ms)
        print("time_start_ms\t", time_start_ms)
        print("offset\t\t", offset, "\t\t(time_offset_ms/precision)")
        print("epoch start\t", epoch_start, "\t(duration * epoch)")
        print("epoch_start_ms\t", epoch_start * 1000, "\t(duration * epoch * 1000)")
        print("diff\t\t", diff, "\t(ts - epoch_start)")
        print()
        print()
        print()
def _row_size_ms(stage):
"""Number of milliseconds to put in one Cassandra row.
Args:
stage: The stage the table stores.
Returns:
An integer, the duration in milliseconds.
"""
HOUR = 3600
_MAX_PARTITION_SIZE = 25000
_EXPECTED_POINTS_PER_READ = 2000
_MIN_PARTITION_SIZE_MS = 6 * HOUR
row_size_ms = min(
stage.precision_ms * _MAX_PARTITION_SIZE,
max(stage.precision_ms * _EXPECTED_POINTS_PER_READ, _MIN_PARTITION_SIZE_MS),
)
return row_size_ms
# Retention policy under inspection. Other policies that were tried:
#   "10080*60s"   60s precision for 7 days
#   "720*3600s"   1h precision for 30 days
#   "730*86400s"  1d precision for 2 years
z = Retention.from_string("11520*60s")  # 60s precision for 8 days

# Reading ``points`` prints a per-stage breakdown (timestamp, epoch, step,
# row size and in-row offsets) and returns the total number of points.
# TODO: move that debug output out of the property.
# The row start / in-row offset of a single timestamp can also be obtained
# directly: pick the stage with z.find_stage_for_ts(ts, now), take
# ts_ms % _row_size_ms(stage) as the offset and convert it to a point index
# with stage.step_ms().
ret = z.points
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment