Skip to content

Instantly share code, notes, and snippets.

@melanieplageman
Created June 27, 2024 01:46
Show Gist options
  • Save melanieplageman/7400e81bbbd518fe08b4af55a9b632d4 to your computer and use it in GitHub Desktop.
Save melanieplageman/7400e81bbbd518fe08b4af55a9b632d4 to your computer and use it in GitHub Desktop.
from random import random, randrange
import pprint, copy
from collections import namedtuple
class Moment(namedtuple('Moment', ('time', 'lsn', 'members', 'error'))):
    """One point of a stream: a ``(time, lsn)`` pair plus bookkeeping fields.

    Equality and hashing consider only ``time`` and ``lsn``; ``members`` and
    ``error`` are ignored, so two moments at the same position compare equal
    even when their bookkeeping differs.
    """

    def __eq__(self, other):
        """Return True when ``other`` has the same ``time`` and ``lsn``."""
        try:
            return self.time == other.time and self.lsn == other.lsn
        except AttributeError:
            # Not moment-like: let Python try the reflected comparison
            # instead of blowing up inside ``==``.
            return NotImplemented

    def __ne__(self, other):
        """Return the negation of ``__eq__``.

        Defined explicitly because ``tuple.__ne__`` would otherwise be
        inherited and compare every field, disagreeing with ``__eq__``.
        """
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        """Hash on the same fields ``__eq__`` compares.

        Overriding ``__eq__`` alone sets ``__hash__`` to None, which would
        make moments unusable in sets and as dict keys.
        """
        return hash((self.time, self.lsn))
def core_next_pair(time, lsn):
    """Advance a ``(time, lsn)`` pair by one random step.

    The time grows by a random whole number drawn from ``randrange(1, 200)``
    and the lsn grows by a ``random()`` fraction of that elapsed time.

    Returns:
        The advanced ``(time, lsn)`` tuple.
    """
    advanced_time = time + randrange(1, 200)
    elapsed = advanced_time - time
    growth = random() * elapsed
    return advanced_time, lsn + growth
def big_gap(time, lsn, did_gap, *, gap_start=2000, gap_end=3000,
            time_factor=2, lsn_factor=10):
    """Advance a ``(time, lsn)`` pair by one random step, with one big jump.

    A normal step adds ``randrange(1, 200)`` to the time and a ``random()``
    fraction of that elapsed time to the lsn. The first time the advanced
    time lands strictly between ``gap_start`` and ``gap_end`` (and no jump
    has happened yet), both values are multiplied to create a discontinuity.

    Args:
        time: Current time value.
        lsn: Current lsn value.
        did_gap: True when the jump has already been applied; suppresses it.
        gap_start: Exclusive lower bound of the window that triggers the jump.
        gap_end: Exclusive upper bound of the window that triggers the jump.
        time_factor: Multiplier applied to the time when the jump fires.
        lsn_factor: Multiplier applied to the lsn when the jump fires.

    Returns:
        ``(next_time, lsn, did_gap)`` with ``did_gap`` set once the jump has
        been applied.
    """
    next_time = time + randrange(1, 200)
    lsn = lsn + random() * (next_time - time)
    if not did_gap and gap_start < next_time < gap_end:
        next_time = next_time * time_factor
        lsn = lsn * lsn_factor
        did_gap = True
    return next_time, lsn, did_gap
class LSNTimeStream:
    """A list of ``(time, lsn)`` points plus three strategies for thinning it.

    ``self.stream`` holds namedtuple points exposing ``time``, ``lsn``,
    ``members`` and ``error``. Each ``*_algo_*`` method removes exactly one
    point per call; the first point is never removed by any of them.
    """

    def __init__(self, length):
        """Fill the stream with ``length`` points generated by ``big_gap``.

        Every point starts with ``members=1`` and ``error=0``.
        """
        self.stream = []
        time = 1
        lsn = 10
        did_gap = False
        for _ in range(length):
            time, lsn, did_gap = big_gap(time, lsn, did_gap)
            self.stream.append(Moment(time, lsn, 1, 0))

    @staticmethod
    def triangle_area(left, right, mid):
        """Return the signed area of the triangle ``left``-``mid``-``right``.

        The area is obtained by subtracting, from the bounding rectangle of
        ``left`` and ``right``, the half below the chord, the two small
        triangles under the ``left``-``mid`` and ``mid``-``right`` segments,
        and the rectangle between them. The result is zero for collinear
        points and may be negative; callers compare magnitudes.
        """
        rect_all = (right.time - left.time) * (right.lsn - left.lsn)
        tri1 = rect_all / 2
        tri2 = (mid.lsn - left.lsn) * (mid.time - left.time) / 2
        tri3 = (right.lsn - mid.lsn) * (right.time - mid.time) / 2
        rect1 = (right.lsn - mid.lsn) * (mid.time - left.time)
        return rect_all - tri1 - tri2 - tri3 - rect1

    def error_algo_point_to_delete(self):
        """Pick the interior point whose removal distorts the curve least.

        Returns:
            ``(index, area)`` where ``area`` is the signed triangle area the
            point forms with its two neighbours; the first point with the
            smallest magnitude wins.

        Raises:
            ValueError: If the stream has fewer than three points, i.e. no
                interior point exists.
        """
        if len(self.stream) < 3:
            raise ValueError(
                "need at least three points to choose an interior point")
        min_area = None
        target_point = None
        for i in range(1, len(self.stream) - 1):
            area = self.triangle_area(
                self.stream[i - 1], self.stream[i + 1], self.stream[i])
            if min_area is None or abs(area) < abs(min_area):
                min_area = area
                target_point = i
        return target_point, min_area

    def error_algo_remove_point(self):
        """Remove the least-significant interior point.

        The left neighbour inherits the removed point's accumulated error
        plus the magnitude of the area introduced by this removal.
        Previously the computed area was discarded, so the accumulated error
        never moved from its initial value.
        """
        i, area = self.error_algo_point_to_delete()
        left = self.stream[i - 1]
        removed = self.stream[i]
        # _replace keeps the element's own type and untouched fields.
        self.stream[i - 1] = left._replace(
            error=left.error + removed.error + abs(area))
        del self.stream[i]

    def members_algo_merge_target(self):
        """Return the index of the point to merge into its left neighbour.

        This is the first index whose left neighbour holds fewer than twice
        as many members as the point itself.
        """
        pairs = zip(self.stream, self.stream[1:])
        for i, (previous, current) in enumerate(pairs, start=1):
            if previous.members < 2 * current.members:
                return i
        # Safety valve for too small values of desired timeline length
        return 1

    def members_algo_merge(self):
        """Merge the target point into its left neighbour.

        The neighbour keeps its own ``time`` and ``lsn``, gains the merged
        point's members, and has its ``error`` reset to None.
        """
        i = self.members_algo_merge_target()
        src = self.stream[i]
        dest = self.stream[i - 1]
        self.stream[i - 1] = dest._replace(
            members=dest.members + src.members, error=None)
        del self.stream[i]

    def random_algo_remove_point(self):
        """Remove one uniformly chosen point other than the first."""
        i = randrange(1, len(self.stream))
        del self.stream[i]
from datetime import datetime
import random as random_module

import matplotlib.pyplot as plt
import pandas as pd

pp = pprint.PrettyPrinter(indent=2)

# Seed the generator from the current wall-clock time.
random_module.seed(datetime.now().timestamp())

# Number of points generated, and number kept after thinning.
length = 1000
timeline_length = 10

# One original stream and three independent copies, one per strategy.
time_stream = LSNTimeStream(length)
member_stream, error_stream, random_stream = (
    copy.deepcopy(time_stream) for _ in range(3)
)

# Shrink each copy down to timeline_length points, one removal per call.
for thin_out in (
    error_stream.error_algo_remove_point,
    member_stream.members_algo_merge,
    random_stream.random_algo_remove_point,
):
    for i in range(length - timeline_length):
        thin_out()

# Draw the original and the three thinned streams on shared axes.
ax = plt.axes()
pd.DataFrame(time_stream.stream).plot(
    x='time', y='lsn', ax=ax, ylabel='lsn', xlabel='time')
for reduced in (member_stream, error_stream, random_stream):
    pd.DataFrame(reduced.stream).plot(x='time', y='lsn', ax=ax)
ax.legend(['original', 'member_algo', 'error_algo', 'random'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment