Last active
June 16, 2017 14:39
-
-
Save manuelep/d0ea91ddf0d427eb475305546d87b27d to your computer and use it in GitHub Desktop.
Sorted queue management
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import bisect | |
from itertools import izip | |
class Squeue(object): | |
""" """ | |
def __init__(self, _iter, key=None): | |
super(Squeue, self).__init__() | |
if callable(_iter): | |
iterator = _iter | |
else: | |
def iterator(): | |
for i in _iter: | |
yield i | |
self.__keys = tuple(sorted(iterator(), key=key)) | |
if key is None: | |
self.key = lambda e: e | |
self.__values = self.__keys | |
else: | |
self.key = key | |
self.__values = map(self.key, self.__keys) | |
def __str__(self): | |
return self.__keys.__str__() | |
def __len__(self): | |
return len(self.__keys) | |
def guess_index(self, el, key=None, **kw): | |
""" Where a new element would be inserted? """ | |
value = key(el) if not key is None else self.key(el) | |
if key is None: | |
key = self.key | |
return bisect.bisect_left(self.__values, value, **kw) | |
# def insert(self, el, **kw): | |
# idx = self.guess_index(el, key=None, **kw) | |
# self.__keys.insert(idx, el) | |
# self.__values.insert(idx, self.key(el)) | |
def __getitem__(self, i): | |
return self.__keys[i] | |
def getvalue(self, i): | |
return self.__values[i] | |
def __iter__(self): | |
return self.__keys.__iter__() | |
def iteritems(): | |
""" A sorted iteritems """ | |
return izip(self.__keys, self.__values) | |
def itervalues(self): | |
for i in self.__values: | |
yield i | |
def first(self): | |
return self[0] | |
def last(self): | |
return self[-1] | |
def min(self): | |
return self.__values[0] | |
def max(self): | |
return self.__values[-1] | |
def _walk_away_from_index(self, idx, values=False): | |
n, walk = 0, True | |
while walk: | |
for m in (-1, 1,): | |
c = idx+(n*m) | |
# c>0 XOR c>len(self) | |
if (c < 0) != (c<len(self)): | |
if values is True: | |
yield self[c], self.__values[c] | |
else: | |
yield self[c] | |
elif c>=0 and c>len(self): | |
walk = False | |
else: | |
pass | |
if n == 0: | |
break | |
n += 1 | |
def walk_away_from(self, arg, values=False, **kw): | |
""" """ | |
idx = self.guess_index(arg, **kw) | |
return self._walk_away_from_index(idx, values=values) | |
class MultiSqueue(object): | |
"""docstring for MultiSqueue.""" | |
def __init__(self, seq, **keys): | |
super(MultiSqueue, self).__init__() | |
self.__queues = {k: Squeue(seq, key=f) for k,f in keys.iteritems()} | |
def __getitem__(self, k): | |
return self.__queues[k] | |
@property | |
def queues(self): | |
return self.__queues | |
def guess_indexes(self, el): | |
return {k: self.__queues[k].guess_index(el) for k in self.__queues} | |
def walk_away_from(self, el): | |
""" """ | |
ks, its = izip(*map(lambda ii: [ii[0], ii[1].walk_away_from(el)], self.__queues.iteritems())) | |
f = lambda it: it.next() | |
while True: | |
yield dict(zip(ks, [f(i) for i in its])) | |
def nearests(el): | |
""" """ | |
# TODO |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
import unittest | |
from squeue import Squeue, MultiSqueue | |
# 8, 9, 7 | |
test_values = ["baritono", "contralto", "soprano"] | |
class TestSimpleSqueue(unittest.TestCase): | |
""" Testing queues with no particular sorting function """ | |
new = "basso" | |
def setUp(self): | |
self.q = Squeue(test_values) | |
def test_len(self): | |
self.assertEqual(len(self.q), len(test_values)) | |
def test_index(self): | |
i = self.q.guess_index(self.new) | |
self.assertEqual(i, 1) | |
# def test_insert(self): | |
# i = self.q.guess_index(self.new) | |
# self.q.insert(self.new) | |
# self.assertEqual(self.q[i], self.new) | |
def test_getvalue(self): | |
res = self.q.getvalue(0) | |
self.assertEqual(res, "baritono") | |
def test_walk_away_from_index(self): | |
res = [i for i in self.q._walk_away_from_index(2)] | |
self.assertEqual(res, test_values[::-1]) | |
def test_walk_away_from(self): | |
res = [i for i in self.q.walk_away_from(self.new)] | |
self.assertEqual(res, ["contralto", "baritono", "soprano"]) | |
class TestSqueue(TestSimpleSqueue): | |
def setUp(self): | |
self.q = Squeue(test_values, key=len) | |
def test_index(self): | |
i = self.q.guess_index(self.new, key=len) | |
self.assertEqual(i, 0) | |
def test_getvalue(self): | |
res = self.q.getvalue(-1) | |
self.assertEqual(res, 9) | |
def test_walk_away_from_index(self): | |
res = [i for i in self.q._walk_away_from_index(1)] | |
self.assertEqual(res, ["baritono", "soprano", "contralto"]) | |
def test_walk_away_from(self): | |
res = [i for i in self.q.walk_away_from(self.new)] | |
self.assertEqual(res, ["soprano", "baritono", "contralto"]) | |
class TestMultiSqueue(unittest.TestCase): | |
new = "basso" | |
def setUp(self): | |
self.q = MultiSqueue([i for i in test_values], len=len, alpha=lambda e: e) | |
def test_guess_indexes(self): | |
res = self.q.guess_indexes(self.new) | |
self.assertEqual(res, {"len": 0, "alpha": 1}) | |
def test_walk_away_from(self): | |
# ["baritono", "contralto", "soprano"] | |
check = [{"len": "soprano", "alpha": "contralto"}, {"len": "baritono", "alpha": "baritono"}, {"len": "contralto", "alpha": "soprano"}] | |
self.q.walk_away_from(self.new) | |
res = [i for i in self.q.walk_away_from(self.new)] | |
res = self.assertEqual(res, check) | |
if __name__ == '__main__': | |
unittest.main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment