Skip to content

Instantly share code, notes, and snippets.

@earonesty
Last active August 6, 2019 14:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save earonesty/cafa4626a2def6766acf5098331157b3 to your computer and use it in GitHub Desktop.
Save earonesty/cafa4626a2def6766acf5098331157b3 to your computer and use it in GitHub Desktop.
Python Generator Multiplexter
import queue
from threading import Lock
from collections import namedtuple
class Muxer():
Entry = namedtuple('Entry', 'genref listeners, lock')
already = {}
top_lock = Lock()
def __init__(self, func, restart=False):
self.restart = restart
self.func = func
self.queue = queue.Queue()
with self.top_lock:
if func not in self.already:
self.already[func] = self.Entry([func()], [], Lock())
ent = self.already[func]
self.genref = ent.genref
self.lock = ent.lock
self.listeners = ent.listeners
self.listeners.append(self)
def __iter__(self):
return self
def __next__(self):
try:
e = self.queue.get_nowait()
except queue.Empty:
with self.lock:
try:
e = self.queue.get_nowait()
except queue.Empty:
try:
e = next(self.genref[0])
for other in self.listeners:
if not other is self:
other.queue.put(e)
except StopIteration:
if self.restart:
self.genref[0] = self.func()
raise
return e
def __del__(self):
with self.top_lock:
try:
self.listeners.remove(self)
except ValueError:
pass
if not self.listeners and self.func in self.already:
del self.already[self.func]
import threading
from muxer import Muxer
def test_simple_mux():
def gen():
yield from range(4)
m1 = Muxer(gen)
m2 = Muxer(gen)
assert len(list(m1)) == 4
assert len(list(m2)) == 4
def test_thready_mux():
threads = 10
count = 1000
def gen():
yield from range(count)
def counter(m):
def inner():
inner.count = 0
for _ in m:
inner.count += 1
return inner
m = [None] * threads
c = [None] * threads
t = [None] * threads
for i in range(threads):
m[i] = Muxer(gen)
c[i] = counter(m[i])
t[i] = threading.Thread(target=c[i])
for i in range(threads):
t[i].start()
for i in range(threads):
t[i].join()
assert c[i].count == count
def test_later_mux():
def gen():
yield from range(4)
m1 = Muxer(gen)
assert next(m1) == 0
m2 = Muxer(gen)
assert len(list(m1)) == 3
assert len(list(m2)) == 3
def test_restart_mux():
def gen():
yield from range(4)
m1 = Muxer(gen, restart=True)
m2 = Muxer(gen, restart=True)
assert len(list(m1)) == 4
assert len(list(m2)) == 8
assert len(list(m1)) == 8
assert len(list(m2)) == 8
def test_del():
def gen():
yield from range(4)
m1 = Muxer(gen)
i = next(m1)
m2 = Muxer(gen)
lm2 = list(m2)
assert len(m2.listeners) == 2
assert len(lm2) == 3
m2.__del__()
assert len(m1.listeners) == 1
j = list(m1)
assert gen in Muxer.already
m1.__del__()
assert len(m1.listeners) == 0
assert gen not in Muxer.already
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment