Skip to content

Instantly share code, notes, and snippets.

@Julian Julian/divide.py
Created Mar 24, 2013

Embed
What would you like to do?
Iterator Partitioning
import collections
class Divide(object):
    """Lazily partition one iterable into independent per-key iterators.

    Indexing an instance (``d[k]``) returns a generator over those elements
    of the wrapped iterable whose key equals the key of ``k``.  The wrapped
    iterable is consumed only once, on demand: elements pulled while
    searching for one key are parked in a per-key deque so that the
    generators for other keys can still deliver them later, in their
    original order.

    :param iterable: the source of elements; wrapped with ``iter()`` once.
    :param key: optional one-argument function applied both to each element
        and to the subscript given to ``d[...]`` before they are compared.
        When ``None`` the values themselves are compared.

    Keys are used as dictionary keys and therefore must be hashable.
    """

    def __init__(self, iterable, key=None):
        # Elements seen while looking for a different key, grouped by key.
        self._buffer = collections.defaultdict(collections.deque)
        self._key = key
        self._iterable = iter(iterable)

    def __getitem__(self, item):
        """Return a generator over the elements whose key matches ``item``.

        The generator first drains anything already buffered for this key
        and then pulls from the shared source, buffering elements that
        belong to other keys.  It ends once the source is exhausted and no
        buffered elements remain for this key.
        """
        key = self._key
        keyed_item = key(item) if key is not None else item
        # defaultdict hands back the same deque on every lookup, so other
        # partitions appending to it are visible through this reference.
        buffered = self._buffer[keyed_item]

        while True:
            if buffered:
                yield buffered.popleft()
                continue

            for i in self._iterable:
                keyed_i = key(i) if key is not None else i
                if keyed_i == keyed_item:
                    yield i
                    # While suspended, another partition may have advanced
                    # the source and buffered elements for this key; go
                    # back and drain the buffer before reading further so
                    # that nothing is skipped or reordered.
                    break
                self._buffer[keyed_i].append(i)
            else:
                # Source exhausted and nothing buffered for this key.  A
                # plain return ends the generator; raising StopIteration
                # here is turned into RuntimeError by PEP 479.
                return
from unittest import TestCase
import random
from divide import Divide
class TestDivide(TestCase):
    """Unit tests for the partitioning behaviour of ``Divide``."""

    def test_only_ones(self):
        """A key that never occurs in the input gives an empty partition."""
        d = Divide([1, 1, 1])
        ones, twos = d[1], d[2]
        self.assertEqual(list(ones), [1, 1, 1])
        self.assertEqual(list(twos), [])

    def test_interleaved(self):
        """Elements of different keys mixed together are separated."""
        d = Divide([1, 2, 1, 1, 2])
        ones, twos, threes = d[1], d[2], d[3]
        self.assertEqual(list(ones), [1, 1, 1])
        self.assertEqual(list(twos), [2, 2])
        self.assertEqual(list(threes), [])

    def test_key(self):
        """The key function is applied to elements and to the subscript."""
        d = Divide([1, "2", 1, "1", 2], key=str)
        ones, twos, threes = d[1], d[2], d[3]
        self.assertEqual(list(ones), [1, 1, "1"])
        self.assertEqual(list(twos), ["2", 2])
        self.assertEqual(list(threes), [])

    def test_see_once(self):
        """An element buffered while serving another key is not lost."""
        d = Divide([2, 1, 1, 1, 1])
        ones, twos, threes = d[1], d[2], d[3]
        self.assertEqual(list(ones), [1, 1, 1, 1])
        self.assertEqual(list(twos), [2])
        self.assertEqual(list(threes), [])

    def test_random(self):
        """Each partition matches a direct count over a random input."""
        # range() rather than xrange() so the test runs on Python 2 and 3.
        elements = [random.randint(1, 10) for _ in range(1000)]
        d = Divide(elements)
        # randint() includes both endpoints, so values span 1..10; 0 is
        # checked as well because it is guaranteed to be absent.
        for i in range(11):
            self.assertEqual(list(d[i]), [i] * elements.count(i))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.