Skip to content

Instantly share code, notes, and snippets.

@timkpaine
Last active May 7, 2019 20:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save timkpaine/22adda3b069c5bdfe84ed6f3ca7543e5 to your computer and use it in GitHub Desktop.
Save timkpaine/22adda3b069c5bdfe84ed6f3ca7543e5 to your computer and use it in GitHub Desktop.
Cycle-timing for RxPY
import time
def o1(x):
    """Print ``x`` tagged as stage 1 and return it unchanged.

    Args:
        x: Any value; it is rendered with ``%s``.

    Returns:
        The same object that was passed in.
    """
    # Wrap x in a 1-tuple: a bare `"..." % x` treats a tuple x as the whole
    # argument list, which raises TypeError (or prints only its element).
    print("1: %s" % (x,))
    return x
def o2(x):
    """Print ``x`` tagged as stage 2 and return it unchanged.

    Args:
        x: Any value; it is rendered with ``%s``.

    Returns:
        The same object that was passed in.
    """
    # Wrap x in a 1-tuple: a bare `"..." % x` treats a tuple x as the whole
    # argument list, which raises TypeError (or prints only its element).
    print("2: %s" % (x,))
    return x
def o3(x):
    """Print ``x`` tagged as stage 3 and return it unchanged.

    Args:
        x: Any value; it is rendered with ``%s``.

    Returns:
        The same object that was passed in.
    """
    # Wrap x in a 1-tuple: a bare `"..." % x` treats a tuple x as the whole
    # argument list, which raises TypeError (or prints only its element).
    print("3: %s" % (x,))
    return x
def sleep(observer):
    """Feed ``observer`` the integers 1, 2, 3, ... one second apart.

    Each iteration first blocks in ``time.sleep(1)`` and then calls
    ``observer.on_next`` with the next integer. The loop has no exit
    condition, so this function does not return on its own.

    Args:
        observer: Object exposing an ``on_next(value)`` method.
    """
    emitted = 0
    while True:
        time.sleep(1)
        emitted += 1
        observer.on_next(emitted)
from rx import Observable, AnonymousObservable
from rx.internal.utils import adapt_call
from rx.internal import extensionmethod
@extensionmethod(Observable, alias="cycle")
def cycle(self, predicate):
    """Run ``predicate`` on every value and re-emit values one step late.

    Each incoming value is handed to ``predicate`` immediately (its return
    value is ignored). The value itself is held back and only passed
    downstream when the following value arrives.

    Args:
        predicate: Callable invoked with every source value; it is wrapped
            with ``adapt_call`` before use.

    Returns:
        An ``AnonymousObservable`` that emits the source's values delayed by
        one emission and relays the source's error/completion signals.
    """
    predicate = adapt_call(predicate)
    source = self

    def subscribe(observer):
        # Values received but not yet forwarded (never more than one).
        # Named `pending` rather than `next` so the builtin is not shadowed.
        pending = []

        def on_next(value):
            predicate(value)
            # Release what was held during the previous cycle, then hold the
            # current value until the next one shows up.
            for held in pending:
                observer.on_next(held)
            pending[:] = [value]

        # Relay error and completion as well; subscribing with only on_next
        # would silently drop them for everything downstream. The value still
        # held when the source terminates is not flushed, matching the
        # original emission behaviour.
        return source.subscribe(
            on_next, observer.on_error, observer.on_completed
        )

    return AnonymousObservable(subscribe)
# Demo wiring: `sleep` drives the source, and each `cycle` stage holds a value
# back by one emission, so o1, o2 and o3 see any given value on successive
# ticks rather than all at once.
emissions = Observable.create(sleep)
staged = emissions.cycle(o1).cycle(o2)
# NOTE(review): `sleep` never returns, so this call presumably blocks for the
# life of the process — confirm against the scheduler in use.
staged.subscribe(o3)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment