Created
May 25, 2018 03:50
-
-
Save haliphax/1aec62815fa76684d3351897ed8580d2 to your computer and use it in GitHub Desktop.
nodechat.py band-aid version cannibalized from ircchat.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Node chat script for x/84. Bastardized from ircchat.py. """ | |
# stdlib | |
from collections import deque | |
import datetime | |
import re | |
import warnings | |
# 3rd party | |
from x84.bbs import (ScrollingEditor, getsession, echo, getterminal, | |
get_ini, syncterm_setfont, LineEditor) | |
#: number of chat lines kept in the scrollback for redrawing the interface
MAX_SCROLLBACK = get_ini('chat', 'max_scrollback', getter='getint') or 200

#: maximum number of characters the input bar accepts
MAX_INPUT = get_ini('chat', 'max_input', getter='getint') or 200

#: terminal attribute name used to highlight the line editor
COLOR_INPUTBAR = get_ini('chat', 'color_inputbar') or 'bold_white_on_magenta'

#: syncterm font requested by main() on 'ansi' terminals
SYNCTERM_FONT = get_ini('chat', 'syncterm_font') or 'topaz'

#: ansi color names, indexed by the numeric part of a pipe color code
ANSI_COLORS = (
    'black', 'blue', 'green', 'cyan',
    'red', 'magenta', 'yellow', 'white',
)

# module-level state shared by the helpers below; assigned in main()
editor = scrollback = session = term = None
def format_server(nick, message):
    """ Helper for formatting server notices concerning a user """
    marker = indicator(character=u'*', color=term.white)
    handle = term.bold_white(nick)
    return u'{0} {1} {2}'.format(marker, handle, message)
def format_action(nick, action, color=None):
    """
    Helper for formatting action (/me) messages

    :param nick: handle of the user performing the action
    :param action: text describing the action
    :param color: accepted but not used by this implementation
    :returns: the line to display: indicator, bold nick, then the action
    """
    marker = indicator(character=u'+', color=term.bold_black)
    handle = term.bold_white(nick)
    return u'{0} {1} {2}'.format(marker, handle, action)
def format_message(nick, message):
    """
    Helper for formatting public chat messages

    :param nick: handle of the user who wrote the message
    :param message: the message text
    :returns: the line to display, as ``<nick> message``
    """
    # The template must be a unicode literal: on Python 2 a byte-string
    # template makes str.format() encode its arguments as ASCII, which
    # raises UnicodeEncodeError for any non-ASCII nick or message.
    return u'{0}{1}{2} {3}'.format(
        term.magenta(u'<'), term.bold_white(nick), term.magenta(u'>'),
        message)
def queue(message, encode=True):
    """
    Prefix a message with an HH:MM timestamp and pass it to chat_event

    :param message: the text to display
    :param bool encode: when true, pipe codes in the message are converted
                        by ansi_encode first; otherwise it is used verbatim
    """
    now = datetime.datetime.now()
    stamp = term.bold_black(
        u'{0:02d}:{1:02d}'.format(now.hour, now.minute))
    body = ansi_encode(message) if encode else message
    chat_event(u'{0} {1}'.format(stamp, body))
def indicator(color=None, character=None):
    """
    Construct an indicator to be used in output

    :param color: callable applied to the character (default: term.white)
    :param character: the character to show (default: ``*``)
    :returns: the colored character between two bold-black dashes
    """
    if not color:
        color = term.white
    if not character:
        character = u'*'
    dash = term.bold_black(u'-')
    return u'{0}{1}{0}'.format(dash, color(character))
def helptext():
    """ Queue the /HELP text listing the available commands """
    others = [term.bold(name) for name in (u'/HELP', u'/COLORS', u'/ME')]
    separator = term.white(u', ')
    queue(u'{0} Use {1} to quit. Other commands: {2}'.format(
        indicator(), term.bold(u'/QUIT'), separator.join(others)))
def colors():
    """
    Show color codes

    Queues four lines: an introduction, the foreground codes (|00-|15),
    the background codes (|b0-|b7) and the remaining attribute codes.
    All but the introduction are queued with ``encode=False`` so that the
    pipe codes are displayed literally instead of being interpreted.
    """
    # the two halves need a separating space ("pipe character.")
    queue((u'{0} Color codes are available through use of the pipe '
           u'character.').format(indicator()))

    # foreground: |00-|07 use the plain colors, |08-|15 the bold variants
    fgc = [u'{0} Foreground: ']
    for idx, name in enumerate(ANSI_COLORS):
        if idx == 0:
            # the first color (black) is shown on a white background,
            # presumably so that it stays readable
            fgc.append(term.on_white)
        fgc.extend((getattr(term, name), u'|', u'{0:02d}'.format(idx),
                    term.normal, u' '))
    for idx, name in enumerate(ANSI_COLORS):
        fgc.extend((getattr(term, 'bold_' + name), u'|',
                    u'{0:02d}'.format(idx + 8), term.normal, u' '))
    queue(u''.join(fgc).format(indicator()), encode=False)

    # background: |b0-|b7
    bgc = [u'{0} Background: ']
    for idx, name in enumerate(ANSI_COLORS):
        if idx > 0:
            # every background except the first gets bright white text
            bgc.append(term.bright_white)
        bgc.extend((getattr(term, 'on_' + name), u'|',
                    u'b{0}'.format(idx), term.normal, u' '))
    queue(u''.join(bgc).format(indicator()), encode=False)

    queue(u'{0} Other: {1} {2} {3} |nn normal'
          .format(indicator(),
                  term.bold(u'|bb bold'),
                  term.underline(u'|uu underline'),
                  term.reverse(u'|rr reverse')),
          encode=False)
def ansi_encode(text):
    """
    Convert pipe codes into terminal attribute sequences

    Recognized codes (case-insensitive): ``|00``-``|15`` foreground colors,
    ``|b0``-``|b7`` background colors, ``|bb`` bold, ``|uu`` underline,
    ``|rr`` reverse and ``|nn`` normal.

    :param str text: The text to encode
    :rtype: str
    :returns: Encoded text, always followed by ``term.normal``
    """
    pattern = re.compile(r'\|(0[0-9]|1[0-5]|b[0-7]|bb|uu|rr|nn)',
                         flags=re.IGNORECASE)
    named = {
        u'bb': term.bold,
        u'uu': term.underline,
        u'rr': term.reverse,
        u'nn': term.normal,
    }
    pieces = []
    cursor = 0
    for found in pattern.finditer(text):
        code = unicode(found.group(1).lower())
        if code in named:
            sequence = named[code]
        elif code.isnumeric():
            number = int(code, 10)
            if number < 8:
                sequence = getattr(term, ANSI_COLORS[number])
            else:
                # 8-15 select the bold variant of colors 0-7
                sequence = getattr(term, 'bold_' + ANSI_COLORS[number - 8])
        else:
            # the only codes left are the backgrounds b0-b7
            sequence = getattr(term, 'on_' + ANSI_COLORS[int(code[1], 10)])
        # literal text preceding the code, then the code's replacement
        pieces.append(text[cursor:found.start()])
        pieces.append(sequence)
        cursor = found.end()
    # whatever follows the last code (the whole text when none matched)
    pieces.append(text[cursor:])
    pieces.append(term.normal)
    return u''.join(pieces)
def refresh_event():
    """ Screen resized: re-fit the input bar and redraw the chat log """
    editor.width = term.width + 1
    editor.xloc = -1
    editor.yloc = term.height - 1

    # re-wrap the scrollback, newest entry first, until enough display
    # lines have been collected
    visible = term.height - 2
    lines = []
    for entry in reversed(scrollback):
        # older entries are placed in front of the newer ones
        lines[:0] = term.wrap(entry, term.width - 1)
        if len(lines) >= visible:
            break
    lines = lines[-visible:]

    echo(u''.join((
        term.normal,
        term.home,
        term.clear_eos,
        term.move(editor.yloc + 1, 0),
        u'\r\n'.join(lines),
        u'\r\n',
        editor.refresh(),
    )))
def chat_event(data):
    """ Chat output has been received: store it and draw it """
    # the scrollback container is responsible for discarding old entries
    scrollback.append(data)

    wrapped = term.wrap(data, term.width - 1, break_long_words=True)
    below_editor = term.move(editor.yloc + 1, 0)

    # blank out the line with the input bar, write the wrapped output,
    # then redraw the input bar
    echo(u''.join((
        term.normal,
        below_editor,
        term.clear_eol,
        term.move_x(0),
        u'\r\n'.join(wrapped),
        below_editor,
        u'\r\n',
        term.move(editor.yloc, 0),
        editor.refresh(),
    )))
def input_event():
    """
    React to input event, processing /commands.

    Pending keystrokes are fed to the line editor one at a time.  Whenever
    a keystroke completes a line, the editor is cleared and the line is
    handled: ``/quit`` ends the chat, ``/me <text>`` posts an action,
    ``/help`` and ``/colors`` show their text, any other ``/command`` is
    ignored, and anything else is posted as a public message.

    :rtype: bool
    :returns: True, unless the caller should exit, then False.
    """
    inp = term.inkey(0)
    while inp:
        echo(editor.process_keystroke(inp))
        if editor.carriage_returned:
            # process return key
            line = editor.content.rstrip()
            # clear editor
            editor.update(u'')
            echo(editor.refresh())
            lowered = line.lower()
            # parse input for potential commands
            if lowered == u'/quit':
                # Return right away: continuing to drain input would let
                # any later buffered keystroke turn the result back into
                # True and silently cancel the quit request.
                return False
            if lowered.startswith(u'/me ') and len(lowered) > 4:
                queue(format_action(session.user.handle, line[4:]))
                session.send_event('global', ('chat', 'act',
                                              session.user.handle,
                                              line[4:]))
            elif lowered == u'/help':
                helptext()
            elif lowered == u'/colors':
                colors()
            elif line.startswith(u'/'):
                # some other .. unsupported command; ignored
                pass
            elif line.strip():
                # no command was received; post pubmsg, instead
                queue(format_message(session.user.handle, line))
                session.send_event('global', ('chat', 'msg',
                                              session.user.handle, line))
        inp = term.inkey(0)
    return True
def main():
    """
    x/84 script launch point

    Prepares the screen and the input bar, announces the user to the other
    sessions, then loops over 'global', 'input' and 'refresh' events until
    the user quits.  A 'disconnected' notice is always broadcast on the
    way out, including when an exception ends the loop.
    """
    global term, session, scrollback, editor
    term, session = getterminal(), getsession()
    # use the configured scrollback size rather than a hard-coded limit
    scrollback = deque(maxlen=MAX_SCROLLBACK)
    session.activity = u'Chatting locally'

    # move to bottom of screen, reset attribute
    echo(term.pos(term.height) + term.normal)

    # create a new, empty screen
    echo(u'\r\n' * (term.height + 1))
    echo(term.home + term.clear)

    # set font
    if SYNCTERM_FONT and term.kind.startswith('ansi'):
        echo(syncterm_setfont(SYNCTERM_FONT))

    # ignore "not in view" warning for AnsiWindow
    with warnings.catch_warnings():
        warnings.simplefilter('ignore')
        editor = ScrollingEditor(
            width=term.width + 1,
            xloc=-1,
            yloc=term.height,
            colors={'highlight': getattr(term, COLOR_INPUTBAR)},
            max_length=MAX_INPUT
        )

    session.send_event('global', ('chat', 'srv', session.user.handle,
                                  'connected'))
    # everything after the 'connected' notice runs inside the try block so
    # that the matching 'disconnected' notice is sent no matter what fails
    try:
        session.flush_event('global')
        refresh_event()
        helptext()

        while True:
            event, data = session.read_events(('global', 'input', 'refresh'))
            if event == 'refresh':
                refresh_event()
            elif event == 'global' and len(data) and data[0] == 'chat':
                # chat payloads are ('chat', kind, nick, text)
                out = None
                if data[1] == 'msg':
                    out = format_message(data[2], data[3])
                elif data[1] == 'act':
                    out = format_action(data[2], data[3])
                elif data[1] == 'srv':
                    out = format_server(data[2], data[3])
                if out is not None:
                    queue(out)
            elif event == 'input':
                # hand the data back to the session so that input_event
                # can read it through term.inkey()
                session.buffer_input(data, pushback=True)
                if not input_event():
                    break

        echo(term.normal)
        echo(term.clear)
    finally:
        session.send_event('global', ('chat', 'srv', session.user.handle,
                                      'disconnected'))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment