@affo
Created June 15, 2016 14:17
Operations on GroupedObservables with RxPY
import random
import time
from rx.subjects import Subject

#### The System

# The entrypoint
s = Subject()


def produce(word):
    s.on_next(word)

# The common `group_by` observable that
# pre-exists the clients' queries.
# It groups by "word".
grouped = s.group_by(lambda x: x)

# The client's query.
# Each client can count only the occurrences of its own key.
def query(key):
    # Subject that relays the running count for `key` to the client.
    promise = Subject()

    def partition_f(underlying_obs):
        # the query on the partition: a running count of its items
        return underlying_obs.scan(lambda acc, _: acc + 1, 0)

    grouped \
        .filter(lambda go: go.key == key) \
        .map(lambda go: go.underlying_observable) \
        .map(partition_f) \
        .subscribe(lambda o: o.subscribe(promise.on_next))

    return promise


#### The Client
if __name__ == '__main__':
    query('foo').subscribe(print)

    words = [
        random.choice(['foo', 'bar', 'buz'])
        for _ in range(20)
    ]

    for w in words:
        time.sleep(0.5)
        produce(w)
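
For readers without RxPY installed, the data flow above can be approximated with the standard library only. This is a minimal sketch, not the gist's implementation: `KeyedCounter` and its methods are hypothetical names introduced here, and the class stands in for the `Subject` + `group_by` + `scan` pipeline by keeping one running count per registered query. It assumes, as in the script above, that a query counts only the words produced after it was registered.

```python
class KeyedCounter:
    """Hypothetical stand-in for the Subject + group_by + scan pipeline."""

    def __init__(self):
        # key -> list of listeners; each listener keeps its own running count
        self._listeners = {}

    def query(self, key, callback):
        # Register a client interested only in `key`.
        listener = {'callback': callback, 'count': 0}
        self._listeners.setdefault(key, []).append(listener)

    def produce(self, word):
        # Route the word to the listeners of its partition only,
        # incrementing and emitting each listener's running count.
        for listener in self._listeners.get(word, []):
            listener['count'] += 1
            listener['callback'](listener['count'])


counter = KeyedCounter()
seen = []
counter.query('foo', seen.append)

for w in ['foo', 'bar', 'foo', 'buz', 'foo']:
    counter.produce(w)

print(seen)  # [1, 2, 3]
```

Only the three `'foo'` items reach the callback, and each one arrives as the next value of the running count, which is the role `scan` plays in the RxPY version.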