Skip to content

Instantly share code, notes, and snippets.

@TimelessP
Created July 31, 2022 14:21
Show Gist options
  • Save TimelessP/5abce3c64c4b671d79121d1e459aecb3 to your computer and use it in GitHub Desktop.
Save TimelessP/5abce3c64c4b671d79121d1e459aecb3 to your computer and use it in GitHub Desktop.
A twistd "echo and shout" example: a Telnet service with a background task.
"""
telnet_echo.py by @TimelessP
https://gist.github.com/TimelessP/5abce3c64c4b671d79121d1e459aecb3
Run using:
`twistd -ny telnet_echo.py`
"""
from twisted.application.internet import TCPServer
from twisted.application.service import Application
from twisted.conch.telnet import TelnetProtocol, TelnetTransport
from twisted.internet.protocol import ServerFactory
from twisted.internet import task
from twisted.python.failure import Failure
from twisted.logger import Logger
class TelnetEcho(TelnetProtocol):
    """Telnet protocol that echoes each received chunk back to its sender.

    Every connection registers a record in the module-level ``state["clients"]``
    list so that the periodic background task can inspect the queued input.
    The record is a dict with three keys:

    * ``"client"`` - this protocol instance,
    * ``"stdin"``  - list of raw byte chunks received and not yet processed,
    * ``"stdout"`` - list of byte chunks waiting to be written to the peer.
    """

    log = Logger()

    def __init__(self):
        super().__init__()
        # Populated in connectionMade(); None until the connection is set up.
        self.client = None

    def connectionLost(self, reason: Failure):
        """Remove this connection's record from ``state["clients"]`` and log it.

        :param reason: the Failure describing why the connection ended.
        """
        # Match by identity: the record to drop is the one that wraps *this*
        # protocol instance.
        for index, record in enumerate(state["clients"]):
            if record["client"] is self:
                del state["clients"][index]
                break
        # twisted.logger treats the message as a format string, so the values
        # are passed as fields instead of being pre-interpolated; braces in
        # the interpolated text can then never be mistaken for format fields.
        peer = self.transport.getPeer() if self.transport is not None else None
        self.log.info("Connection lost: {peer}, {reason}", peer=peer, reason=reason)

    def connectionMade(self):
        """Log the new peer and register a client record in ``state``."""
        self.log.info("Connection made: {peer}", peer=self.transport.getPeer())
        self.client = {"client": self, "stdin": [], "stdout": []}
        state["clients"].append(self.client)

    def dataReceived(self, data: bytes):
        """Queue ``data`` for the background task and echo a summary line.

        The reply contains the current tick counter, the ``repr`` of the
        decoded input and the number of connected clients. Anything already
        waiting in the record's ``"stdout"`` list is written before the reply,
        joined with CRLF, and the list is then emptied.

        :param data: raw application bytes received from the peer.
        """
        self.client["stdin"].append(data)
        # Decode leniently: input that is not valid UTF-8 must not raise out
        # of this callback; undecodable bytes become U+FFFD instead.
        text = data.decode("utf-8", errors="replace")
        message = f"{state['count']}: {text!r}, clients: {len(state['clients'])}\r\n"
        self.client["stdout"].append(message.encode("utf-8"))
        self.transport.write(b"\r\n".join(self.client["stdout"]))
        self.client["stdout"] = []
# Shared mutable state: a tick counter incremented by the background task and
# one record per connected client.
state = {"count": 0, "clients": []}

# Module-level logger used by the background task.
log = Logger()


def _build_transport():
    """Return a TelnetTransport that wraps a fresh TelnetEcho protocol."""
    return TelnetTransport(TelnetEcho)


# The factory creates one transport/protocol pair per incoming connection.
factory = ServerFactory()
factory.protocol = _build_transport

# The TCP service on port 6023 is attached to the application object; the file
# is meant to be run with `twistd -ny`, which picks up `application`.
application = Application("Telnet Echo Server")
service = TCPServer(6023, factory)
service.setServiceParent(application)
def run_every_second():
    """Advance the tick counter and process every client's queued input.

    For each chunk queued in a client's ``"stdin"`` list:

    * ``exit`` (ignoring surrounding whitespace) closes that client's
      connection;
    * ``shout <text>`` writes ``<text>`` immediately to every *other*
      connected client.

    The client's ``"stdin"`` list is emptied once its chunks are processed.
    """
    state["count"] += 1
    # Iterate over a snapshot: closing a connection leads to connectionLost(),
    # which removes entries from state["clients"].
    for client in list(state["clients"]):
        for inp in client["stdin"]:
            command = inp.strip()
            if command == b"exit":
                client["client"].transport.loseConnection()
            elif command.startswith(b"shout "):
                # Pass values as fields rather than pre-formatting: the
                # payload is user-controlled and may contain braces, which
                # twisted.logger would otherwise read as format fields.
                log.info(
                    "Shouting: src={peer}: {payload}",
                    peer=client["client"].transport.getPeer(),
                    payload=command[6:],
                )
                # Drop leading whitespace before slicing off the "shout "
                # prefix, so the slice lines up with the prefix test above;
                # trailing bytes (e.g. the line ending) are kept as received.
                payload = inp.lstrip()[6:]
                # Compare by identity so only the sender itself is skipped.
                others = [c for c in state["clients"] if c is not client]
                for each_client in others:
                    # Unbuffered send: written straight away instead of being
                    # queued in the recipient's "stdout" list.
                    each_client["client"].transport.write(payload)
        client["stdin"] = []
# Set up a looping call task that runs once every second.
looping = task.LoopingCall(run_every_second)
# start() returns a Deferred that errbacks if run_every_second raises, at
# which point the loop stops. Log that failure explicitly instead of
# discarding the Deferred.
looping.start(1.0).addErrback(
    lambda failure: log.failure("Background task stopped after an error", failure)
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment