Skip to content

Instantly share code, notes, and snippets.

@mgd020
Created April 23, 2019 10:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mgd020/fee59d920a6bca1e5a31512e1cd55dd6 to your computer and use it in GitHub Desktop.
Save mgd020/fee59d920a6bca1e5a31512e1cd55dd6 to your computer and use it in GitHub Desktop.
Fair Queueing implementation for Python multiprocessing
"""A multiprocessing Queue but is "fair" to every flow/session/channel.
See https://en.wikipedia.org/wiki/Fair_queuing.
"""
import sys
import time
from multiprocessing.queues import Queue
if sys.version_info[0] == 3:
from queue import PriorityQueue, Empty
else:
from Queue import PriorityQueue, Empty
class PrioritizedItem(object):
"""Adds ordering to item, without including item in the ordering."""
__slots__ = ["priority", "item"]
def __init__(self, priority, item):
self.priority = priority
self.item = item
def __lt__(self, other):
return self.priority < other.priority
def __repr__(self):
return "<PrioritizedItem priority=%r item=%r>" % (self.priority, self.item)
class FairQueueBuffer(object):
"""Implement deque interface used by Queue, but each item must have a flow.
Items must be of the form (flow, item) where:
* flow has __hash__
* item is picklable
"""
_default_weight = 1.0
def __init__(self):
self._flow_weight = {}
self.clear()
def clear(self):
self._priority_queue = PriorityQueue()
self._flow_finish = {}
def set_flow_weight(self, flow, weight):
if weight == self._default_weight:
self._flow_weight.pop(flow, None)
else:
self._flow_weight[flow] = float(weight)
def get_flow_weight(self, flow):
return self._flow_weight.get(flow, self._default_weight)
def append(self, obj):
# extract flow
try:
flow, obj = obj
except Exception:
flow, obj = obj, obj
# calculate finish time
start = max(self._flow_finish.get(flow, 0), time.time())
self._flow_finish[flow] = finish = start + (1.0 / self.get_flow_weight(flow))
# finally put item!
self._priority_queue.put(PrioritizedItem(finish, obj))
def popleft(self):
try:
return self._priority_queue.get(block=False).item
except Empty:
raise IndexError() # deque().popleft()
class FairQueue2(Queue):
def _after_fork(self):
super(FairQueue2, self)._after_fork()
self._buffer = FairQueueBuffer()
def set_flow_weight(self, flow, weight):
self._buffer.set_flow_weight(flow, weight)
def get_flow_weight(self, flow):
self._buffer.get_flow_weight(flow)
if sys.version_info[0] == 3:
from multiprocessing import get_context
class FairQueue3(FairQueue2):
def __init__(self, *args, **kwargs):
super(FairQueue3, self).__init__(*args, ctx=get_context(), **kwargs)
FairQueue = FairQueue3
else:
FairQueue = FairQueue2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment