Skip to content

Instantly share code, notes, and snippets.

@sgsfak
Last active October 8, 2021 06:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sgsfak/e9b1636a2464316abfa1d8abcf872649 to your computer and use it in GitHub Desktop.
Save sgsfak/e9b1636a2464316abfa1d8abcf872649 to your computer and use it in GitHub Desktop.
A Python server accepting MLLP-wrapped requests for PANACEA IMP
import socket
import threading
import socketserver
import sys
import time
def send_command(command: str) -> str:
    """Send *command* to the local capture service and return its reply.

    Opens a TCP connection to 127.0.0.1:27016, writes the UTF-8 encoded
    command and then reads until the peer closes the connection. Every
    received chunk is echoed to stdout and decoded strictly as UTF-8; the
    decoded chunks are joined with newlines to form the return value.
    """
    endpoint = ('127.0.0.1', 27016)
    chunks = []
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as conn:
        conn.connect(endpoint)
        conn.sendall(command.encode('utf-8'))
        # An empty read means the peer has closed its side of the connection.
        while msg := conn.recv(1024):
            print(f"Got '{msg}'")
            chunks.append(msg.decode("utf-8", "strict"))
    return "\n".join(chunks)
def debug(msg: str):
    """Print *msg* to standard error wrapped in ANSI green colour codes."""
    green, reset = '\033[32m', '\033[0m'
    print("".join((green, msg, reset)), file=sys.stderr)
# Polling schedule: one entry per try, giving the delay in seconds that
# follows it (a single 1 s wait, then 2 s waits).
tries = [1] + [2] * 6
def acquire_credentials():
    """Trigger a capture and poll the capture service for matched credentials.

    Sends 'Launch capture' and, once it is acknowledged with
    'command received', polls with 'Matching result' once per entry of
    ``tries``, sleeping for that entry's delay between polls. A reply of "0"
    is treated as "no result yet"; any other reply is expected to carry the
    username and the password on its first two lines.

    Returns:
        A ``(username, password)`` tuple on success, or ``None`` when the
        capture was not acknowledged, the result reply has fewer than two
        lines, or no result arrived within the polling schedule. In every
        ``None`` case the last reply is logged and 'Stop capture' is sent.
    """
    response = send_command('Launch capture')
    if response == 'command received':
        debug('Capture cmd submitted.. polling for result')
        last_try = len(tries)
        for k, t in enumerate(tries, start=1):
            # Poll first and inspect the reply immediately, so that no reply
            # is ever fetched and then thrown away unchecked.
            response = send_command('Matching result')
            debug(f"Try {k}, got '{response}'")
            if response != "0":
                fields = response.splitlines()
                if len(fields) < 2:
                    # Malformed (e.g. empty) reply: give up instead of
                    # raising ValueError on the unpack below.
                    break
                username, password = fields[:2]
                # NOTE(review): this prints the password in clear text to
                # stdout -- confirm that is acceptable outside development.
                print(f"Username: {username} Password: '{password}'")
                return (username, password)
            if k < last_try:
                # Sleeping after the final try would only delay the failure.
                debug(f"Sleeping for {t} seconds")
                time.sleep(t)
    # Only failure paths reach this point: capture not acknowledged,
    # malformed result, or polling schedule exhausted.
    debug(f"Error, got '{response}'")
    send_command("Stop capture")
    return None
# Address the MLLP server binds to; an empty host means all interfaces.
HOST, PORT = '', 27017

# MLLP framing bytes: <VT> (0x0B) opens a frame, <FS><CR> (0x1C 0x0D) closes it.
header = bytes([0x0B])
trailer = bytes([0x1C, 0x0D])
class ThreadedTCPRequestHandler(socketserver.BaseRequestHandler):
    """Serve MLLP-framed requests on one client connection.

    Every complete frame (terminated by ``trailer``) triggers one credential
    acquisition; the reply is the username and password separated by a
    newline, or an empty payload when nothing was acquired, wrapped in
    ``header`` / ``trailer``.
    """

    def handle(self):
        """Read from the socket until the client disconnects.

        TCP is a byte stream, so a frame may arrive split across several
        ``recv`` calls or several frames may arrive in one. Bytes are
        buffered and a request is only processed once its trailer has been
        seen, so each frame triggers exactly one acquisition.
        """
        pending = bytearray()
        while True:
            chunk = self.request.recv(1024)
            if not chunk:
                return
            pending += chunk
            while True:
                end = pending.find(trailer)
                if end < 0:
                    break  # frame still incomplete; wait for more bytes
                frame = bytes(pending[:end])
                del pending[:end + len(trailer)]
                self._process_frame(frame)

    def _process_frame(self, frame):
        """Log one request frame (trailer already removed) and send the reply."""
        # The request text is only printed, so undecodable bytes must not
        # abort the connection.
        msg = frame.removeprefix(header).decode("utf-8", errors="replace")
        print(f"Got {msg}")
        creds = acquire_credentials()
        data = "\n".join(creds) if creds else ""
        self.request.sendall(header + data.encode("utf-8") + trailer)
class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """TCP server that handles each client connection in its own thread."""
def client(port, message):
    """Send *message* in an MLLP frame to 127.0.0.1:*port* and print the reply.

    A single ``recv`` of up to 1024 bytes is taken as the whole reply; the
    frame header and trailer are stripped before it is decoded and printed.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect(('127.0.0.1', port))
        framed = b"".join((header, message.encode("utf-8"), trailer))
        sock.sendall(framed)
        raw = sock.recv(1024)
        payload = raw.removeprefix(header).removesuffix(trailer)
        response = payload.decode()
        print("Received: {}".format(response))
if __name__ == "__main__":
    # Bind the threaded server, announce the bound address, then serve
    # requests until the process is interrupted; the context manager
    # closes the listening socket on exit.
    with ThreadedTCPServer((HOST, PORT), ThreadedTCPRequestHandler) as server:
        ip, port = server.server_address
        print(f"Server started in {ip}:{port}")
        server.serve_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment