Skip to content

Instantly share code, notes, and snippets.

@flashlin
Last active June 16, 2021 00:05
Show Gist options
  • Save flashlin/dd97180c88f189231a6211c46d138237 to your computer and use it in GitHub Desktop.
Save flashlin/dd97180c88f189231a6211c46d138237 to your computer and use it in GitHub Desktop.
[PyRx Example1] #Python
import asyncio
from rx import Observer, Observable
from rx.core import Scheduler
def observable_fn(observer):
    """Feed *observer* from a background asyncio task.

    Runs ``_setup()`` and then schedules ``_run_loop()`` as a task on the
    loop returned by ``asyncio.get_event_loop()``. The task sleeps two
    seconds, increments a counter and passes it to ``observer.on_next``,
    forever. When the task stops (in this template: when it is cancelled),
    ``_teardown()`` runs and calls ``observer.on_completed()``.

    Args:
        observer: Object exposing ``on_next(value)`` and ``on_completed()``.

    Returns:
        None. The task is only scheduled here; it starts running once the
        event loop runs.
    """

    # Customize this
    def _setup():
        print('Observable setup')

    # Customize this
    def _teardown():
        # This is invoked from the ``finally`` of ``_run_loop``, i.e. from
        # inside the producer task while it is already finishing. The task
        # must not cancel itself here: doing so would flag a task whose
        # ``_loop`` returned normally as cancelled.
        print('Observable teardown')
        observer.on_completed()

    # Customize this
    async def _loop():
        counter = 0
        while True:
            await asyncio.sleep(2)  # Simulating long-running task
            counter += 1
            observer.on_next(counter)

    # Don't touch this
    async def _run_loop():
        try:
            await _loop()
        except asyncio.CancelledError:
            print('Observable cancelled')
            # Re-raise after logging so the task still ends in the
            # cancelled state; ``finally`` below runs the teardown first.
            raise
        finally:
            _teardown()

    # Don't touch this
    _setup()
    loop = asyncio.get_event_loop()
    loop.create_task(_run_loop())
# Driver script: wrap `observable_fn` in an observable, attach three printing
# subscribers, and run the asyncio loop until interrupted with Ctrl-C.
print('CREATING OBSERVABLE')
# NOTE(review): `.share()` presumably lets the three subscribers below reuse a
# single run of `observable_fn` -- confirm against the RxPY 1.x documentation.
observable = Observable.create(observable_fn).observe_on(Scheduler.event_loop).share()

print('CREATING OBSERVERS')
observer1 = observable.subscribe(lambda x: print('O1: {0}'.format(x)))
observer2 = observable.subscribe(lambda x: print('O2: {0}'.format(x)))
observer3 = observable.subscribe(lambda x: print('O3: {0}'.format(x)))

loop = asyncio.get_event_loop()
try:
    print('STARTING LOOP')
    loop.run_forever()
except KeyboardInterrupt:
    # `asyncio.Task.all_tasks()` was removed in Python 3.9; use the
    # module-level function and fall back only on older interpreters.
    _all_tasks = getattr(asyncio, 'all_tasks', None) or asyncio.Task.all_tasks
    pending_tasks = _all_tasks(loop)
    for task in pending_tasks:
        task.cancel()
    if pending_tasks:
        # return_exceptions=True collects each task's CancelledError as a
        # result instead of re-raising it here, so shutdown finishes cleanly.
        loop.run_until_complete(
            asyncio.gather(*pending_tasks, return_exceptions=True)
        )
finally:
    # Close the loop however run_forever() ended.
    loop.close()
print('THE END')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment