Created
April 23, 2019 10:56
-
-
Save mgd020/fee59d920a6bca1e5a31512e1cd55dd6 to your computer and use it in GitHub Desktop.
Fair Queueing implementation for Python multiprocessing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""A multiprocessing Queue but is "fair" to every flow/session/channel. | |
See https://en.wikipedia.org/wiki/Fair_queuing. | |
""" | |
import sys | |
import time | |
from multiprocessing.queues import Queue | |
if sys.version_info[0] == 3: | |
from queue import PriorityQueue, Empty | |
else: | |
from Queue import PriorityQueue, Empty | |
class PrioritizedItem(object): | |
"""Adds ordering to item, without including item in the ordering.""" | |
__slots__ = ["priority", "item"] | |
def __init__(self, priority, item): | |
self.priority = priority | |
self.item = item | |
def __lt__(self, other): | |
return self.priority < other.priority | |
def __repr__(self): | |
return "<PrioritizedItem priority=%r item=%r>" % (self.priority, self.item) | |
class FairQueueBuffer(object): | |
"""Implement deque interface used by Queue, but each item must have a flow. | |
Items must be of the form (flow, item) where: | |
* flow has __hash__ | |
* item is picklable | |
""" | |
_default_weight = 1.0 | |
def __init__(self): | |
self._flow_weight = {} | |
self.clear() | |
def clear(self): | |
self._priority_queue = PriorityQueue() | |
self._flow_finish = {} | |
def set_flow_weight(self, flow, weight): | |
if weight == self._default_weight: | |
self._flow_weight.pop(flow, None) | |
else: | |
self._flow_weight[flow] = float(weight) | |
def get_flow_weight(self, flow): | |
return self._flow_weight.get(flow, self._default_weight) | |
def append(self, obj): | |
# extract flow | |
try: | |
flow, obj = obj | |
except Exception: | |
flow, obj = obj, obj | |
# calculate finish time | |
start = max(self._flow_finish.get(flow, 0), time.time()) | |
self._flow_finish[flow] = finish = start + (1.0 / self.get_flow_weight(flow)) | |
# finally put item! | |
self._priority_queue.put(PrioritizedItem(finish, obj)) | |
def popleft(self): | |
try: | |
return self._priority_queue.get(block=False).item | |
except Empty: | |
raise IndexError() # deque().popleft() | |
class FairQueue2(Queue): | |
def _after_fork(self): | |
super(FairQueue2, self)._after_fork() | |
self._buffer = FairQueueBuffer() | |
def set_flow_weight(self, flow, weight): | |
self._buffer.set_flow_weight(flow, weight) | |
def get_flow_weight(self, flow): | |
self._buffer.get_flow_weight(flow) | |
if sys.version_info[0] == 3: | |
from multiprocessing import get_context | |
class FairQueue3(FairQueue2): | |
def __init__(self, *args, **kwargs): | |
super(FairQueue3, self).__init__(*args, ctx=get_context(), **kwargs) | |
FairQueue = FairQueue3 | |
else: | |
FairQueue = FairQueue2 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment