Created
February 13, 2019 17:52
-
-
Save IlyaSkriblovsky/9a5692d051ff83ae5a7b0deeffa9623e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from attr import dataclass | |
from rx import AnonymousObservable, Observable | |
from rx.core import ObservableBase | |
from rx.subjects import Subject | |
from twisted.internet import reactor, defer | |
from twisted.internet.defer import Deferred | |
from twisted.internet.protocol import Protocol, connectionDone, Factory | |
from twisted.python.failure import Failure | |
def sleep(timeout) -> Deferred:
    """Return a Deferred that fires with ``None`` after *timeout* seconds.

    The delay is scheduled with ``reactor.callLater``. If the returned
    Deferred is cancelled before the delay elapses, the pending reactor call
    is cancelled as well, so no stale timer is left behind in the reactor.

    :param timeout: delay in seconds, passed straight to ``reactor.callLater``.
    :return: a Deferred that is called back with ``None`` when the delay ends.
    """
    def cancel_timer(_deferred):
        # Invoked by Deferred.cancel(); the guard avoids cancelling a call
        # that has already run or has already been cancelled.
        if delayed_call.active():
            delayed_call.cancel()

    d = defer.Deferred(cancel_timer)
    delayed_call = reactor.callLater(timeout, d.callback, None)
    return d
@defer.inlineCallbacks
def auth(login: str) -> Deferred:
    """Simulate an asynchronous authentication round-trip.

    Waits two seconds via ``sleep`` and then resolves to the string
    ``'user_<login>'``.

    :param login: the login text supplied by the client.
    :return: a Deferred that fires with the derived user name.
    """
    # Stand-in for a slow authentication backend.
    yield sleep(2)
    user_name = f'user_{login}'
    return user_name
class MyProtocol(Protocol):
    """Twisted protocol that republishes connection events as Rx streams.

    Received chunks are pushed into ``data_in``; the protocol instance itself
    is pushed into the factory's ``connections`` subject once connected.
    """

    def __init__(self):
        super().__init__()
        # Subject carrying every chunk handed to dataReceived().
        self.data_in = Subject()

    def connectionMade(self):
        """Publish this protocol instance on the factory's ``connections`` subject."""
        connections = self.factory.connections
        connections.on_next(self)

    def dataReceived(self, data):
        """Forward the received chunk, unmodified, to ``data_in``."""
        incoming = self.data_in
        incoming.on_next(data)

    def connectionLost(self, reason=connectionDone):
        """Complete ``data_in``; *reason* is not inspected."""
        incoming = self.data_in
        incoming.on_completed()

    def send_message(self, msg: str):
        """Print *msg*, then write it encoded and CRLF-terminated to the transport."""
        print('send_message', msg)
        payload = msg.encode() + b'\r\n'
        self.transport.write(payload)
class MyFactory(Factory):
    """Factory for ``MyProtocol`` that exposes new connections as an Rx subject."""

    # Protocol class built for each connection.
    protocol = MyProtocol

    def __init__(self):
        super().__init__()
        # Subject that MyProtocol.connectionMade() pushes each protocol into.
        self.connections = Subject()

    def stopFactory(self):
        """Complete the ``connections`` subject when the factory is stopped."""
        connections = self.connections
        connections.on_completed()
def from_deferred(d: Deferred) -> ObservableBase:
    """Wrap a Twisted Deferred in an observable.

    On subscription a callback is attached to *d*. A successful result is
    emitted with ``on_next`` followed by ``on_completed``; a ``Failure`` is
    reported with ``on_error`` carrying the wrapped exception value.

    :param d: the Deferred to observe.
    :return: an ``AnonymousObservable`` built around the subscribe function.
    """
    def subscribe(observer, scheduler=None):
        def forward(outcome):
            if not isinstance(outcome, Failure):
                observer.on_next(outcome)
                observer.on_completed()
                return outcome
            observer.on_error(outcome.value)
            # The outcome is returned unchanged so any callbacks added to the
            # Deferred afterwards still receive it.
            return outcome

        d.addBoth(forward)

        def cancel_deferred():
            d.cancel()

        # Handed back to the caller of subscribe; presumably used as the
        # unsubscribe action by the observable machinery.
        return cancel_deferred

    return AnonymousObservable(subscribe)
# Single factory instance; its `connections` subject feeds the pipeline below
# and it is the factory handed to reactor.listenTCP().
f = MyFactory()
@dataclass
class AuthenticatedMessage:
    """A message line paired with the login value produced by authentication."""

    # Value emitted by the authentication stream for this connection.
    login: str
    # Decoded, stripped text of the message.
    message: str
def authenticated_messages(conn):
    """Build a stream of ``AuthenticatedMessage`` objects for one connection.

    The first decoded chunk on ``conn.data_in`` is passed to ``auth``; the
    resulting value is then attached as ``login`` to messages from the
    connection.

    :param conn: object exposing a ``data_in`` observable of byte chunks
        (a ``MyProtocol`` instance in this file).
    :return: the merge of two observables of ``AuthenticatedMessage``.
    """
    # Each received chunk is decoded and stripped. NOTE(review): this treats
    # one chunk as one line; TCP does not guarantee that framing - confirm.
    lines = conn.data_in\
        .map(lambda chunk: chunk.decode().strip())
    # The first element of `lines` is sent through auth(); the result is
    # published and auto-connected so it can be consumed in several places.
    auth_done = lines \
        .first() \
        .flat_map(lambda msg: from_deferred(auth(msg))) \
        .publish() \
        .auto_connect()
    # Debug output. NOTE(review): this is the first subscriber to `auth_done`,
    # so it presumably is what triggers auto_connect() - do not remove it
    # without checking the subscription order.
    auth_done.subscribe(lambda x: print('AUTH', x), on_completed=lambda: print('AUTH DONE'))
    return Observable.merge(
        # Branch 1: `lines` is windowed with `auth_done` as the boundary, each
        # window is collected into a list, zipped with the login value, and
        # flattened back into individual messages. Presumably this replays
        # lines that arrived while authentication was still in flight.
        lines.window(auth_done)
            .concat_map(Observable.to_iterable)
            .zip(auth_done, lambda msgs, login: [AuthenticatedMessage(login, message) for message in msgs])
            .concat_map(Observable.from_iterable),
        # Branch 2: each line is combined with the latest `auth_done` value.
        lines.with_latest_from(auth_done, lambda message, login: AuthenticatedMessage(login, message))
    )
# Stream of (connection, AuthenticatedMessage) pairs across all connections.
# share() lets the two command branches below reuse a single upstream
# subscription. Without it each branch subscribes separately, so
# authenticated_messages(conn) - and therefore auth() - runs twice for every
# connection.
messages = f.connections \
    .flat_map(lambda conn: authenticated_messages(conn).map(lambda msg: (conn, msg))) \
    .share()

Observable.merge(
    # 'r <text>': reply with the text reversed.
    messages.filter(lambda connmsg: connmsg[1].message.startswith('r '))
        .map(lambda connmsg: (connmsg[0], ''.join(reversed(connmsg[1].message[2:])))),
    # 'd <text>': reply with every character doubled.
    messages.filter(lambda connmsg: connmsg[1].message.startswith('d '))
        .map(lambda connmsg: (connmsg[0], ''.join(c * 2 for c in connmsg[1].message[2:])))
).subscribe(lambda connmsg: connmsg[0].send_message(connmsg[1]))

# Listen on TCP port 1234 and run the reactor until it is stopped.
reactor.listenTCP(1234, f)
reactor.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment