-
-
Save Ktmi/a632abd0bb502e4787e702cb60cf658f to your computer and use it in GitHub Desktop.
Pacing tests for kytos
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
import asyncio | |
from dataclasses import field, dataclass | |
import logging | |
import threading | |
import time | |
from janus import Queue | |
from kytos.core.pacing import Pacer | |
from limits import RateLimitItem, parse | |
from limits.aio.strategies import RateLimiter, FixedWindowRateLimiter | |
from limits.aio.storage import MemoryStorage | |
LOG = logging.getLogger(__name__) | |
@dataclass | |
class Pacer2: | |
strategy: RateLimiter | |
pace_config: dict[str, RateLimitItem] = field(default_factory=dict) | |
def inject_config(self, config: dict): | |
""" | |
Inject settings for pacing | |
""" | |
self.pace_config.update( | |
{ | |
key: parse(f"{value['max_concurrent']}/{int(value['refresh_period'])}seconds") | |
for key, value in config.items() | |
} | |
) | |
async def ahit(self, action_name, *keys): | |
identifiers = self.pace_config[action_name], action_name, *keys | |
while not await self.strategy.hit(*identifiers): | |
window_reset, _ = await self.strategy.get_window_stats( | |
*identifiers | |
) | |
sleep_time = window_reset - time.time() | |
await asyncio.sleep(sleep_time) | |
@dataclass | |
class Pacer3: | |
"""Class for controlling the rate at which actions are executed.""" | |
strategy: RateLimiter | |
pace_config: dict[str, RateLimitItem] = field(default_factory=dict) | |
pending: Queue[tuple[asyncio.Event, str, tuple]] = field(default=None) | |
scheduling: dict[tuple, asyncio.Queue] =\ | |
field(default_factory=dict) | |
async def serve(self): | |
""" | |
Serve pacing requests. | |
""" | |
LOG.info("Starting pacer.") | |
if self.pending is not None: | |
LOG.error("Tried to start pacer, already started.") | |
self.pending = Queue() | |
queue = self.pending.async_q | |
try: | |
async with asyncio.TaskGroup() as tg: | |
while True: | |
event, action_name, keys = await queue.get() | |
if action_name not in self.pace_config: | |
LOG.warning("Pace for `%s` is not set", action_name) | |
event.set() | |
continue | |
keys = action_name, *keys | |
if keys not in self.scheduling: | |
self.scheduling[keys] = asyncio.Queue() | |
sub_queue = self.scheduling[keys] | |
sub_queue.put_nowait(event) | |
tg.create_task(self.__process_one(*keys)) | |
except Exception as ex: | |
LOG.error("Pacing encounter error %s", ex) | |
raise ex | |
finally: | |
LOG.info("Shutting down pacer.") | |
self.pending = None | |
async def __process_one( | |
self, | |
action_name: str, | |
*keys | |
): | |
"""Ensure's fairness in dispatch.""" | |
keys = action_name, *keys | |
queue = self.scheduling[keys] | |
limit = self.pace_config[action_name] | |
identifiers = limit, *keys | |
while not await self.strategy.hit(*identifiers): | |
window_reset, _ = await self.strategy.get_window_stats( | |
*identifiers | |
) | |
sleep_time = window_reset - time.time() | |
await asyncio.sleep(sleep_time) | |
event: asyncio.Event = queue.get_nowait() | |
event.set() | |
async def ahit( | |
self, | |
action_name: str, | |
*keys | |
): | |
""" | |
Asynchronous variant of `hit`. | |
This can be called from the serving thread safely. | |
""" | |
if self.pending is None: | |
LOG.error("Pacer is not yet started") | |
return | |
ev = asyncio.Event() | |
await self.pending.async_q.put( | |
(ev, action_name, keys) | |
) | |
await ev.wait() | |
def hit(self, action_name, *keys): | |
""" | |
Pace execution, based on the pacing config for the given `action_name`. | |
Keys can be included to allow multiple objects | |
to be be paced separately on the same action. | |
This should not be called from the same thread serving | |
the pacing. | |
""" | |
if self.pending is None: | |
LOG.error("Pacer is not yet started") | |
return | |
ev = threading.Event() | |
self.pending.sync_q.put( | |
(ev, action_name, keys) | |
) | |
ev.wait() | |
def inject_config(self, config: dict): | |
""" | |
Inject settings for pacing | |
""" | |
self.pace_config.update( | |
{ | |
key: parse(f"{value['max_concurrent']}/{int(value['refresh_period'])}seconds") | |
for key, value in config.items() | |
} | |
) | |
@dataclass | |
class Pacer4: | |
"""Class for controlling the rate at which actions are executed.""" | |
pace_config: dict[str, tuple[int, float]] = field(default_factory=dict) | |
pending: asyncio.Queue[tuple[asyncio.Event, str, tuple]] = field(default=None) | |
scheduling: dict[tuple, tuple[asyncio.Semaphore, asyncio.Queue]] =\ | |
field(default_factory=dict) | |
async def serve(self): | |
""" | |
Serve pacing requests. | |
""" | |
LOG.info("Starting pacer.") | |
if self.pending is not None: | |
LOG.error("Tried to start pacer, already started.") | |
self.pending = asyncio.Queue() | |
queue = self.pending#.async_q | |
try: | |
async with asyncio.TaskGroup() as tg: | |
while True: | |
event, action_name, keys = await queue.get() | |
if action_name not in self.pace_config: | |
LOG.warning("Pace for `%s` is not set", action_name) | |
event.set() | |
continue | |
keys = action_name, *keys | |
if keys not in self.scheduling: | |
max_concurrent, _ = self.pace_config[action_name] | |
self.scheduling[keys] = ( | |
asyncio.Semaphore(max_concurrent), | |
asyncio.Queue() | |
) | |
_, sub_queue = self.scheduling[keys] | |
sub_queue.put_nowait(event) | |
tg.create_task(self.__process_one(*keys)) | |
except Exception as ex: | |
LOG.error("Pacing encounter error %s", ex) | |
raise ex | |
finally: | |
LOG.info("Shutting down pacer.") | |
self.pending = None | |
async def __process_one( | |
self, | |
action_name: str, | |
*keys | |
): | |
"""Ensure's fairness in dispatch.""" | |
keys = action_name, *keys | |
semaphore, queue = self.scheduling[keys] | |
_, refresh_period = self.pace_config[action_name] | |
if semaphore.locked(): | |
LOG.warning("Pace limit reached on %s", keys) | |
async with semaphore: | |
event: asyncio.Event = queue.get_nowait() | |
event.set() | |
await asyncio.sleep(refresh_period) | |
async def ahit( | |
self, | |
action_name: str, | |
*keys | |
): | |
""" | |
Asynchronous variant of `hit`. | |
This can be called from the serving thread safely. | |
""" | |
if self.pending is None: | |
LOG.error("Pacer is not yet started") | |
return | |
ev = asyncio.Event() | |
await self.pending.put( | |
(ev, action_name, keys) | |
) | |
await ev.wait() | |
def inject_config(self, config: dict): | |
""" | |
Inject settings for pacing | |
""" | |
self.pace_config.update( | |
{ | |
key: (value['max_concurrent'], value['refresh_period']) | |
for key, value in config.items() | |
} | |
) | |
@dataclass | |
class Pacer5: | |
"""Class for controlling the rate at which actions are executed.""" | |
strategy: RateLimiter | |
pace_config: dict[str, RateLimitItem] = field(default_factory=dict) | |
pending: asyncio.Queue[tuple[asyncio.Event, str, tuple]] = field(default=None) | |
scheduling: dict[tuple, asyncio.Queue] =\ | |
field(default_factory=dict) | |
async def serve(self): | |
""" | |
Serve pacing requests. | |
""" | |
LOG.info("Starting pacer.") | |
if self.pending is not None: | |
LOG.error("Tried to start pacer, already started.") | |
self.pending = asyncio.Queue() | |
queue = self.pending | |
try: | |
async with asyncio.TaskGroup() as tg: | |
while True: | |
event, action_name, keys = await queue.get() | |
if action_name not in self.pace_config: | |
LOG.warning("Pace for `%s` is not set", action_name) | |
event.set() | |
continue | |
keys = action_name, *keys | |
if keys not in self.scheduling: | |
self.scheduling[keys] = asyncio.Queue() | |
sub_queue = self.scheduling[keys] | |
sub_queue.put_nowait(event) | |
tg.create_task(self.__process_one(*keys)) | |
except Exception as ex: | |
LOG.error("Pacing encounter error %s", ex) | |
raise ex | |
finally: | |
LOG.info("Shutting down pacer.") | |
self.pending = None | |
async def __process_one( | |
self, | |
action_name: str, | |
*keys | |
): | |
"""Ensure's fairness in dispatch.""" | |
keys = action_name, *keys | |
queue = self.scheduling[keys] | |
limit = self.pace_config[action_name] | |
identifiers = limit, *keys | |
while not await self.strategy.hit(*identifiers): | |
window_reset, _ = await self.strategy.get_window_stats( | |
*identifiers | |
) | |
sleep_time = window_reset - time.time() | |
await asyncio.sleep(sleep_time) | |
event: asyncio.Event = queue.get_nowait() | |
event.set() | |
async def ahit( | |
self, | |
action_name: str, | |
*keys | |
): | |
""" | |
Asynchronous variant of `hit`. | |
This can be called from the serving thread safely. | |
""" | |
if self.pending is None: | |
LOG.error("Pacer is not yet started") | |
return | |
ev = asyncio.Event() | |
await self.pending.put( | |
(ev, action_name, keys) | |
) | |
await ev.wait() | |
def inject_config(self, config: dict): | |
""" | |
Inject settings for pacing | |
""" | |
self.pace_config.update( | |
{ | |
key: parse(f"{value['max_concurrent']}/{int(value['refresh_period'])}seconds") | |
for key, value in config.items() | |
} | |
) | |
async def worker(pacer, action_name, *keys): | |
await pacer.ahit(action_name, *keys) | |
async def task(pacer, action_name, worker_count, group_count): | |
workers = [ | |
worker(pacer, action_name, j) | |
for i in range(worker_count) | |
for j in range(group_count) | |
] | |
start = time.time() | |
await asyncio.gather(*workers) | |
end = time.time() | |
elapsed = end - start | |
for j in range(group_count): | |
print(f'Performed action ({action_name}, {j}) {worker_count} times in {elapsed} seconds') | |
async def main(): | |
config = { | |
'test': { | |
'max_concurrent': 500, | |
'refresh_period': 1.0, | |
} | |
} | |
action_name = 'test' | |
workers = 10_000 | |
groups = 10 | |
print('Running semaphore tests') | |
async with asyncio.TaskGroup() as tg: | |
pacer = Pacer() | |
pacer.inject_config(config) | |
pacer_task = tg.create_task(pacer.serve()) | |
await task(pacer, action_name, workers, groups) | |
pacer_task.cancel() | |
print('Running limits tests') | |
pacer = Pacer2( | |
FixedWindowRateLimiter( | |
MemoryStorage() | |
) | |
) | |
pacer.inject_config(config) | |
await task(pacer, action_name, workers, groups) | |
print('Running fair limits tests') | |
async with asyncio.TaskGroup() as tg: | |
pacer = Pacer3( | |
FixedWindowRateLimiter( | |
MemoryStorage() | |
) | |
) | |
pacer.inject_config(config) | |
pacer_task = tg.create_task(pacer.serve()) | |
await task(pacer, action_name, workers, groups) | |
pacer_task.cancel() | |
print('Running semaphore no janus tests') | |
async with asyncio.TaskGroup() as tg: | |
pacer = Pacer4() | |
pacer.inject_config(config) | |
pacer_task = tg.create_task(pacer.serve()) | |
await task(pacer, action_name, workers, groups) | |
pacer_task.cancel() | |
print('Running fair limits no janus tests') | |
async with asyncio.TaskGroup() as tg: | |
pacer = Pacer5( | |
FixedWindowRateLimiter( | |
MemoryStorage() | |
) | |
) | |
pacer.inject_config(config) | |
pacer_task = tg.create_task(pacer.serve()) | |
await task(pacer, action_name, workers, groups) | |
pacer_task.cancel() | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment