Last active
August 6, 2019 14:07
-
-
Save earonesty/cafa4626a2def6766acf5098331157b3 to your computer and use it in GitHub Desktop.
Python Generator Multiplexer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import queue | |
from threading import Lock | |
from collections import namedtuple | |
class Muxer:
    """Share one generator between several independent iterators.

    Every ``Muxer`` built from the same ``func`` reads from a single shared
    generator created by calling ``func()``.  Whenever one muxer pulls a new
    item from that generator it also places the item on the private queue of
    every other muxer registered for ``func``, so each of them yields it too.
    A muxer only receives items produced after it was constructed.

    Args:
        func: Zero-argument callable returning an iterator.  It is used as a
            dictionary key, so it must be hashable.
        restart: When true, a muxer that hits the end of the shared generator
            replaces it with a fresh ``func()`` before re-raising
            ``StopIteration``, so later ``next()`` calls start over.

    NOTE: the class-level registry holds a reference to every registered
    muxer through its listener list, so a muxer stays registered until
    ``__del__`` is invoked on it explicitly.
    """

    # Shared state for one ``func``: ``genref`` is a one-element list holding
    # the current generator (a list so it can be swapped on restart),
    # ``listeners`` is the list of registered muxers, and ``lock`` serialises
    # pulls from the generator.
    Entry = namedtuple('Entry', 'genref listeners lock')

    # Registry of shared state keyed by ``func``; guarded by ``top_lock``.
    already = {}
    top_lock = Lock()

    def __init__(self, func, restart=False):
        self.restart = restart
        self.func = func
        self.queue = queue.Queue()

        with self.top_lock:
            if func not in self.already:
                self.already[func] = self.Entry([func()], [], Lock())
            ent = self.already[func]
            self.genref = ent.genref
            self.lock = ent.lock
            self.listeners = ent.listeners
            self.listeners.append(self)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next item for this muxer.

        Raises:
            StopIteration: when the private queue is empty and the shared
                generator is exhausted.
        """
        # Fast path: an item another muxer already fanned out to us.
        try:
            return self.queue.get_nowait()
        except queue.Empty:
            pass

        with self.lock:
            # Re-check under the lock: another muxer may have queued an item
            # for us while we were waiting to acquire it.
            try:
                return self.queue.get_nowait()
            except queue.Empty:
                pass

            try:
                item = next(self.genref[0])
            except StopIteration:
                if self.restart:
                    self.genref[0] = self.func()
                raise

            # Iterate over a snapshot: ``listeners`` is mutated under
            # ``top_lock`` (not ``self.lock``) by ``__init__``/``__del__``, and
            # removing from a list while iterating it can skip an element.
            for other in tuple(self.listeners):
                if other is not self:
                    other.queue.put(item)
            return item

    def __del__(self):
        """Unregister this muxer and drop the registry entry once it is empty.

        Safe to call more than once, and on an instance whose ``__init__``
        did not complete.
        """
        listeners = getattr(self, 'listeners', None)
        if listeners is None:
            # ``__init__`` never got as far as registering; nothing to undo.
            return

        with self.top_lock:
            try:
                listeners.remove(self)
            except ValueError:
                pass

            # Only delete the entry this muxer actually belonged to.  After an
            # earlier ``__del__`` emptied it, a newer entry for the same
            # ``func`` may exist and must not be removed by a repeated call.
            ent = self.already.get(self.func)
            if ent is not None and ent.listeners is listeners and not listeners:
                del self.already[self.func]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import threading | |
from muxer import Muxer | |
def test_simple_mux():
    """Two muxers built from the same generator function each yield 4 items."""
    def gen():
        yield from range(4)

    first = Muxer(gen)
    second = Muxer(gen)

    for mux in (first, second):
        assert len(list(mux)) == 4
def test_thready_mux():
    """Ten threads, each draining its own muxer, must each count 1000 items."""
    n_threads = 10
    n_items = 1000

    def gen():
        yield from range(n_items)

    def counter(mux):
        # The tally is stored on the function object so the main thread can
        # read it back after the worker has been joined.
        def inner():
            inner.count = 0
            for _ in mux:
                inner.count += 1
        return inner

    muxers = [Muxer(gen) for _ in range(n_threads)]
    counters = [counter(mux) for mux in muxers]
    workers = [threading.Thread(target=fn) for fn in counters]

    for worker in workers:
        worker.start()

    for worker, fn in zip(workers, counters):
        worker.join()
        assert fn.count == n_items
def test_later_mux():
    """A muxer created after the first item was consumed yields the other 3."""
    def gen():
        yield from range(4)

    early = Muxer(gen)
    assert next(early) == 0

    late = Muxer(gen)
    for mux in (early, late):
        assert len(list(mux)) == 3
def test_restart_mux():
    """With ``restart=True`` the shared generator starts over after exhaustion.

    After the first pass, each drain yields the 4 items queued by the other
    muxer plus 4 freshly pulled from the restarted generator.
    """
    def gen():
        yield from range(4)

    m1 = Muxer(gen, restart=True)
    m2 = Muxer(gen, restart=True)

    expected_drains = ((m1, 4), (m2, 8), (m1, 8), (m2, 8))
    for mux, expected in expected_drains:
        assert len(list(mux)) == expected
def test_del():
    """Explicit ``__del__`` calls unregister muxers and drop the shared entry."""
    def gen():
        yield from range(4)

    first = Muxer(gen)
    next(first)  # consume one item before the second muxer exists

    second = Muxer(gen)
    drained = list(second)
    assert len(second.listeners) == 2
    assert len(drained) == 3

    second.__del__()
    assert len(first.listeners) == 1

    list(first)  # drain the first muxer as well
    assert gen in Muxer.already

    first.__del__()
    assert len(first.listeners) == 0
    assert gen not in Muxer.already
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment