Skip to content

Instantly share code, notes, and snippets.

@jhohertz
Created August 19, 2014 19:10
Show Gist options
  • Save jhohertz/6e2976694f7f3b4e58b6 to your computer and use it in GitHub Desktop.
Save jhohertz/6e2976694f7f3b4e58b6 to your computer and use it in GitHub Desktop.
pyrx create observable example
#!/usr/bin/python
from __future__ import print_function
from rx import Observable
class RxException(Exception):
    """Exception raised by ``_raise`` to surface an error reported by a stream."""
def _raise(ex):
    """Raise ``RxException`` constructed from *ex*.

    Exists so that raising can be done from an expression context (such as a
    callback argument), where a ``raise`` statement is not allowed.

    Raises:
        RxException: always.
    """
    raise RxException(ex)
def blockstream(devnode):
    """Build an observable that emits the integers 10 through 99.

    Args:
        devnode: Not used by the current implementation.

    Returns:
        Whatever ``Observable.create`` returns for the ``subscribe`` function
        defined below.
    """

    def subscribe(sub):
        """Push the values to *sub* and return the function that stops them."""
        # The flag lives in a dict so ``dispose`` can flip it from the inner
        # scope; a plain assignment there would only bind a new local name.
        # (``nonlocal`` is avoided because the file targets Python 2 as well.)
        state = {"stopped": False}

        def dispose():
            state["stopped"] = True

        try:
            for i in range(10, 100):
                if state["stopped"]:
                    return dispose
                sub.on_next(i)
            # Skip completion if the subscriber stopped us during the last
            # on_next call.
            if not state["stopped"]:
                sub.on_completed()
        except Exception as e:
            if not state["stopped"]:
                # Pass the exception object itself: exceptions have no
                # ``Message`` attribute, so reading it would raise
                # AttributeError here instead of notifying the observer.
                sub.on_error(e)
        # Hand back the real dispose function so the stream can be cancelled.
        return dispose

    return Observable.create(subscribe)
# Alternative source that goes straight to the error callback:
# res = Observable.throw_exception("ERRORED!")

# Limit the stream to its first five values.
res = blockstream("foo").take(5)

# Callbacks, in order: value received, error received, stream completed.
subscription = res.subscribe(
    lambda value: print("Observer 1: OnNext: ", value),
    _raise,
    lambda: print("Observer 1: OnCompleted"),
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment