# Created April 25, 2020 22:49
#!/usr/bin/env python3
"""Implements a string truncation service. Requires a Lexmark printer."""
import asyncio
import base64
import click
# Registry mapping a command name to the function that serves it.
handlers = {}


def register_handler(name):
    """Return a decorator that files a function in ``handlers`` under *name*.

    The undecorated function is what gets stored in the registry. The object
    handed back to the decoration site is a pass-through wrapper that keeps
    the wrapped function's ``__name__``, ``__doc__`` and ``__wrapped__``.

    Args:
        name: Key under which the decorated function is registered.

    Returns:
        A decorator taking the function to register.
    """
    # Imported locally so this block does not depend on a file-level import.
    import functools

    def decorated(func):
        handlers[name] = func

        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            return func(*args, **kwargs)

        return wrapped

    return decorated
# Handler coroutine: encodes every entry of `message` to bytes, drains the
# writer, and returns True.
# NOTE(review): this copy of the block is damaged. The body is not indented,
# and the list literal opened on the first `message = [` line is never closed,
# so the reply strings it held are not visible here. Restore them from the
# original file rather than guessing.
async def handle_ehlo(reader, writer, args):
message = [
message = [m.encode() for m in message]
# NOTE(review): nothing visible hands `message` to `writer` before the drain
# below; a write call presumably sat here - confirm against the original.
await writer.drain()
return True
async def handle_quit(reader, writer, args):
    """Tell the command loop to stop.

    Args:
        reader: Unused.
        writer: Unused.
        args: Unused.

    Returns:
        Always False. ``handle_command`` passes a handler's result straight
        back, and ``handle_connection`` keeps looping only while that result
        is truthy.
    """
    return False
async def handle_auth(reader, writer, args):
    """Run a PLAIN or LOGIN credential exchange and always reject it.

    Args:
        reader: Stream the client's follow-up lines are read from.
        writer: Stream the prompts and the final reply are written to.
        args: Whitespace-split tokens that followed the command word.
            ``args[0]`` is the mechanism; for PLAIN, ``args[1]`` may carry the
            base64 credentials inline.

    Returns:
        True in every case, so the caller keeps reading commands.
    """
    if not args or args[0] not in ('PLAIN', 'LOGIN'):
        writer.write("500 Only LOGIN/PLAIN are supported\n".encode())
        await writer.drain()
        # Stop here: falling through would index into an empty or
        # unsupported argument list.
        return True

    method = args[0]
    try:
        if method == 'PLAIN':
            if len(args) > 1:
                # Credentials were sent on the command line itself;
                # args[0] is the mechanism name, so they live in args[1].
                b64_creds = args[1]
            else:
                # Empty challenge; terminated like every other reply line so
                # a line-oriented client actually sees it.
                writer.write('334 \n'.encode())
                await writer.drain()
                b64_creds = (await reader.readline()).decode()
            creds = base64.b64decode(b64_creds).decode()
            # Expected layout: <authorization id> NUL <username> NUL <password>
            _, _username, password = creds.split('\0')
        else:
            # The guard above leaves LOGIN as the only other mechanism.
            # Prompt for the username
            # "Username:" -> VXNlcm5hbWU6
            writer.write('334 VXNlcm5hbWU6\n'.encode())
            await writer.drain()
            _username = (await reader.readline()).decode()
            # Prompt for the password
            # "Password:" -> UGFzc3dvcmQ6
            writer.write('334 UGFzc3dvcmQ6\n'.encode())
            await writer.drain()
            b64_pass = (await reader.readline()).decode()
            password = base64.b64decode(b64_pass).decode()
    except ValueError:
        # Bad base64 (binascii.Error), undecodable bytes (UnicodeDecodeError)
        # and a wrong number of NUL-separated fields all surface as
        # ValueError. Malformed credentials get the same rejection as any
        # others instead of tearing down the connection task.
        password = None

    # NOTE(review): `password` is parsed but not used by anything visible in
    # this block - confirm whether a statement consuming it was lost.
    writer.write("535 Authentication credentials invalid\n".encode())
    await writer.drain()
    return True
async def handle_command(reader, writer):
    """Read one line from the client and dispatch it to its handler.

    Args:
        reader: Stream the command line is read from.
        writer: Stream replies are written to.

    Returns:
        False when the read yields no data (end of stream); otherwise the
        handler's own return value, or True after replying that the command
        is not recognized.
    """
    data = await reader.readline()
    message = data.decode()
    if not message:
        return False

    # split() with no separator already discards the trailing newline. A
    # line holding only whitespace yields no tokens, so substitute an empty
    # command word: it gets the ordinary "not recognized" reply instead of
    # failing the unpacking below.
    tokens = message.split() or ['']
    command, *args = tokens

    if command in handlers:
        return await handlers[command](reader, writer, args)

    writer.write(f'500 Error: command "{command}" not recognized\n'.encode())
    await writer.drain()
    return True
async def handle_connection(reader, writer):
    """Serve one client: greet it, process commands, then close the stream.

    Args:
        reader: Stream carrying the client's commands.
        writer: Stream the greeting and all replies are written to.
    """
    try:
        writer.write('220 String truncation service\n'.encode())
        await writer.drain()
        # handle_command does all the work; the loop only repeats it until it
        # reports that the session is over.
        while await handle_command(reader, writer):
            pass
    finally:
        # Close on every exit path so a finished or failed session does not
        # leave the client's socket open.
        writer.close()
async def server_func(host, port):
    """Listen on *host*:*port* and serve connections until cancelled.

    Args:
        host: Address passed to ``asyncio.start_server``.
        port: Port passed to ``asyncio.start_server``.
    """
    server = await asyncio.start_server(handle_connection, host, port)
    # The context manager closes the listening server when serve_forever()
    # is interrupted.
    async with server:
        await server.serve_forever()
# NOTE(review): the command-line entry point below is damaged in this copy.
# The two `help=` lines look like arguments of option declarations whose
# opening lines (option names, defaults) are missing - presumably click
# options, since click is imported at the top of the file. Confirm against
# the original.
help="The address to listen on",
help="The port to listen on",
# NOTE(review): the `def main` line has the tail of another statement fused
# onto it (`, port))`), presumably the call that starts the server - confirm.
def main(host, port):, port))
# NOTE(review): the guard has no body here; the call it protected is missing.
if __name__ == '__main__':
