Skip to content

Instantly share code, notes, and snippets.

@IlyaSkriblovsky
Created February 13, 2019 17:52
Show Gist options
  • Save IlyaSkriblovsky/9a5692d051ff83ae5a7b0deeffa9623e to your computer and use it in GitHub Desktop.
Save IlyaSkriblovsky/9a5692d051ff83ae5a7b0deeffa9623e to your computer and use it in GitHub Desktop.
from attr import dataclass
from rx import AnonymousObservable, Observable
from rx.core import ObservableBase
from rx.subjects import Subject
from twisted.internet import reactor, defer
from twisted.internet.defer import Deferred
from twisted.internet.protocol import Protocol, connectionDone, Factory
from twisted.python.failure import Failure
def sleep(timeout) -> Deferred:
d = defer.Deferred()
reactor.callLater(timeout, d.callback, None)
return d
@defer.inlineCallbacks
def auth(login: str) -> Deferred:
yield sleep(2)
return f'user_{login}'
class MyProtocol(Protocol):
def __init__(self):
super().__init__()
self.data_in = Subject()
def connectionMade(self):
self.factory.connections.on_next(self)
def dataReceived(self, data):
self.data_in.on_next(data)
def connectionLost(self, reason=connectionDone):
self.data_in.on_completed()
def send_message(self, msg: str):
print('send_message', msg)
self.transport.write(msg.encode() + b'\r\n')
class MyFactory(Factory):
protocol = MyProtocol
def __init__(self):
super().__init__()
self.connections = Subject()
def stopFactory(self):
self.connections.on_completed()
def from_deferred(d: Deferred) -> ObservableBase:
def subscribe(observer, scheduler=None):
def on_both(result):
if isinstance(result, Failure):
observer.on_error(result.value)
else:
observer.on_next(result)
observer.on_completed()
return result
def dispose():
d.cancel()
d.addBoth(on_both)
return dispose
return AnonymousObservable(subscribe)
f = MyFactory()
@dataclass
class AuthenticatedMessage:
login: str
message: str
def authenticated_messages(conn):
lines = conn.data_in\
.map(lambda chunk: chunk.decode().strip())
auth_done = lines \
.first() \
.flat_map(lambda msg: from_deferred(auth(msg))) \
.publish() \
.auto_connect()
auth_done.subscribe(lambda x: print('AUTH', x), on_completed=lambda: print('AUTH DONE'))
return Observable.merge(
lines.window(auth_done)
.concat_map(Observable.to_iterable)
.zip(auth_done, lambda msgs, login: [AuthenticatedMessage(login, message) for message in msgs])
.concat_map(Observable.from_iterable),
lines.with_latest_from(auth_done, lambda message, login: AuthenticatedMessage(login, message))
)
messages = f.connections.flat_map(lambda conn: authenticated_messages(conn).map(lambda msg: (conn, msg)))
Observable.merge(
messages.filter(lambda connmsg: connmsg[1].message.startswith('r '))
.map(lambda connmsg: (connmsg[0], ''.join(reversed(connmsg[1].message[2:])))),
messages.filter(lambda connmsg: connmsg[1].message.startswith('d '))
.map(lambda connmsg: (connmsg[0], ''.join(c * 2 for c in connmsg[1].message[2:])))
).subscribe(lambda connmsg: connmsg[0].send_message(connmsg[1]))
reactor.listenTCP(1234, f)
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment