Last active
October 9, 2020 20:28
-
-
Save lucaspg96/7948a50f1997e995e9a1520833fc9bef to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import rx | |
from rx import operators as ops | |
from collections.abc import Iterable | |
import logging | |
identity = lambda x: x | |
def throw(e): | |
print(e) | |
logging.exception(e) | |
raise e | |
class Stream(): | |
""" | |
Classe principal contendo as funções para modelagem dos pipelines de stream. | |
Todas as operações são imutáveis, ou seja, retornam um novo pipeline. | |
""" | |
def __init__(self, stream): | |
""" | |
Construtor do objeto Stream. | |
Essa função é o ponto de partida para construir seus pipelines. | |
Ela recebe qualquer entidade que seja iterável (listas, dicionários, generators, etc.) e usa como fonte de dados da stream. | |
""" | |
if isinstance(stream, Iterable): | |
self.content = rx.from_iterable(stream) | |
else: | |
self.content = stream | |
def run_for_each(self, function): | |
""" | |
Executa, para cada elemento da stream, uma função. | |
""" | |
self.content.subscribe(function, on_error = throw) | |
def map(self, function): | |
""" | |
Executa, para cada elemento A da stream, uma função f que gera um elemento B. | |
Retorna uma stream de elementos do tipo de B. | |
""" | |
return Stream(self.content.pipe(ops.map(function))) | |
def flat_map(self, function): | |
""" | |
Executa, para cada elemento A da stream, uma função f que gera uma sequência de elementos [B1, B2, B3, ...]. | |
Retorna uma stream de elementos do tipo de B. | |
""" | |
return Stream(self.content.pipe(ops.flat_map(function))) | |
def flatten(self): | |
""" | |
Assumindo que a stream seja de sequências de elementos [A1, A2, A3, ...], | |
retorna uma stream de elementos do tipo A. | |
""" | |
return Stream(self.content.pipe(ops.flat_map(identity))) | |
def filter(self, function): | |
""" | |
Executa, para cada elemento A da stream, uma função f (chamada de predicado) que retorna True ou False. | |
Retorna uma stream de elementos do tipo de A onde f(A) seja True. | |
""" | |
return Stream(self.content.pipe(ops.filter(function))) | |
def filter_not(self, function): | |
""" | |
Executa, para cada elemento A da stream, uma função f (chamada de predicado) que retorna True ou False. | |
Retorna uma stream de elementos do tipo de A onde f(A) seja False. | |
""" | |
return Stream(self.content.pipe(ops.filter(lambda x: not function(x)))) | |
def print(self): | |
""" | |
Realiza o print de cada elemento da stream | |
""" | |
self.run_for_each(print) | |
def concat(self, stream): | |
""" | |
Dada uma outra stream (ou iterável) X, retorna uma nova stream que, | |
ao terminar de enviar os dados da stream original, concatena com os elementos de X. | |
""" | |
if isinstance(stream, Iterable): | |
return Stream(self.content.pipe(ops.concat(rx.from_iterable(stream)))) | |
return Stream(self.content.pipe(ops.concat(stream.content))) | |
def count_window(self, size): | |
""" | |
Retorna uma stream de sequências [A1, A2, A3, ..., A_size] de tamanho fixo. | |
Uma sequência só será emitida quando forem consumidos 'size' elementos OU a stream acabe. | |
""" | |
return Stream(self.content.pipe( | |
ops.window_with_count(size), | |
ops.map(lambda s: s.pipe(ops.to_list())), | |
ops.merge_all() | |
) | |
) | |
def time_window(self, window_size, window_shift): | |
""" | |
Retorna uma stream de sequências [A1, A2, A3, ...] de tamanho variável. | |
Inicialmente, espera-se 'window_size' (informado usando o datetime.timedelta) de tempo coletando dados e envia uma sequência com todos os dados consumidos nesse tempo. | |
Em seguida, espera-se mais 'window_shift' (informado usando o datetime.timedelta) e envia os dados consumidos em 'window_size'. | |
Note que: | |
- se window_size > window_shift, haverá dados em comum entre duas sequências enviadas | |
- se window_size < window_shift, haverá dados que não serão enviados | |
""" | |
return Stream(self.content.pipe( | |
ops.window_with_time(window_size, window_shift), | |
ops.map(lambda s: s.pipe(ops.to_list())), | |
ops.merge_all() | |
) | |
) |
Hi @venkibale, I think there is some way, but I also think that you commented at the wrong gist xD
I never tried to do so, but I always wondered if it was possible. With my humble experience with Telegram API, it seems that it supports such interaction.
<!--
/* Font Definitions */
@font-face
{font-family:"Cambria Math";
panose-1:2 4 5 3 5 4 6 3 2 4;}
@font-face
{font-family:Calibri;
panose-1:2 15 5 2 2 2 4 3 2 4;}
/* Style Definitions */
p.MsoNormal, li.MsoNormal, div.MsoNormal
{margin:0cm;
font-size:11.0pt;
font-family:"Calibri",sans-serif;}
a:link, span.MsoHyperlink
{mso-style-priority:99;
color:blue;
text-decoration:underline;}
.MsoChpDefault
{mso-style-type:export-only;}
@page WordSection1
{size:612.0pt 792.0pt;
margin:72.0pt 72.0pt 72.0pt 72.0pt;}
div.WordSection1
{page:WordSection1;}
-->Hi, I know xD, I was trying to build something out of that but there was no luck on that. Anyway thanks for reaching out!! Thanks,Venkatesh Sent from Mail for Windows 10 From: Lucas PeresSent: 31 August 2020 15:37To: lucaspg96Cc: venkibale; MentionSubject: Re: lucaspg96/rx_stream_wrapper.py @lucaspg96 commented on this gist.Hi @venkibale, I think there is some way, but I also think that you commented at the wrong gist xDI never tried to do so, but I always wondered if it was possible. With my humble experience with Telegram API, it seems that it supports such interaction.—You are receiving this because you were mentioned.Reply to this email directly, view it on GitHub, or unsubscribe.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi,
Is there any way to send msgs to bot from bot ?
Thanks,
Venkatesh