Skip to content

Instantly share code, notes, and snippets.

@outofmbufs
Created June 1, 2024 20:31
Show Gist options
  • Save outofmbufs/1cffcdc3d781b300370a4c1baed4c67d to your computer and use it in GitHub Desktop.
Save outofmbufs/1cffcdc3d781b300370a4c1baed4c67d to your computer and use it in GitHub Desktop.
Python Adaptive delay - tracks cumulative delay and adjusts the sleep interval accordingly
import time
from collections import deque
from itertools import pairwise, cycle
class AdaptiveDelay:
def __init__(self, interval, /, *, window_seconds=5):
self.interval = interval
self.window_seconds = window_seconds
# approximate (i.e., "expected") number of measurements. Minimum 2.
self.n = max(round(self.window_seconds / self.interval), 2)
# fixed-length FIFO of post-sleep timestamps
self.tq = deque([], self.n)
# these parameters are arbitrary and shouldn't need tweaking, but,
# well, here they are if they need to be tweaked
# _min_measures: number of measured intervals before any
# adjustment will be done (startup conditions)
#
# _makeup_n: number of intervals used to catch up
#
# _mindelay: absolute minimum delay to ever sleep (no matter
# how far behind the timing is)
self._min_measures = 10
self._makeup_n = 5
self._mindelay = 0.01
# Having this as a method makes it possible to override for
# testing porpoises or other specialized requirements.
def _sleep(self, seconds):
time.sleep(seconds)
def delay(self):
"""Sleep for an interval, but adjusted based on real timing."""
self._sleep(self._compute_delay())
self.tq.append(time.perf_counter())
def _compute_delay(self):
"""Return the adaptive amount to delay based on history."""
# determine total delay over the measurement window
delays = list(map(lambda t: t[1] - t[0], pairwise(self.tq)))
n = len(delays)
if n < self._min_measures:
return self.interval
totaldelay = sum(delays)
# and the difference from what that should have been
# positive is: been running fast; negative is running slow
diff = (self.interval * n) - totaldelay
# make that up over the _makeup_n number of intervals,
# but never delay less than _mindelay
adj = diff / self._makeup_n
return max(self.interval + adj, self._mindelay)
if __name__ == "__main__":
import unittest
import random
class TestMethods(unittest.TestCase):
# empirically the tests get to within this percentage of target
# This is fundamentally fragile and there is a tradeoff between
# how close the AdaptiveDelay will get and how long the test
# has to run to get there. This seems to be a sweet spot.
TEST_TOLERANCE = 0.1
TEST_DURATION = 30 # for EACH test that does this
def _distorted(self, interval, totaltime, fuzzes):
a = AdaptiveDelay(interval)
# crude, but effective
t_record = deque([], 1000) # useful for debugging / analysis
g = cycle(fuzzes)
def fuzzysleep(t):
t_record.append(t)
time.sleep(next(g) * t)
a._sleep = fuzzysleep
t0 = time.perf_counter()
for _ in range(round(totaltime / interval)):
a.delay()
return time.perf_counter() - t0, t_record
def _distorted_testlogic(self, fuzzes):
t, r = self._distorted(0.1, self.TEST_DURATION, fuzzes)
offby = (t - self.TEST_DURATION) / self.TEST_DURATION
self.assertTrue(abs(offby) < self.TEST_TOLERANCE)
def test_toolong(self):
self._distorted_testlogic([1.5])
def test_waytoolong(self):
self._distorted_testlogic([3.0])
# the "too short" conditions don't mesh well with the algorithm
# (which really "should be" a full-on PID controller) but in
# practice the delays are "never" too short so the robustness in
# that direction are less important. These pass...
def test_tooshort(self):
self._distorted_testlogic([0.9])
def test_moretooshort(self):
self._distorted_testlogic([0.6])
def test_varying_longs(self):
for fuzzes in (
(1.1, 1.2, 1.3, 1.4, 1.5),
(1.2, 1.0, 1.2, 1.0, 1.2),
(2.0, 1.0, 1.0, 1.0),
(1.0, 1.0, 1.1, 3.0)):
with self.subTest(fuzzes=fuzzes):
self._distorted_testlogic(fuzzes)
def test_random(self):
for i in range(5):
fuzzes = [0.5 + random.random() for _ in range(20)]
with self.subTest(fuzzes=fuzzes):
self._distorted_testlogic(fuzzes)
print("NOTE: RUNNING THE COMPLETE TEST SUITE TAKES 4-5 MINUTES")
unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment