Pacing tests for kytos
#!/usr/bin/env python
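"""
Pacing tests for kytos: a rough benchmark of several pacer implementations.

- Pacer: the pacer shipped in `kytos.core.pacing` (labelled "semaphore" below).
- Pacer2: a thin wrapper over the async `limits` strategies.
- Pacer3: a "fair" `limits`-based pacer fed through a `janus` queue.
- Pacer4: a semaphore-based pacer using plain `asyncio` queues (no janus).
- Pacer5: the "fair" `limits`-based pacer using plain `asyncio` queues.

`main()` runs the same workload against each variant and prints the elapsed
time per (action, key) group.
"""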
import asyncio
from dataclasses import field, dataclass
import logging
import threading
import time
from janus import Queue
from kytos.core.pacing import Pacer
from limits import RateLimitItem, parse
from limits.aio.strategies import RateLimiter, FixedWindowRateLimiter
from limits.aio.storage import MemoryStorage
LOG = logging.getLogger(__name__)
@dataclass
class Pacer2:
    """Pacer backed directly by the async `limits` rate limiting strategies."""
    strategy: RateLimiter
    pace_config: dict[str, RateLimitItem] = field(default_factory=dict)

    def inject_config(self, config: dict):
        """
        Inject settings for pacing
        """
        self.pace_config.update(
            {
                key: parse(f"{value['max_concurrent']}/{int(value['refresh_period'])}seconds")
                for key, value in config.items()
            }
        )

    async def ahit(self, action_name, *keys):
        """Pace execution of `action_name`, sleeping until the limiter admits the hit."""
        identifiers = self.pace_config[action_name], action_name, *keys
        while not await self.strategy.hit(*identifiers):
            window_reset, _ = await self.strategy.get_window_stats(
                *identifiers
            )
            sleep_time = window_reset - time.time()
            await asyncio.sleep(sleep_time)
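# Usage sketch for Pacer2 (illustrative only; "send_flow" and "dpid-1" are
# made-up names, and the 2-per-second pace is not the benchmark config below):
#
#     pacer = Pacer2(FixedWindowRateLimiter(MemoryStorage()))
#     pacer.inject_config({"send_flow": {"max_concurrent": 2, "refresh_period": 1}})
#     await pacer.ahit("send_flow", "dpid-1")  # inside a coroutine; sleeps once the window is full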
@dataclass
class Pacer3:
    """Class for controlling the rate at which actions are executed."""
    strategy: RateLimiter
    pace_config: dict[str, RateLimitItem] = field(default_factory=dict)
    pending: Queue[tuple[asyncio.Event, str, tuple]] = field(default=None)
    scheduling: dict[tuple, asyncio.Queue] = field(default_factory=dict)

    async def serve(self):
        """
        Serve pacing requests.
        """
        LOG.info("Starting pacer.")
        if self.pending is not None:
            LOG.error("Tried to start pacer, already started.")
            return
        self.pending = Queue()
        queue = self.pending.async_q
        try:
            async with asyncio.TaskGroup() as tg:
                while True:
                    event, action_name, keys = await queue.get()
                    if action_name not in self.pace_config:
                        LOG.warning("Pace for `%s` is not set", action_name)
                        event.set()
                        continue
                    keys = action_name, *keys
                    if keys not in self.scheduling:
                        self.scheduling[keys] = asyncio.Queue()
                    sub_queue = self.scheduling[keys]
                    sub_queue.put_nowait(event)
                    tg.create_task(self.__process_one(*keys))
        except Exception as ex:
            LOG.error("Pacing encountered error %s", ex)
            raise ex
        finally:
            LOG.info("Shutting down pacer.")
            self.pending = None

    async def __process_one(self, action_name: str, *keys):
        """Ensures fairness in dispatch."""
        keys = action_name, *keys
        queue = self.scheduling[keys]
        limit = self.pace_config[action_name]
        identifiers = limit, *keys
        while not await self.strategy.hit(*identifiers):
            window_reset, _ = await self.strategy.get_window_stats(
                *identifiers
            )
            sleep_time = window_reset - time.time()
            await asyncio.sleep(sleep_time)
        event: asyncio.Event = queue.get_nowait()
        event.set()

    async def ahit(self, action_name: str, *keys):
        """
        Asynchronous variant of `hit`.

        This can be called from the serving thread safely.
        """
        if self.pending is None:
            LOG.error("Pacer is not yet started")
            return
        ev = asyncio.Event()
        await self.pending.async_q.put(
            (ev, action_name, keys)
        )
        await ev.wait()

    def hit(self, action_name, *keys):
        """
        Pace execution, based on the pacing config for the given `action_name`.

        Keys can be included to allow multiple objects
        to be paced separately on the same action.

        This should not be called from the same thread serving
        the pacing.
        """
        if self.pending is None:
            LOG.error("Pacer is not yet started")
            return
        ev = threading.Event()
        self.pending.sync_q.put(
            (ev, action_name, keys)
        )
        ev.wait()

    def inject_config(self, config: dict):
        """
        Inject settings for pacing
        """
        self.pace_config.update(
            {
                key: parse(f"{value['max_concurrent']}/{int(value['refresh_period'])}seconds")
                for key, value in config.items()
            }
        )
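# Usage sketch for Pacer3 (illustrative; names are hypothetical). `serve()` has
# to be running before `ahit`/`hit` are called:
#
#     pacer = Pacer3(FixedWindowRateLimiter(MemoryStorage()))
#     pacer.inject_config({"send_flow": {"max_concurrent": 500, "refresh_period": 1}})
#     serve_task = asyncio.create_task(pacer.serve())
#     await pacer.ahit("send_flow", "dpid-1")  # from the event loop
#     pacer.hit("send_flow", "dpid-1")         # from a worker thread (blocks until paced)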
@dataclass
class Pacer4:
    """Class for controlling the rate at which actions are executed."""
    pace_config: dict[str, tuple[int, float]] = field(default_factory=dict)
    pending: asyncio.Queue[tuple[asyncio.Event, str, tuple]] = field(default=None)
    scheduling: dict[tuple, tuple[asyncio.Semaphore, asyncio.Queue]] = field(default_factory=dict)

    async def serve(self):
        """
        Serve pacing requests.
        """
        LOG.info("Starting pacer.")
        if self.pending is not None:
            LOG.error("Tried to start pacer, already started.")
            return
        self.pending = asyncio.Queue()
        queue = self.pending
        try:
            async with asyncio.TaskGroup() as tg:
                while True:
                    event, action_name, keys = await queue.get()
                    if action_name not in self.pace_config:
                        LOG.warning("Pace for `%s` is not set", action_name)
                        event.set()
                        continue
                    keys = action_name, *keys
                    if keys not in self.scheduling:
                        max_concurrent, _ = self.pace_config[action_name]
                        self.scheduling[keys] = (
                            asyncio.Semaphore(max_concurrent),
                            asyncio.Queue()
                        )
                    _, sub_queue = self.scheduling[keys]
                    sub_queue.put_nowait(event)
                    tg.create_task(self.__process_one(*keys))
        except Exception as ex:
            LOG.error("Pacing encountered error %s", ex)
            raise ex
        finally:
            LOG.info("Shutting down pacer.")
            self.pending = None

    async def __process_one(self, action_name: str, *keys):
        """Ensures fairness in dispatch."""
        keys = action_name, *keys
        semaphore, queue = self.scheduling[keys]
        _, refresh_period = self.pace_config[action_name]
        if semaphore.locked():
            LOG.warning("Pace limit reached on %s", keys)
        async with semaphore:
            event: asyncio.Event = queue.get_nowait()
            event.set()
            await asyncio.sleep(refresh_period)

    async def ahit(self, action_name: str, *keys):
        """
        Pace execution asynchronously, based on the pacing config
        for the given `action_name`.
        """
        if self.pending is None:
            LOG.error("Pacer is not yet started")
            return
        ev = asyncio.Event()
        await self.pending.put(
            (ev, action_name, keys)
        )
        await ev.wait()

    def inject_config(self, config: dict):
        """
        Inject settings for pacing
        """
        self.pace_config.update(
            {
                key: (value['max_concurrent'], value['refresh_period'])
                for key, value in config.items()
            }
        )
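# Note on Pacer4's pacing model: each released event holds a semaphore permit
# for `refresh_period` seconds, so at most `max_concurrent` events are released
# per `refresh_period` window for a given key. With the benchmark config in
# main() (500 permits, 1.0 s refresh) that works out to roughly 500 hits per
# second per key.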
@dataclass
class Pacer5:
    """Class for controlling the rate at which actions are executed."""
    strategy: RateLimiter
    pace_config: dict[str, RateLimitItem] = field(default_factory=dict)
    pending: asyncio.Queue[tuple[asyncio.Event, str, tuple]] = field(default=None)
    scheduling: dict[tuple, asyncio.Queue] = field(default_factory=dict)

    async def serve(self):
        """
        Serve pacing requests.
        """
        LOG.info("Starting pacer.")
        if self.pending is not None:
            LOG.error("Tried to start pacer, already started.")
            return
        self.pending = asyncio.Queue()
        queue = self.pending
        try:
            async with asyncio.TaskGroup() as tg:
                while True:
                    event, action_name, keys = await queue.get()
                    if action_name not in self.pace_config:
                        LOG.warning("Pace for `%s` is not set", action_name)
                        event.set()
                        continue
                    keys = action_name, *keys
                    if keys not in self.scheduling:
                        self.scheduling[keys] = asyncio.Queue()
                    sub_queue = self.scheduling[keys]
                    sub_queue.put_nowait(event)
                    tg.create_task(self.__process_one(*keys))
        except Exception as ex:
            LOG.error("Pacing encountered error %s", ex)
            raise ex
        finally:
            LOG.info("Shutting down pacer.")
            self.pending = None

    async def __process_one(self, action_name: str, *keys):
        """Ensures fairness in dispatch."""
        keys = action_name, *keys
        queue = self.scheduling[keys]
        limit = self.pace_config[action_name]
        identifiers = limit, *keys
        while not await self.strategy.hit(*identifiers):
            window_reset, _ = await self.strategy.get_window_stats(
                *identifiers
            )
            sleep_time = window_reset - time.time()
            await asyncio.sleep(sleep_time)
        event: asyncio.Event = queue.get_nowait()
        event.set()

    async def ahit(self, action_name: str, *keys):
        """
        Pace execution asynchronously, based on the pacing config
        for the given `action_name`.
        """
        if self.pending is None:
            LOG.error("Pacer is not yet started")
            return
        ev = asyncio.Event()
        await self.pending.put(
            (ev, action_name, keys)
        )
        await ev.wait()

    def inject_config(self, config: dict):
        """
        Inject settings for pacing
        """
        self.pace_config.update(
            {
                key: parse(f"{value['max_concurrent']}/{int(value['refresh_period'])}seconds")
                for key, value in config.items()
            }
        )
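# Pacer5 is Pacer3 with the `janus` queue swapped for a plain `asyncio.Queue`,
# which drops the thread-safe `hit()` entry point but avoids the janus
# dependency and its sync/async bridging overhead.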
async def worker(pacer, action_name, *keys):
    await pacer.ahit(action_name, *keys)


async def task(pacer, action_name, worker_count, group_count):
    workers = [
        worker(pacer, action_name, j)
        for i in range(worker_count)
        for j in range(group_count)
    ]
    start = time.time()
    await asyncio.gather(*workers)
    end = time.time()
    elapsed = end - start
    for j in range(group_count):
        print(f'Performed action ({action_name}, {j}) {worker_count} times in {elapsed} seconds')
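# `task()` fires worker_count * group_count concurrent hits (worker_count per
# key j) and times a single gather() over all of them, so the printed elapsed
# time is the same for every group. Rough expectation, assuming each pacer
# enforces the limit independently per key: 10,000 hits per key at ~500
# hits/second should take on the order of 20 seconds per variant.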
async def main():
    config = {
        'test': {
            'max_concurrent': 500,
            'refresh_period': 1.0,
        }
    }
    action_name = 'test'
    workers = 10_000
    groups = 10

    print('Running semaphore tests')
    async with asyncio.TaskGroup() as tg:
        pacer = Pacer()
        pacer.inject_config(config)
        pacer_task = tg.create_task(pacer.serve())
        await task(pacer, action_name, workers, groups)
        pacer_task.cancel()

    print('Running limits tests')
    pacer = Pacer2(
        FixedWindowRateLimiter(
            MemoryStorage()
        )
    )
    pacer.inject_config(config)
    await task(pacer, action_name, workers, groups)

    print('Running fair limits tests')
    async with asyncio.TaskGroup() as tg:
        pacer = Pacer3(
            FixedWindowRateLimiter(
                MemoryStorage()
            )
        )
        pacer.inject_config(config)
        pacer_task = tg.create_task(pacer.serve())
        await task(pacer, action_name, workers, groups)
        pacer_task.cancel()

    print('Running semaphore no janus tests')
    async with asyncio.TaskGroup() as tg:
        pacer = Pacer4()
        pacer.inject_config(config)
        pacer_task = tg.create_task(pacer.serve())
        await task(pacer, action_name, workers, groups)
        pacer_task.cancel()

    print('Running fair limits no janus tests')
    async with asyncio.TaskGroup() as tg:
        pacer = Pacer5(
            FixedWindowRateLimiter(
                MemoryStorage()
            )
        )
        pacer.inject_config(config)
        pacer_task = tg.create_task(pacer.serve())
        await task(pacer, action_name, workers, groups)
        pacer_task.cancel()


if __name__ == "__main__":
    asyncio.run(main())