Skip to content

Instantly share code, notes, and snippets.

@silverweed
Created May 12, 2015 10:47
Show Gist options
  • Save silverweed/ef4c075e8aa3f93aa461 to your computer and use it in GitHub Desktop.
Save silverweed/ef4c075e8aa3f93aa461 to your computer and use it in GitHub Desktop.
Minimalistic pokepon client
#!/usr/bin/python3
# simple Pokepon client
# by silverweed
import telnetlib
import getpass
import hashlib
import sys
import signal
from threading import Thread
import re
# Shorthand for writing raw text to standard error.
err = sys.stderr.write

# ANSI SGR escape sequences used to style terminal output.
termnone = "\033[0m"  # reset all attributes

# Colour name -> escape sequence; every colour is emitted with the bold
# attribute ("1;") followed by the foreground colour code.
termcolors = {
    name: "\033[1;%dm" % code
    for name, code in (
        ('red', 31),
        ('blue', 34),
        ('purple', 35),
        ('green', 32),
        ('yellow', 33),
        ('cyan', 36),
        ('white', 37),
        ('gray', 30),
    )
}

termbold = "\033[1m"  # bold
termdim = "\033[2m"   # dim / faint
termemph = "\033[4m"  # underline, used here for emphasis
def log(msg):
    """Write a diagnostic line to standard error.

    ``msg`` may be any object: it is converted with ``str()``, prefixed
    with ``[PyClient::log] `` and terminated with a newline.
    """
    sys.stderr.write('[PyClient::log] ' + str(msg) + "\n")
def stripHTML(msg):
    """Return ``msg`` with every ``<...>`` tag removed.

    A single left-to-right pass tracks whether the current character lies
    inside a tag:

    * ``<`` opens a tag (a ``<`` met while already inside one is dropped
      like any other tag content);
    * ``>`` closes the open tag; a ``>`` met outside any tag is ordinary
      text and is kept;
    * every other character is kept only when outside a tag.

    An unterminated tag therefore swallows the rest of the string.
    """
    buf = []
    in_tag = False
    for c in msg:
        if c == '<':
            in_tag = True
        elif c == '>' and in_tag:
            in_tag = False
        elif not in_tag:
            # Plain text, including a stray '>' that closes nothing.
            buf.append(c)
    return ''.join(buf)
# Patterns for the small HTML subset convertHTML() understands, compiled once.
_RE_COLOR = re.compile(r'<font color="(?P<color>[a-z]+)">(?P<content>.*?)</font>')
_RE_BOLD = re.compile(r'<b>(.*?)</b>')
_RE_ITALIC = re.compile(r'<(?:i|em)>(.*?)</(?:i|em)>')
_RE_BR = re.compile(r'<br/?>')
# Non-greedy and bounded by the tag / matching quote, so two links in one
# message are converted separately instead of being merged into one match.
_RE_A = re.compile(
    r"""<a [^>]*?href=(?P<quote>["'])(?P<url>.*?)(?P=quote)[^>]*>(?P<name>.*?)</a>""")


def convertHTML(msg):
    """Translate a small subset of HTML in ``msg`` into ANSI terminal codes.

    Handled markup:

    * ``<font color="NAME">text</font>`` -> ``termcolors[NAME]`` + text +
      reset; a colour name missing from ``termcolors`` leaves the text
      unstyled instead of raising ``KeyError``;
    * ``<b>text</b>`` -> bold;
    * ``<i>text</i>`` / ``<em>text</em>`` -> ``termemph``;
    * ``<br>`` / ``<br/>`` -> newline;
    * ``<a href="URL">name</a>`` -> ``[name](URL)``.

    Substitution is repeated until no pattern matches any more, so nested
    tags are resolved from the inside out. Any other markup is returned
    untouched.
    """
    def render_color(match):
        code = termcolors.get(match.group('color'))
        if code is None:
            # Unknown colour: keep the text, drop the tag.
            return match.group('content')
        return code + match.group('content') + termnone

    def render_bold(match):
        return termbold + match.group(1) + termnone

    def render_italic(match):
        return termemph + match.group(1) + termnone

    def render_br(match):
        return '\n'

    def render_anchor(match):
        return (termdim + '[' + match.group('name') + '](' + termnone +
                termemph + match.group('url') + termdim + ')' + termnone)

    rules = (
        (_RE_COLOR, render_color),
        (_RE_BOLD, render_bold),
        (_RE_ITALIC, render_italic),
        (_RE_BR, render_br),
        (_RE_A, render_anchor),
    )

    matched = True
    while matched:
        matched = False
        for regex, render in rules:
            match = regex.search(msg)
            if match is not None:
                matched = True
                # Splice the rendered text in directly. Passing it to
                # re.sub() as a replacement template would interpret any
                # backslash in the message content as an escape sequence
                # or group reference.
                msg = msg[:match.start()] + render(match) + msg[match.end():]
    return msg
class Listener(Thread):
    """Thread that reads lines from the server and hands them to executors.

    ``client`` must expose ``connection`` (an object with
    ``read_until(bytes)`` and ``close()``) and ``executors`` (a list of
    objects with ``execute(msg)``).
    """

    def __init__(self, client):
        Thread.__init__(self)
        self.client = client

    def run(self):
        """Read newline-terminated messages until told to stop.

        Each message is offered to ``client.executors`` in reverse list
        order. The value returned by ``execute`` decides what happens next:

        * ``0`` - not handled, try the next executor;
        * ``1`` - handled, stop processing this message;
        * ``2`` - close the connection and end the thread.

        The thread also ends, after logging the reason, when the read fails
        because the connection was closed.
        """
        while True:
            try:
                raw = self.client.connection.read_until(b"\n")
            except (EOFError, OSError) as e:
                # Connection closed (by the peer or by a local disconnect):
                # end quietly instead of dying with a traceback.
                log('Connection closed: ' + str(e))
                return
            # A single undecodable byte must not kill the listener.
            msg = raw.decode('utf-8', errors='replace')
            for ex in reversed(self.client.executors):
                ret = ex.execute(msg)
                if ret == 1:
                    break
                if ret == 2:
                    self.client.connection.close()
                    return
class CommunicationExecutor:
    """Executor for protocol messages, i.e. lines starting with
    ``Client.CMN_PREFIX``."""

    def __init__(self, client):
        self.client = client

    def execute(self, msg):
        """Handle ``msg`` if it is a protocol message.

        Returns ``0`` when ``msg`` is empty or does not start with
        ``Client.CMN_PREFIX`` (so another executor can handle it), ``2``
        after ``drop`` or ``disconnect`` (the connection must be torn
        down), and ``1`` for every other protocol message, including
        unknown or empty commands.
        """
        if not msg or not msg.startswith(Client.CMN_PREFIX):
            return 0
        parts = msg[len(Client.CMN_PREFIX):].split()
        if not parts:
            # Bare prefix with no command name: nothing to do.
            return 1
        cmd, *args = parts
        if verbose:
            log("cmd: " + cmd + ", args: " + ' '.join(args))
        if cmd == 'ping':
            self.client.cmn('pong')
        elif cmd == 'youros':
            self.client.cmn('myos PythonClient 0.1')
        elif cmd == 'ok':
            log('Handshake completed.')
        elif cmd == 'drop':
            # The reason may be empty or span several words.
            if args:
                log(' '.join(args))
            log('Server dropped connection')
            return 2
        elif cmd == 'motd':
            print('<<<MOTD\n' + convertHTML(' '.join(args)) + '\n<<<END OF MOTD')
        elif cmd == 'html' or cmd == 'htmlconv':
            print(convertHTML(' '.join(args)))
        elif cmd == 'givepasswd':
            pwd = getpass.getpass("Enter password > ")
            self.client.cmn('passwd ' + self.client.hashpwd(pwd))
        elif cmd == 'disconnect':
            self.client.disconnect()
            return 2
        return 1
class DefaultExecutor:
    """Fallback executor: prints any message on standard output."""

    def __init__(self, client):
        self.client = client

    def execute(self, msg):
        """Write ``msg``, converted by ``convertHTML``, to stdout.

        Always returns ``1`` (message handled).
        """
        sys.stdout.write(convertHTML(msg))
        # Show the text immediately even when stdout is block-buffered
        # (for example when it is redirected to a pipe or a file).
        sys.stdout.flush()
        return 1
class Client:
    """Minimal line-based client.

    Lines typed on standard input are sent to the server, while a
    ``Listener`` thread dispatches incoming lines to the executors.
    """

    # Prefix of protocol messages (sent by cmn()).
    CMN_PREFIX = '!'
    # Prefix of commands (sent by cmd()).
    CMD_PREFIX = '/'

    def __init__(self, ip, port):
        self.ip = ip
        self.port = port
        # Set by connect(); stays None until a connection has been opened.
        self.connection = None

    def connect(self):
        """Open the connection, start the listener and forward stdin.

        Blocks until standard input is exhausted. Any error is reported on
        standard error instead of being raised.
        """
        try:
            self.executors = [DefaultExecutor(self)]
            self.executors.append(CommunicationExecutor(self))
            self.listener = Listener(self)
            self.connection = telnetlib.Telnet(self.ip, self.port)
            self.listener.start()
            log('Started listener')
            self.run()
        except Exception as e:
            err(str(e) + "\n")

    def disconnect(self):
        """Tell the server we are leaving and close the connection.

        Best effort: it is safe to call before ``connect()`` or after the
        connection has already been closed.
        """
        connection = self.connection
        if connection is not None:
            try:
                self.cmd('disconnect')
            except Exception as e:
                # The link may already be gone; we are closing it anyway.
                log('Could not notify server: ' + str(e))
            try:
                connection.close()
            except Exception as e:
                log('Error while closing connection: ' + str(e))
        log('Disconnected.')

    def run(self):
        """Send every line read from standard input to the server."""
        for line in sys.stdin:
            # send_raw() appends the terminator itself; strip the one that
            # came from stdin so no extra empty line is sent.
            self.process(line.rstrip('\r\n'))

    def process(self, line):
        """Handle one line of user input (currently: send it unchanged)."""
        self.send_raw(line)

    def send_raw(self, line):
        """Send ``line`` followed by a newline, encoded as UTF-8."""
        self.connection.write((line + "\n").encode('utf-8'))

    def cmd(self, msg):
        """Send ``msg`` as a command (prefixed with ``CMD_PREFIX``)."""
        self.connection.write((Client.CMD_PREFIX + msg + "\n").encode('utf-8'))

    def cmn(self, msg):
        """Send ``msg`` as a protocol message (prefixed with ``CMN_PREFIX``)."""
        self.connection.write((Client.CMN_PREFIX + msg + "\n").encode('utf-8'))

    def hashpwd(self, pwd):
        """Return the SHA-1 hex digest of ``pwd`` encoded as UTF-8.

        The digest consists only of hexadecimal characters, so it contains
        no line-breaking character and can be sent on a single line as is.
        It is deliberately not logged: it is what gets sent to the server
        in place of the password.
        """
        return hashlib.sha1(pwd.encode('utf-8')).hexdigest()
# When True, CommunicationExecutor logs every protocol command it receives.
verbose = False


def parse_args(argv=None):
    """Parse the command line (``sys.argv[1:]`` when ``argv`` is None).

    Returns a namespace with ``ip`` (optional positional, defaults to
    ``pokepon.center``), ``port`` (int, defaults to 12344) and ``verbose``
    (bool, defaults to False).
    """
    from argparse import ArgumentParser
    parser = ArgumentParser()
    parser.add_argument('-p', '--port', help='port to connect to',
                        type=int, default=12344)
    parser.add_argument('-v', '--verbose', dest='verbose',
                        action='store_true', default=False)
    # nargs='?' makes the positional optional; without it argparse always
    # requires a value and the default can never be used.
    parser.add_argument('ip', nargs='?', help='The IP to connect to',
                        default='pokepon.center')
    return parser.parse_args(argv)


def main(argv=None):
    """Script entry point: connect and run until stdin ends or Ctrl-C."""
    global verbose
    opts = parse_args(argv)
    verbose = opts.verbose
    log('Creating client')
    client = Client(opts.ip, opts.port)

    def handler(signum, frame):
        # Always exit on Ctrl-C, even if the disconnect itself fails.
        try:
            client.disconnect()
        finally:
            sys.exit(0)

    signal.signal(signal.SIGINT, handler)
    client.connect()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment