Skip to content

Instantly share code, notes, and snippets.

@kylelaker
Created April 25, 2020 22:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kylelaker/336c05e3b0f2f89f915a73e6803aebe6 to your computer and use it in GitHub Desktop.
Save kylelaker/336c05e3b0f2f89f915a73e6803aebe6 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Implements a string truncation service. Requires a Lexmark printer.
"""
import asyncio
import base64
import click
# Maps an SMTP command verb (e.g. 'EHLO') to the coroutine function that
# services it. Populated at import time by @register_handler.
handlers = {}


def register_handler(name):
    """
    Return a decorator that registers a function as the handler for `name`.

    The decorated function is stored in `handlers` under `name` and is
    returned unchanged, so its name, docstring and coroutine-function
    status all remain visible on the module-level name.

    :param name: the command verb to register the handler under
    :returns: a decorator taking and returning the handler function
    """
    def decorated(func):
        handlers[name] = func
        # No wrapper: one would only forward the call while hiding the
        # function's metadata.
        return func
    return decorated
@register_handler('EHLO')
async def handle_ehlo(reader, writer, args):
    """
    Reply to ``EHLO`` with the server name and the supported AUTH mechanisms.

    :param reader: client stream reader (unused)
    :param writer: stream the two-line 250 reply is written to
    :param args: arguments that followed the verb (unused)
    :returns: ``True`` so the caller keeps processing commands
    """
    reply_lines = (
        "250-truncate\n",
        "250 AUTH LOGIN PLAIN\n",
    )
    encoded = list(map(str.encode, reply_lines))
    writer.writelines(encoded)
    await writer.drain()
    return True
@register_handler('QUIT')
async def handle_quit(reader, writer, args):
    """
    Acknowledge ``QUIT`` and signal that the connection should end.

    :param reader: client stream reader (unused)
    :param writer: stream the 221 reply is written to
    :param args: arguments that followed the verb (unused)
    :returns: ``False`` so the caller stops processing commands
    """
    # SMTP expects a 221 reply before the server closes the channel.
    writer.write('221 Bye\n'.encode())
    await writer.drain()
    return False
@register_handler('AUTH')
async def handle_auth(reader, writer, args):
    """
    Handle ``AUTH`` for the PLAIN and LOGIN mechanisms.

    The password supplied by the client is printed to stdout and the
    attempt is then always rejected with a 535 reply. Unsupported
    mechanisms get a 500 reply; undecodable credentials get a 501 reply.

    :param reader: stream the client's follow-up lines are read from
    :param writer: stream the replies are written to
    :param args: whitespace-separated arguments that followed ``AUTH``
    :returns: ``True`` in every case, so the caller keeps the session open
    """
    method = args[0].upper() if args else ''
    if method not in ('PLAIN', 'LOGIN'):
        writer.write("500 Only LOGIN/PLAIN are supported\n".encode())
        await writer.drain()
        # Stop here: continuing would index an empty list or reach the
        # print below with no password bound.
        return True

    try:
        if method == 'PLAIN':
            if len(args) > 1:
                # Credentials were sent inline: AUTH PLAIN <base64>
                b64_creds = args[1]
            else:
                # Empty challenge; it must end the line or a client
                # reading line-by-line never sees it.
                writer.write('334 \n'.encode())
                await writer.drain()
                b64_creds = (await reader.readline()).decode()
            creds = base64.b64decode(b64_creds).decode()
            # PLAIN payload is three NUL-separated fields; the last is
            # the password.
            _, _username, password = creds.split('\0')
        else:
            # Prompt for the username
            # "Username:" -> VXNlcm5hbWU6
            writer.write('334 VXNlcm5hbWU6\n'.encode())
            await writer.drain()
            _username = (await reader.readline()).decode()
            # Prompt for the password
            # "Password:" -> UGFzc3dvcmQ6
            writer.write('334 UGFzc3dvcmQ6\n'.encode())
            await writer.drain()
            b64_pass = (await reader.readline()).decode()
            password = base64.b64decode(b64_pass).decode()
    except ValueError:
        # Covers invalid base64 (binascii.Error), non-UTF-8 bytes
        # (UnicodeDecodeError) and a PLAIN payload without exactly three
        # fields; all are subclasses of, or are, ValueError.
        writer.write("501 Malformed authentication data\n".encode())
        await writer.drain()
        return True

    # Show the password exactly as the client sent it.
    print(password)
    writer.write("535 Authentication credentials invalid\n".encode())
    await writer.drain()
    return True
async def handle_command(reader, writer):
    """
    Read one command line from the client and dispatch it to its handler.

    The verb is looked up in `handlers` case-insensitively. Blank lines
    and unknown verbs are answered with a 500 reply.

    :param reader: stream the command line is read from
    :param writer: stream replies are written to
    :returns: ``False`` on end-of-stream or when the handler asks to stop,
        otherwise ``True``
    """
    data = await reader.readline()
    if not data:
        # readline() yields empty bytes once the peer has closed.
        return False

    # Replace undecodable bytes rather than raising; split() also drops
    # the trailing line terminator.
    parts = data.decode(errors='replace').split()
    if not parts:
        # A blank line has no verb to unpack.
        writer.write('500 Error: empty command\n'.encode())
        await writer.drain()
        return True

    command, *args = parts
    handler = handlers.get(command.upper())
    if handler is None:
        writer.write(f'500 Error: command "{command}" not recognized\n'.encode())
        await writer.drain()
        return True
    return await handler(reader, writer, args)
async def handle_connection(reader, writer):
    """
    Serve a single client: send the greeting, then process commands.

    Commands are handled one at a time until `handle_command` returns a
    false value. The writer is closed even if a handler raises.

    :param reader: stream reader for the client connection
    :param writer: stream writer for the client connection
    """
    try:
        writer.write('220 String truncation service\n'.encode())
        await writer.drain()
        while await handle_command(reader, writer):
            pass
    finally:
        writer.close()
        try:
            # Wait for the transport to finish closing.
            await writer.wait_closed()
        except OSError:
            # Peer already dropped the connection; nothing left to clean up.
            pass
async def server_func(host, port):
    """
    Listen on `host`:`port` and serve connections until cancelled.

    Each accepted connection is handed to `handle_connection`.

    :param host: the address to listen on
    :param port: the port to listen on
    """
    listener = await asyncio.start_server(
        handle_connection,
        host=host,
        port=port,
    )
    async with listener:
        await listener.serve_forever()
# Command-line entry point: parses --host/--port and runs the server.
# No docstring on purpose: click uses a command function's docstring as
# its --help text, so adding one would change the CLI output.
@click.command()
@click.option('--host', default="127.0.0.1", help="The address to listen on")
@click.option('--port', default=8025, type=click.INT, help="The port to listen on")
def main(host, port):
    serving = server_func(host, port)
    asyncio.run(serving)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment