Skip to content

Instantly share code, notes, and snippets.

@lucaspg96
Last active October 9, 2020 20:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lucaspg96/7948a50f1997e995e9a1520833fc9bef to your computer and use it in GitHub Desktop.
Save lucaspg96/7948a50f1997e995e9a1520833fc9bef to your computer and use it in GitHub Desktop.
import rx
from rx import operators as ops
from collections.abc import Iterable
import logging
identity = lambda x: x
def throw(e):
print(e)
logging.exception(e)
raise e
class Stream():
"""
Classe principal contendo as funções para modelagem dos pipelines de stream.
Todas as operações são imutáveis, ou seja, retornam um novo pipeline.
"""
def __init__(self, stream):
"""
Construtor do objeto Stream.
Essa função é o ponto de partida para construir seus pipelines.
Ela recebe qualquer entidade que seja iterável (listas, dicionários, generators, etc.) e usa como fonte de dados da stream.
"""
if isinstance(stream, Iterable):
self.content = rx.from_iterable(stream)
else:
self.content = stream
def run_for_each(self, function):
"""
Executa, para cada elemento da stream, uma função.
"""
self.content.subscribe(function, on_error = throw)
def map(self, function):
"""
Executa, para cada elemento A da stream, uma função f que gera um elemento B.
Retorna uma stream de elementos do tipo de B.
"""
return Stream(self.content.pipe(ops.map(function)))
def flat_map(self, function):
"""
Executa, para cada elemento A da stream, uma função f que gera uma sequência de elementos [B1, B2, B3, ...].
Retorna uma stream de elementos do tipo de B.
"""
return Stream(self.content.pipe(ops.flat_map(function)))
def flatten(self):
"""
Assumindo que a stream seja de sequências de elementos [A1, A2, A3, ...],
retorna uma stream de elementos do tipo A.
"""
return Stream(self.content.pipe(ops.flat_map(identity)))
def filter(self, function):
"""
Executa, para cada elemento A da stream, uma função f (chamada de predicado) que retorna True ou False.
Retorna uma stream de elementos do tipo de A onde f(A) seja True.
"""
return Stream(self.content.pipe(ops.filter(function)))
def filter_not(self, function):
"""
Executa, para cada elemento A da stream, uma função f (chamada de predicado) que retorna True ou False.
Retorna uma stream de elementos do tipo de A onde f(A) seja False.
"""
return Stream(self.content.pipe(ops.filter(lambda x: not function(x))))
def print(self):
"""
Realiza o print de cada elemento da stream
"""
self.run_for_each(print)
def concat(self, stream):
"""
Dada uma outra stream (ou iterável) X, retorna uma nova stream que,
ao terminar de enviar os dados da stream original, concatena com os elementos de X.
"""
if isinstance(stream, Iterable):
return Stream(self.content.pipe(ops.concat(rx.from_iterable(stream))))
return Stream(self.content.pipe(ops.concat(stream.content)))
def count_window(self, size):
"""
Retorna uma stream de sequências [A1, A2, A3, ..., A_size] de tamanho fixo.
Uma sequência só será emitida quando forem consumidos 'size' elementos OU a stream acabe.
"""
return Stream(self.content.pipe(
ops.window_with_count(size),
ops.map(lambda s: s.pipe(ops.to_list())),
ops.merge_all()
)
)
def time_window(self, window_size, window_shift):
"""
Retorna uma stream de sequências [A1, A2, A3, ...] de tamanho variável.
Inicialmente, espera-se 'window_size' (informado usando o datetime.timedelta) de tempo coletando dados e envia uma sequência com todos os dados consumidos nesse tempo.
Em seguida, espera-se mais 'window_shift' (informado usando o datetime.timedelta) e envia os dados consumidos em 'window_size'.
Note que:
- se window_size > window_shift, haverá dados em comum entre duas sequências enviadas
- se window_size < window_shift, haverá dados que não serão enviados
"""
return Stream(self.content.pipe(
ops.window_with_time(window_size, window_shift),
ops.map(lambda s: s.pipe(ops.to_list())),
ops.merge_all()
)
)
@venkibale
Copy link

Hi,
Is there any way to send msgs to bot from bot ?
Thanks,
Venkatesh

@lucaspg96
Copy link
Author

Hi @venkibale, I think there is some way, but I also think that you commented at the wrong gist xD

I never tried to do so, but I always wondered if it was possible. With my humble experience with Telegram API, it seems that it supports such interaction.

@venkibale
Copy link

venkibale commented Sep 5, 2020 via email

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment