This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Implements a string truncation service. Requires a Lexmark printer. | |
""" | |
import asyncio | |
import base64 | |
import click | |
# Maps a command verb (e.g. 'EHLO') to the function that services it.
# Filled in at import time by the @register_handler decorator below.
handlers = {}


def register_handler(name):
    """Return a decorator that registers a function in ``handlers`` as *name*.

    The decorated function is stored as ``handlers[name]`` and handed back
    unchanged, so the module-level name keeps the function's own metadata
    (``__name__``, docstring, coroutine-function status) instead of being
    replaced by an anonymous pass-through wrapper.
    """
    def decorator(func):
        handlers[name] = func
        return func
    return decorator
@register_handler('EHLO')
async def handle_ehlo(reader, writer, args):
    """Answer EHLO with the service name and the advertised AUTH mechanisms.

    Writes two reply lines, flushes the writer, and returns True so the
    command loop keeps reading. ``reader`` and ``args`` are not used.
    """
    reply_lines = ("250-truncate\n", "250 AUTH LOGIN PLAIN\n")
    writer.writelines([line.encode() for line in reply_lines])
    await writer.drain()
    return True
@register_handler('QUIT')
async def handle_quit(reader, writer, args):
    """Handle QUIT: write nothing and return False to stop the command loop."""
    keep_serving = False
    return keep_serving
def _decode_b64_text(b64_data):
    """Decode base64 *b64_data* (str or bytes) to text; None if malformed."""
    try:
        return base64.b64decode(b64_data).decode()
    except ValueError:
        # binascii.Error (bad base64) and UnicodeDecodeError (payload is not
        # UTF-8) are both ValueError subclasses.
        return None


@register_handler('AUTH')
async def handle_auth(reader, writer, args):
    """Handle AUTH: collect PLAIN or LOGIN credentials, print the password.

    Args:
        reader: stream the client's follow-up lines are read from.
        writer: stream the replies are written to.
        args: words following the AUTH verb; ``args[0]`` selects the
            mechanism and, for PLAIN, ``args[1]`` may carry the base64
            credentials inline.

    Returns:
        Always True, so the connection stays open for further commands.

    Replies:
        500 if the mechanism is missing or not PLAIN/LOGIN,
        501 if the submitted credentials cannot be decoded,
        535 otherwise (the login is always rejected after the password
        has been printed to stdout).
    """
    if not args or args[0] not in ('PLAIN', 'LOGIN'):
        # Reply and stop here: falling through would index an empty list or
        # reach print() with no password bound.
        writer.write("500 Only LOGIN/PLAIN are supported\n".encode())
        await writer.drain()
        return True

    method, *initial_response = args
    password = None

    if method == 'PLAIN':
        if initial_response:
            b64_creds = initial_response[0]
        else:
            # Empty challenge. It must be newline-terminated or a client
            # reading line by line never sees it.
            writer.write('334 \n'.encode())
            await writer.drain()
            b64_creds = await reader.readline()
        creds = _decode_b64_text(b64_creds)
        if creds is not None:
            # Three NUL-separated fields are expected; the password is last.
            fields = creds.split('\0')
            if len(fields) == 3:
                password = fields[2]
    else:  # LOGIN
        # Prompt for the username
        # "Username:" -> VXNlcm5hbWU6
        writer.write('334 VXNlcm5hbWU6\n'.encode())
        await writer.drain()
        # The username line must be consumed, but its value is not used.
        await reader.readline()
        # Prompt for the password
        # "Password:" -> UGFzc3dvcmQ6
        writer.write('334 UGFzc3dvcmQ6\n'.encode())
        await writer.drain()
        password = _decode_b64_text(await reader.readline())

    if password is None:
        writer.write("501 Malformed credentials\n".encode())
    else:
        print(password)
        writer.write("535 Authentication credentials invalid\n".encode())
    await writer.drain()
    return True
async def handle_command(reader, writer):
    """Read one command line, dispatch it, and say whether to keep serving.

    Args:
        reader: stream to read the command line from.
        writer: stream to write replies to.

    Returns:
        False when the peer has closed the connection (empty read) or when
        the dispatched handler returns False; True otherwise.
    """
    data = await reader.readline()
    if not data:
        # Empty read means EOF.
        return False

    # errors='replace': stray non-UTF-8 bytes should produce a 500 reply,
    # not an unhandled UnicodeDecodeError. split() also discards the
    # trailing '\n' or '\r\n'.
    words = data.decode(errors='replace').split()
    if not words:
        # A blank line has no verb to unpack; answer instead of crashing.
        writer.write('500 Error: empty command\n'.encode())
        await writer.drain()
        return True

    command, *args = words
    # Exact match first (keeps any handler registered under a mixed-case
    # name reachable), then fall back to the upper-cased verb.
    handler = handlers.get(command) or handlers.get(command.upper())
    if handler is None:
        writer.write(f'500 Error: command "{command}" not recognized\n'.encode())
        await writer.drain()
        return True
    return await handler(reader, writer, args)
async def handle_connection(reader, writer):
    """Serve one client: send the 220 greeting, then run commands until done.

    Commands are processed one at a time through ``handle_command`` until it
    returns False. The writer is closed on every exit path, including when a
    write or a handler raises, so a failing session does not leak its
    connection. Exceptions still propagate to the caller.
    """
    try:
        writer.write('220 String truncation service\n'.encode())
        await writer.drain()
        while await handle_command(reader, writer):
            pass
    finally:
        writer.close()
async def server_func(host, port):
    """Start a server on *host*:*port* and serve with handle_connection."""
    async with await asyncio.start_server(handle_connection, host, port) as listener:
        await listener.serve_forever()
# Command-line entry point. main() deliberately has no docstring: click would
# show it as the command's --help text, which the command does not have today.
@click.command()
@click.option('--host', default="127.0.0.1", help="The address to listen on")
@click.option('--port', default=8025, type=click.INT, help="The port to listen on")
def main(host, port):
    # Blocks inside asyncio.run() for as long as server_func keeps running.
    asyncio.run(server_func(host, port))


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment