Skip to content

Instantly share code, notes, and snippets.

@c64emulator
Created May 4, 2017 18:45
Show Gist options
  • Save c64emulator/b213312772466f1c1ab11d78647e32df to your computer and use it in GitHub Desktop.
Save c64emulator/b213312772466f1c1ab11d78647e32df to your computer and use it in GitHub Desktop.
Run server displaying Star Wars Asciimation via telnet
# asciimation.py - run server displaying star wars asciimation via telnet
import os
import re
import sys
import time
import gzip
import socket
import signal
import logging
import urllib2
import asyncore
HOST = '127.0.0.1'
PORT = 'telnet'
logging.basicConfig(format='%(asctime)s %(message)s', datefmt='%b %d %H:%M:%S',
level=logging.INFO)
def get_page(url='http://www.asciimation.co.nz', cache_filename='asciimation.gz'):
filepath = os.path.join(os.path.dirname(__file__), cache_filename)
if not os.path.exists(filepath):
print(url)
result = urllib2.urlopen(url).read()
with gzip.open(filepath, 'wb') as fd:
fd.write(result)
with gzip.open(filepath) as fd:
result = fd.read()
return result
def iterframes(page):
film = re.search(r"var film = '(.*)'\.split\('\\n'\);", page).group(1)
film = film.decode('string_escape')
duration, lines = 1, range(2, 15)
for ma in re.finditer(r'(\d+)\n' + 13 * r'(.*)\n', film):
yield int(ma.group(duration)), centerframe(ma.group(*lines))
@apply
def centerframe(screen_size=(80, 24), frame_size=(67, 13)):
hmargin, vmargin = (s - f for s, f in zip(screen_size, frame_size))
screen = '\r\n' * (vmargin / 2) + '%s' + '\r\n' * (vmargin - vmargin / 2)
content = '%%-%ds' % frame_size[0]
row = ' ' * (hmargin / 2) + content + ' ' * (hmargin - hmargin / 2)
screen = '\x1b[H' + '\x1b[J' + screen # home cls
def centerframe_func(lines):
return screen % '\r\n'.join(row % l for l in lines)
return centerframe_func
class Server(asyncore.dispatcher):
def __init__(self, host, port):
asyncore.dispatcher.__init__(self)
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
self.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
if not isinstance(port, int):
port = socket.getservbyname(port)
logging.info('server listening on %s port %s' % (host, port))
self.bind((host, port))
self.listen(5)
def handle_accept(self):
conn, address = self.accept()
Handler(conn)
def shutdown(self):
logging.info('server shutdown')
asyncore.close_all()
self.close()
class Handler(asyncore.dispatcher):
_frames = list(iterframes(get_page()))
def __init__(self, conn):
asyncore.dispatcher.__init__(self, conn)
logging.info('client connect %s port %s' % self.addr)
self.frames = iter(self._frames)
self.duration = 0
def handle_read(self, bufsize=1024):
self.recv(bufsize)
def handle_write(self):
if not self.duration:
try:
self.duration, frame = next(self.frames)
except StopIteration:
logging.info('close client %s port %s' % self.remote)
self.close()
return
self.send(frame)
self.duration -= 1
def handle_close(self):
logging.info('client disconnect %s port %s' % self.addr)
self.close()
def chroot(user='nobody', directory='/tmp', fix_time=True):
import pwd
if fix_time:
os.environ['TZ'] = open('/etc/timezone').readline().strip()
time.tzset()
uid, gid = pwd.getpwnam(user)[2:4]
os.chroot(directory)
os.setgid(gid)
os.setgroups([])
os.setuid(uid)
def serve(fps=15):
interval = 1.0 / fps
while True:
asyncore.loop(interval, count=1)
time.sleep(interval)
def main(host=HOST, port=PORT):
s = Server(host, port)
try:
chroot()
except ImportError:
pass
signal.signal(signal.SIGTERM, lambda signum, frame: sys.exit())
try:
serve()
except socket.error as e:
sys.exit(e)
except (KeyboardInterrupt, SystemExit):
logging.info('exiting')
finally:
s.shutdown()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment