Skip to content

Instantly share code, notes, and snippets.

@radium226
Created September 7, 2016 15:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save radium226/b0f791d79f561f3399a154bd805665b1 to your computer and use it in GitHub Desktop.
Save radium226/b0f791d79f561f3399a154bd805665b1 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
from rx import Observable, Observer, AnonymousObservable
from time import sleep
from rx.concurrency import NewThreadScheduler
from concurrent.futures import ThreadPoolExecutor
from rx.internal import extensionmethod
@extensionmethod(Observable)
def repeat_letter(self, letter):
    """Map each emitted count to ``letter`` repeated that many times.

    Registered on ``Observable`` through ``extensionmethod`` so it can be
    chained like any other operator, e.g. ``source.repeat_letter('a')``.

    Args:
        letter: Value multiplied by every upstream item (``letter * count``),
            e.g. a one-character string.

    Returns:
        An ``AnonymousObservable`` that emits ``letter * count`` for each
        ``count`` emitted by this observable, and passes error and
        completion notifications through unchanged.
    """
    source = self

    def subscribe(observer):
        def on_next(count):
            # Guard only the computation: a failure here (e.g. a non-integer
            # count) is reported to the subscriber as an error notification
            # instead of escaping through the upstream's on_next call.
            # Exceptions raised by the downstream observer are not caught.
            try:
                repeated = letter * count
            except Exception as error:
                observer.on_error(error)
            else:
                observer.on_next(repeated)

        # Errors and completion need no translation, so the downstream
        # observer's own handlers are subscribed directly.
        return source.subscribe(on_next, observer.on_error, observer.on_completed)

    return AnonymousObservable(subscribe)
import random
def shuffle_range(min, max, rng=None):
    """Return the integers of ``range(min, max)`` in random order.

    Args:
        min: First integer of the range (inclusive).
        max: End of the range (exclusive, as with ``range``).
        rng: Optional object providing ``shuffle(list)``, such as a
            ``random.Random`` instance, used to get reproducible results.
            When omitted, the module-level ``random`` generator is used.

    Returns:
        A new list holding every integer of ``range(min, max)`` exactly once;
        empty when ``max <= min``.
    """
    # NOTE: ``min``/``max`` shadow the builtins inside this function; the
    # names are kept because callers may pass them by keyword.
    values = list(range(min, max))
    shuffler = random if rng is None else rng
    shuffler.shuffle(values)
    return values
if __name__ == "__main__":
    # Demo: run a blocking task for each integer of a shuffled 1..9 range on
    # a thread pool and print one string of 'a's per task result.

    def heavy_stuff(i):
        """Simulate ``i`` seconds of blocking work and return ``i``."""
        print("Begining huge work for %i" % i)
        sleep(i)
        print("Finishing huge work for %i" % i)
        return i

    with ThreadPoolExecutor(5) as executor:
        # flat_map is handed the Future returned by executor.submit for each
        # integer; repeat_letter then turns each result into 'a' * result.
        # NOTE(review): results presumably arrive in completion order rather
        # than submission order -- confirm against the rx version in use.
        letters = (
            Observable.from_iterable(shuffle_range(1, 10))
            .flat_map(lambda i: executor.submit(heavy_stuff, i))
            .repeat_letter('a')
        )
        # The blocking iteration happens inside the ``with`` so the executor
        # is still open while tasks are being submitted and awaited.
        for repeated in letters.to_blocking():
            print(repeated)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment