Created
February 11, 2013 07:11
-
-
Save sente/4753069 to your computer and use it in GitHub Desktop.
This crux of this module is the Terminal class which is a pure-Python
implementation of the quintessential Unix terminal emulator. It does its best
to emulate an xterm and along with that comes support for the majority of the
relevant portions of ECMA-48. This includes support for emulating varous VT-*
terminal types as well as the "linux" termi…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*- | |
# | |
# Copyright 2011 Liftoff Software Corporation | |
# | |
# Meta | |
__version__ = '1.1' | |
__version_info__ = (1, 1) | |
__license__ = "AGPLv3 or Proprietary (see LICENSE.txt)" | |
__author__ = 'Dan McDougall <daniel.mcdougall@liftoffsoftware.com>' | |
__doc__ = """\ | |
About This Module | |
================= | |
This crux of this module is the Terminal class which is a pure-Python | |
implementation of the quintessential Unix terminal emulator. It does its best | |
to emulate an xterm and along with that comes support for the majority of the | |
relevant portions of ECMA-48. This includes support for emulating varous VT-* | |
terminal types as well as the "linux" terminal type. | |
The Terminal class's VT-* emulation support is not complete but it should | |
suffice for most terminal emulation needs (e.g. all your typical command line | |
programs should work wonderfully). If something doesn't look quite right or you | |
need support for certain modes added please feel free to open a ticket on Gate | |
One's issue tracker: https://github.com/liftoff/GateOne/issues | |
Note that Terminal was written from scratch in order to be as fast as possible. | |
It is extensively commented and implements some interesting patterns in order to | |
maximize execution speed (most notably for things that loop). Some bits of code | |
may seem "un-Pythonic" and/or difficult to grok but understand that this is | |
probably due to optimizations. If you know "a better way" please feel free to | |
submit a patch, open a ticket, or send us an email. There's a reason why open | |
source software is a superior development model! | |
Supported Emulation Types | |
------------------------- | |
Without any special mode settings or parameters Terminal should effectively | |
emulate the following terminal types: | |
* xterm (the most important one) | |
* ECMA-48/ANSI X3.64 | |
* Nearly all the VT-* types: VT-52, VT-100, VT-220, VT-320, VT-420, and VT-520 | |
* Linux console ("linux") | |
If you want Terminal to support something else or it's missing a feature from | |
any given terminal type please `let us know <https://github.com/liftoff/GateOne/issues/new>`_. | |
We'll implement it! | |
What Terminal Doesn't Do | |
------------------------ | |
The Terminal class is meant to emulate the display portion of a given terminal. | |
It does not translate keystrokes into escape sequences or special control | |
codes--you'll have to take care of that in your application (or at the | |
client-side like Gate One). It does, however, keep track of many | |
keystroke-specific modes of operation such as Application Cursor Keys and the G0 | |
and G1 charset modes *with* callbacks that can be used to notify your | |
application when such things change. | |
Special Considerations | |
---------------------- | |
Many methods inside Terminal start with an underscore. This was done to | |
indicate that such methods shouldn't be called directly (from a program that | |
imported the module). If it was thought that a situation might arise where a | |
method could be used externally by a controlling program, the underscore was | |
omitted. | |
Asynchronous Use | |
---------------- | |
To support asynchronous usage (and make everything faster), Terminal was written | |
to support extensive callbacks that are called when certain events are | |
encountered. Here are the events and their callbacks: | |
.. _callback_constants: | |
==================================== ================================================================================ | |
Callback Constant (ID) Called when... | |
==================================== ================================================================================ | |
:attr:`terminal.CALLBACK_SCROLL_UP` The terminal is scrolled up (back). | |
:attr:`terminal.CALLBACK_CHANGED` The screen is changed/updated. | |
:attr:`terminal.CALLBACK_CURSOR_POS` The cursor position changes. | |
:attr:`terminal.CALLBACK_DSR` A Device Status Report (DSR) is requested (via the DSR escape sequence). | |
:attr:`terminal.CALLBACK_TITLE` The terminal title changes (xterm-style) | |
:attr:`terminal.CALLBACK_BELL` The bell character (^G) is encountered. | |
:attr:`terminal.CALLBACK_OPT` The special optional escape sequence is encountered. | |
:attr:`terminal.CALLBACK_MODE` The terminal mode setting changes (e.g. use alternate screen buffer). | |
:attr:`terminal.CALLBACK_MESSAGE` The terminal needs to send the user a message (without messing with the screen). | |
==================================== ================================================================================ | |
Note that CALLBACK_DSR is special in that it in most cases it will be called with arguments. See the code for examples of how and when this happens. | |
Also, in most cases it is unwise to override CALLBACK_MODE since this method is primarily meant for internal use within the Terminal class. | |
Using Terminal | |
-------------- | |
Gate One makes extensive use of the Terminal class and its callbacks. So that's | |
a great place to look for specific examples (gateone.py and termio.py, | |
specifically). Having said that, implementing Terminal is pretty | |
straightforward:: | |
>>> import terminal | |
>>> term = terminal.Terminal(24, 80) | |
>>> term.write("This text will be written to the terminal screen.") | |
>>> term.dump() | |
[u'This text will be written to the terminal screen. ', | |
<snip> | |
u' '] | |
Here's an example with some basic callbacks: | |
>>> def mycallback(): | |
... "This will be called whenever the screen changes." | |
... print("Screen update! Perfect time to dump the terminal screen.") | |
... print(term.dump()[0]) # Only need to see the top line for this demo =) | |
... print("Just dumped the screen.") | |
>>> import terminal | |
>>> term = terminal.Terminal(24, 80) | |
>>> term.callbacks[term.CALLBACK_CHANGED] = mycallback | |
>>> term.write("This should result in mycallback() being called") | |
Screen update! Perfect time to dump the terminal screen. | |
This should result in mycallback() being called | |
Just dumped the screen. | |
.. note:: In testing Gate One it was determined that it is faster to perform the conversion of a terminal screen to HTML on the server side than it is on the client side (via JavaScript anyway). | |
About The Scrollback Bufffer | |
---------------------------- | |
The Terminal class implements a scrollback buffer. Here's how it works: | |
Whenever a :meth:`Terminal.scroll_up` event occurs, the line (or lines) that | |
will be removed from the top of the screen will be placed into | |
:attr:`Terminal.scrollback_buf`. Then whenever :meth:`Terminal.dump_html` is | |
called the scrollback buffer will be returned along with the screen output and | |
reset to an empty state. | |
Why do this? In the event that a very large :meth:`Terminal.write` occurs (e.g. | |
'ps aux'), it gives the controlling program the ability to capture what went | |
past the screen without some fancy tracking logic surrounding | |
:meth:`Terminal.write`. | |
More information about how this works can be had by looking at the | |
:meth:`Terminal.dump_html` function itself. | |
.. note:: There's more than one function that empties :attr:`Terminal.scrollback_buf` when called. You'll just have to have a look around =) | |
Class Docstrings | |
================ | |
""" | |
# Import stdlib stuff | |
import os, sys, re, logging, base64, StringIO, codecs, unicodedata, tempfile | |
from array import array | |
from datetime import datetime, timedelta | |
from functools import partial | |
from collections import defaultdict | |
try: | |
from collections import OrderedDict | |
except ImportError: # Python <2.7 didn't have OrderedDict in collections | |
try: | |
from ordereddict import OrderedDict | |
except ImportError: | |
logging.error( | |
"Error: Could not import OrderedDict. Please install it:") | |
logging.error("\tsudo pip install ordereddict") | |
logging.error( | |
"...or download it from http://pypi.python.org/pypi/ordereddict") | |
sys.exit(1) | |
from itertools import imap, izip | |
# Inernationalization support | |
import gettext | |
gettext.install('terminal') | |
# Import 3rd party stuff | |
try: | |
# We need PIL to detect image types and get their dimensions. Without the | |
# dimenions, the browser will render the terminal screen much slower than | |
# normal. Without PIL images will be displayed simply as: | |
# <i>Image file</i> | |
from PIL import Image | |
except ImportError: | |
Image = None | |
logging.warning(_( | |
"Could not import the Python Imaging Library (PIL). " | |
"Images will not be displayed in the terminal.")) | |
logging.info(_( | |
"TIP: Pillow is a 'friendly fork' of PIL that has been updated to work " | |
"with Python 3 (also works in Python 2.X). You can install it with " | |
"'pip install --upgrade Pillow'.")) | |
# Globals | |
CALLBACK_SCROLL_UP = 1 # Called after a scroll up event (new line) | |
CALLBACK_CHANGED = 2 # Called after the screen is updated | |
CALLBACK_CURSOR_POS = 3 # Called after the cursor position is updated | |
# <waives hand in air> You are not concerned with the number 4 | |
CALLBACK_DSR = 5 # Called when a DSR requires a response | |
# NOTE: CALLBACK_DSR must accept 'response' as either the first argument or | |
# as a keyword argument. | |
CALLBACK_TITLE = 6 # Called when the terminal sets the window title | |
CALLBACK_BELL = 7 # Called after ASCII_BEL is encountered. | |
CALLBACK_OPT = 8 # Called when we encounter the optional ESC sequence | |
# NOTE: CALLBACK_OPT must accept 'chars' as either the first argument or as | |
# a keyword argument. | |
CALLBACK_MODE = 9 # Called when the terminal mode changes (e.g. DECCKM) | |
CALLBACK_RESET = 10 # Called when a terminal reset (^[[!p) is encountered | |
CALLBACK_LEDS = 11 # Called when the state of the LEDs changes | |
# Called when the terminal emulator encounters a situation where it wants to | |
# tell the user about something (say, an error decoding an image) without | |
# interfering with the terminal's screen. | |
CALLBACK_MESSAGE = 12 | |
# These are for HTML output: | |
RENDITION_CLASSES = defaultdict(lambda: None, { | |
0: 'reset', # Special: Return everything to defaults | |
1: 'bold', | |
2: 'dim', | |
3: 'italic', | |
4: 'underline', | |
5: 'blink', | |
6: 'fastblink', | |
7: 'reverse', | |
8: 'hidden', | |
9: 'strike', | |
10: 'resetfont', # NOTE: The font renditions don't do anything right now | |
11: 'font11', # Mostly because I have no idea what they are supposed to look | |
12: 'font12', # like. | |
13: 'font13', | |
14: 'font14', | |
15: 'font15', | |
16: 'font16', | |
17: 'font17', | |
18: 'font18', | |
19: 'font19', | |
20: 'fraktur', | |
21: 'boldreset', | |
22: 'dimreset', | |
23: 'italicreset', | |
24: 'underlinereset', | |
27: 'reversereset', | |
28: 'hiddenreset', | |
29: 'strikereset', | |
# Foregrounds | |
30: 'f0', # Black | |
31: 'f1', # Red | |
32: 'f2', # Green | |
33: 'f3', # Yellow | |
34: 'f4', # Blue | |
35: 'f5', # Magenta | |
36: 'f6', # Cyan | |
37: 'f7', # White | |
38: '', # 256-color support uses this like so: \x1b[38;5;<color num>sm | |
# Backgrounds | |
40: 'b0', # Black | |
41: 'b1', # Red | |
42: 'b2', # Green | |
43: 'b3', # Yellow | |
44: 'b4', # Blue | |
45: 'b5', # Magenta | |
46: 'b6', # Cyan | |
47: 'b7', # White | |
48: '', # 256-color support uses this like so: \x1b[48;5;<color num>sm | |
49: 'backgroundreset', # Special: Set BG to default | |
51: 'frame', | |
52: 'encircle', | |
53: 'overline', | |
60: 'rightline', | |
61: 'rightdoubleline', | |
62: 'leftline', | |
63: 'leftdoubleline', | |
# aixterm colors (aka '16 color support'). They're supposed to be 'bright' | |
# versions of the first 8 colors (hence the 'b'). | |
# 'Bright' Foregrounds | |
90: 'bf0', # Bright black (whatever that is =) | |
91: 'bf1', # Bright red | |
92: 'bf2', # Bright green | |
93: 'bf3', # Bright yellow | |
94: 'bf4', # Bright blue | |
95: 'bf5', # Bright magenta | |
96: 'bf6', # Bright cyan | |
97: 'bf7', # Bright white | |
# TODO: Handle the ESC sequence that sets the colors from 90-87 (e.g. ESC]91;orange/brown^G) | |
# 'Bright' Backgrounds | |
100: 'bb0', # Bright black | |
101: 'bb1', # Bright red | |
102: 'bb2', # Bright green | |
103: 'bb3', # Bright yellow | |
104: 'bb4', # Bright blue | |
105: 'bb5', # Bright magenta | |
106: 'bb6', # Bright cyan | |
107: 'bb7' # Bright white | |
}) | |
# Generate the dict of 256-color (xterm) foregrounds and backgrounds | |
for i in xrange(256): | |
RENDITION_CLASSES[(i+1000)] = "fx%s" % i | |
RENDITION_CLASSES[(i+10000)] = "bx%s" % i | |
del i # Cleanup | |
try: | |
unichr(0x10000) # Will throw a ValueError on narrow Python builds | |
SPECIAL = 1048576 # U+100000 or unichr(SPECIAL) (start of Plane 16) | |
except: | |
SPECIAL = 63561 | |
def handle_special(e): | |
""" | |
Used in conjunction with :py:func:`codecs.register_error`, will replace | |
special ascii characters such as 0xDA and 0xc4 (which are used by ncurses) | |
with their Unicode equivalents. | |
""" | |
# TODO: Get this using curses special characters when appropriate | |
#curses_specials = { | |
## NOTE: When $TERM is set to "Linux" these end up getting used by things | |
## like ncurses-based apps. In other words, it makes a whole lot | |
## of ugly look pretty again. | |
#0xda: u'┌', # ACS_ULCORNER | |
#0xc0: u'└', # ACS_LLCORNER | |
#0xbf: u'┐', # ACS_URCORNER | |
#0xd9: u'┘', # ACS_LRCORNER | |
#0xb4: u'├', # ACS_RTEE | |
#0xc3: u'┤', # ACS_LTEE | |
#0xc1: u'┴', # ACS_BTEE | |
#0xc2: u'┬', # ACS_TTEE | |
#0xc4: u'─', # ACS_HLINE | |
#0xb3: u'│', # ACS_VLINE | |
#0xc5: u'┼', # ACS_PLUS | |
#0x2d: u'', # ACS_S1 | |
#0x5f: u'', # ACS_S9 | |
#0x60: u'◆', # ACS_DIAMOND | |
#0xb2: u'▒', # ACS_CKBOARD | |
#0xf8: u'°', # ACS_DEGREE | |
#0xf1: u'±', # ACS_PLMINUS | |
#0xf9: u'•', # ACS_BULLET | |
#0x3c: u'←', # ACS_LARROW | |
#0x3e: u'→', # ACS_RARROW | |
#0x76: u'↓', # ACS_DARROW | |
#0x5e: u'↑', # ACS_UARROW | |
#0xb0: u'⊞', # ACS_BOARD | |
#0x0f: u'⨂', # ACS_LANTERN | |
#0xdb: u'█', # ACS_BLOCK | |
#} | |
specials = { | |
# Note to self: Why did I bother with these overly descriptive comments? Ugh | |
# I've been staring at obscure symbols far too much lately ⨀_⨀ | |
128: u'€', # Euro sign | |
129: u' ', # Unknown (Using non-breaking spaces for all unknowns) | |
130: u'‚', # Single low-9 quotation mark | |
131: u'ƒ', # Latin small letter f with hook | |
132: u'„', # Double low-9 quotation mark | |
133: u'…', # Horizontal ellipsis | |
134: u'†', # Dagger | |
135: u'‡', # Double dagger | |
136: u'ˆ', # Modifier letter circumflex accent | |
137: u'‰', # Per mille sign | |
138: u'Š', # Latin capital letter S with caron | |
139: u'‹', # Single left-pointing angle quotation | |
140: u'Œ', # Latin capital ligature OE | |
141: u' ', # Unknown | |
142: u'Ž', # Latin captial letter Z with caron | |
143: u' ', # Unknown | |
144: u' ', # Unknown | |
145: u'‘', # Left single quotation mark | |
146: u'’', # Right single quotation mark | |
147: u'“', # Left double quotation mark | |
148: u'”', # Right double quotation mark | |
149: u'•', # Bullet | |
150: u'–', # En dash | |
151: u'—', # Em dash | |
152: u'˜', # Small tilde | |
153: u'™', # Trade mark sign | |
154: u'š', # Latin small letter S with caron | |
155: u'›', # Single right-pointing angle quotation mark | |
156: u'œ', # Latin small ligature oe | |
157: u'Ø', # Upper-case slashed zero--using same as empty set (216) | |
158: u'ž', # Latin small letter z with caron | |
159: u'Ÿ', # Latin capital letter Y with diaeresis | |
160: u' ', # Non-breaking space | |
161: u'¡', # Inverted exclamation mark | |
162: u'¢', # Cent sign | |
163: u'£', # Pound sign | |
164: u'¤', # Currency sign | |
165: u'¥', # Yen sign | |
166: u'¦', # Pipe, Broken vertical bar | |
167: u'§', # Section sign | |
168: u'¨', # Spacing diaeresis - umlaut | |
169: u'©', # Copyright sign | |
170: u'ª', # Feminine ordinal indicator | |
171: u'«', # Left double angle quotes | |
172: u'¬', # Not sign | |
173: u"\u00AD", # Soft hyphen | |
174: u'®', # Registered trade mark sign | |
175: u'¯', # Spacing macron - overline | |
176: u'°', # Degree sign | |
177: u'±', # Plus-or-minus sign | |
178: u'²', # Superscript two - squared | |
179: u'³', # Superscript three - cubed | |
180: u'´', # Acute accent - spacing acute | |
181: u'µ', # Micro sign | |
182: u'¶', # Pilcrow sign - paragraph sign | |
183: u'·', # Middle dot - Georgian comma | |
184: u'¸', # Spacing cedilla | |
185: u'¹', # Superscript one | |
186: u'º', # Masculine ordinal indicator | |
187: u'»', # Right double angle quotes | |
188: u'¼', # Fraction one quarter | |
189: u'½', # Fraction one half | |
190: u'¾', # Fraction three quarters | |
191: u'¿', # Inverted question mark | |
192: u'À', # Latin capital letter A with grave | |
193: u'Á', # Latin capital letter A with acute | |
194: u'Â', # Latin capital letter A with circumflex | |
195: u'Ã', # Latin capital letter A with tilde | |
196: u'Ä', # Latin capital letter A with diaeresis | |
197: u'Å', # Latin capital letter A with ring above | |
198: u'Æ', # Latin capital letter AE | |
199: u'Ç', # Latin capital letter C with cedilla | |
200: u'È', # Latin capital letter E with grave | |
201: u'É', # Latin capital letter E with acute | |
202: u'Ê', # Latin capital letter E with circumflex | |
203: u'Ë', # Latin capital letter E with diaeresis | |
204: u'Ì', # Latin capital letter I with grave | |
205: u'Í', # Latin capital letter I with acute | |
206: u'Î', # Latin capital letter I with circumflex | |
207: u'Ï', # Latin capital letter I with diaeresis | |
208: u'Ð', # Latin capital letter ETH | |
209: u'Ñ', # Latin capital letter N with tilde | |
210: u'Ò', # Latin capital letter O with grave | |
211: u'Ó', # Latin capital letter O with acute | |
212: u'Ô', # Latin capital letter O with circumflex | |
213: u'Õ', # Latin capital letter O with tilde | |
214: u'Ö', # Latin capital letter O with diaeresis | |
215: u'×', # Multiplication sign | |
216: u'Ø', # Latin capital letter O with slash (aka "empty set") | |
217: u'Ù', # Latin capital letter U with grave | |
218: u'Ú', # Latin capital letter U with acute | |
219: u'Û', # Latin capital letter U with circumflex | |
220: u'Ü', # Latin capital letter U with diaeresis | |
221: u'Ý', # Latin capital letter Y with acute | |
222: u'Þ', # Latin capital letter THORN | |
223: u'ß', # Latin small letter sharp s - ess-zed | |
224: u'à', # Latin small letter a with grave | |
225: u'á', # Latin small letter a with acute | |
226: u'â', # Latin small letter a with circumflex | |
227: u'ã', # Latin small letter a with tilde | |
228: u'ä', # Latin small letter a with diaeresis | |
229: u'å', # Latin small letter a with ring above | |
230: u'æ', # Latin small letter ae | |
231: u'ç', # Latin small letter c with cedilla | |
232: u'è', # Latin small letter e with grave | |
233: u'é', # Latin small letter e with acute | |
234: u'ê', # Latin small letter e with circumflex | |
235: u'ë', # Latin small letter e with diaeresis | |
236: u'ì', # Latin small letter i with grave | |
237: u'í', # Latin small letter i with acute | |
238: u'î', # Latin small letter i with circumflex | |
239: u'ï', # Latin small letter i with diaeresis | |
240: u'ð', # Latin small letter eth | |
241: u'ñ', # Latin small letter n with tilde | |
242: u'ò', # Latin small letter o with grave | |
243: u'ó', # Latin small letter o with acute | |
244: u'ô', # Latin small letter o with circumflex | |
245: u'õ', # Latin small letter o with tilde | |
246: u'ö', # Latin small letter o with diaeresis | |
247: u'÷', # Division sign | |
248: u'ø', # Latin small letter o with slash | |
249: u'ù', # Latin small letter u with grave | |
250: u'ú', # Latin small letter u with acute | |
251: u'û', # Latin small letter u with circumflex | |
252: u'ü', # Latin small letter u with diaeresis | |
253: u'ý', # Latin small letter y with acute | |
254: u'þ', # Latin small letter thorn | |
255: u'ÿ', # Latin small letter y with diaeresis | |
} | |
# I left this in its odd state so I could differentiate between the two | |
# in the future. | |
chars = e.object | |
if bytes == str: # Python 2 | |
# Convert e.object to a bytearray for an easy switch to integers. | |
# It is quicker than calling ord(char) on each char in e.object | |
chars = bytearray(e.object) | |
# NOTE: In Python 3 when you iterate over bytes they appear as integers. | |
# So we don't need to convert to a bytearray in Python 3. | |
if isinstance(e, (UnicodeEncodeError, UnicodeTranslateError)): | |
s = [u'%s' % specials[c] for c in chars[e.start:e.end]] | |
return ''.join(s), e.end | |
else: | |
s = [u'%s' % specials[c] for c in chars[e.start:e.end]] | |
return ''.join(s), e.end | |
codecs.register_error('handle_special', handle_special) | |
# TODO List: | |
# | |
# * We need unit tests! | |
# * Add a function that can dump the screen with text renditions represented as their usual escape sequences so applications that try to perform screen-scraping can match things like '\x1b[41mAuthentication configuration' without having to find specific character positions and then examining the renditions on that line. | |
# Helper functions | |
def _reduce_renditions(renditions): | |
""" | |
Takes a list, *renditions*, and reduces it to its logical equivalent (as | |
far as renditions go). Example:: | |
[0, 32, 0, 34, 0, 32] | |
Would become:: | |
[0, 32] | |
Other Examples:: | |
[0, 1, 36, 36] -> [0, 1, 36] | |
[0, 30, 42, 30, 42] -> [0, 30, 42] | |
[36, 32, 44, 42] -> [32, 42] | |
[36, 35] -> [35] | |
""" | |
out_renditions = [] | |
foreground = None | |
background = None | |
for rend in renditions: | |
if rend < 29: | |
if rend not in out_renditions: | |
out_renditions.append(rend) | |
elif rend > 29 and rend < 40: | |
# Regular 8-color foregrounds | |
foreground = rend | |
elif rend > 39 and rend < 50: | |
# Regular 8-color backgrounds | |
background = rend | |
elif rend > 91 and rend < 98: | |
# 'Bright' (16-color) foregrounds | |
foreground = rend | |
elif rend > 99 and rend < 108: | |
# 'Bright' (16-color) backgrounds | |
background = rend | |
elif rend > 1000 and rend < 10000: | |
# 256-color foregrounds | |
foreground = rend | |
elif rend > 10000 and rend < 20000: | |
# 256-color backgrounds | |
background = rend | |
else: | |
out_renditions.append(rend) | |
if foreground: | |
out_renditions.append(foreground) | |
if background: | |
out_renditions.append(background) | |
return out_renditions | |
def unicode_counter(): | |
""" | |
A generator that returns incrementing Unicode characters that can be used as | |
references inside a Unicode array. For example:: | |
>>> counter = unicode_counter() | |
>>> mapping_dict = {} | |
>>> some_array = array('u') | |
>>> # Pretend 'marked ...' below is a reference to something impotant :) | |
>>> for i, c in enumerate(u'some string'): | |
if c == u' ': # Mark the location of spaces | |
# Perform some operation where we need to save a value | |
result = some_evaluation(i, c) | |
# Save some memory by storing a reference to result instead | |
# of the same result over and over again | |
if result not in mapping_dict.values(): | |
marker = counter.next() | |
some_array.append(marker) | |
mapping_dict[marker] = result | |
else: # Find the existing reference so we can use it again | |
for k, v in mapping_dict.items(): | |
if v == result: # Use the existing value | |
some_array.append(k) | |
else: | |
some_array.append(\x00) # \x00 == "not interesting" placeholder | |
>>> | |
Now we could iterate over 'some string' and some_array simultaneously using | |
zip(u'some string', some_array) to access those reference markers when we | |
encountered the correct position. This can save a lot of memory if you need | |
to store objects in memory that have a tendancy to repeat (e.g. text | |
rendition lists in a terminal). | |
.. note:: Meant to be used inside the renditions array to reference text rendition lists such as `[0, 1, 34]`. | |
""" | |
n = 1000 # Start at 1000 so we can use lower characters for other things | |
while True: | |
yield unichr(n) | |
if n == 65535: # The end of unicode in narrow builds of Python | |
n = 0 # Reset | |
else: | |
n += 1 | |
# NOTE: Why use a unicode array() to store references instead of just a regular array()? Two reasons: 1) Large namespace. 2) Only need to use one kind of array for everything (convenience). It is also a large memory savings over "just using a list with references to items in a dict." | |
def pua_counter(): | |
""" | |
A generator that returns a Unicode Private Use Area (PUA) character starting | |
at the beginning of Plane 16 (U+100000); counting up by one with each | |
successive call. If this is a narrow Python build the tail end of Plane 15 | |
will be used as a fallback (with a lot less characters). | |
.. note:: Meant to be used as references to non-text objects in the screen array() (since it can only contain unicode characters) | |
""" | |
if SPECIAL == 1048576: # Not a narrow build of Python | |
n = SPECIAL # U+100000 or unichr(SPECIAL) (start of Plane 16) | |
while True: | |
yield unichr(n) | |
if n == 1114111: | |
n = SPECIAL # Reset--would be impressive to make it this far! | |
else: | |
n += 1 | |
else: | |
# This Python build is 'narrow' so we have to settle for less | |
# Hopefully no real-world terminal will actually want to use one of | |
# these characters. In my research I couldn't find a font that used | |
# them. Please correct me if I'm wrong! | |
n = SPECIAL # u'\uf849' | |
while True: | |
yield unichr(n) | |
if n == 63717: # The end of nothing-but-block-chars in Plane 15 | |
n = SPECIAL # Reset | |
else: | |
n += 1 | |
# Classes | |
class FileType(object): | |
""" | |
An object to hold the attributes of a supported file capture/output type. | |
""" | |
# These attributes are here to prevent AttributeErrors if not overridden | |
thumbnail = None | |
html_template = "" # Must be overridden | |
html_icon_template = "" # Must be overridden | |
def __init__(self, | |
name, mimetype, re_header, re_capture, suffix="", path="", linkpath="", icondir=None): | |
""" | |
**name:** Name of the file type. | |
**mimetype:** Mime type of the file. | |
**re_header:** The regex to match the start of the file. | |
**re_capture:** The regex to carve the file out of the stream. | |
**suffix:** (optional) The suffix to be appended to the end of the filename (if one is generated). | |
**path:** (optional) The path to a file or directory where the file should be stored. If *path* is a directory a random filename will be chosen. | |
**linkpath:** (optional) The path to use when generating a link in HTML output. | |
**icondir:** (optional) A path to look for a relevant icon to display when generating HTML output. | |
""" | |
self.name = name | |
self.mimetype = mimetype | |
self.re_header = re_header | |
self.re_capture = re_capture | |
self.suffix = suffix | |
# A path just in case something needs to access it outside of Python: | |
self.path = path | |
self.linkpath = linkpath | |
self.icondir = icondir | |
self.file_obj = None | |
def __repr__(self): | |
return "<%s>" % self.name | |
def __str__(self): | |
"Override if the defined file type warrants a text-based output." | |
return self.__repr__() | |
def __del__(self): | |
""" | |
Make sure that self.file_obj gets closed/deleted. | |
""" | |
logging.debug("FileType __del__(): Closing/deleting temp file") | |
self.file_obj.close() # Ensures it gets deleted | |
def raw(self): | |
self.file_obj.seek(0) | |
data = open(self.file_obj).read() | |
self.file_obj.seek(0) | |
return data | |
def html(self): | |
""" | |
Returns the object as an HTML-formatted string. Must be overridden. | |
""" | |
raise NotImplementedError | |
def capture(self, data, term_instance=None): | |
""" | |
Stores *data* as a temporary file and returns that file's object. | |
*term_instance* can be used by overrides of this function to make | |
adjustments to the terminal emulator after the *data* is captured e.g. | |
to make room for an image. | |
""" | |
# Remove the extra \r's that the terminal adds: | |
data = str(data).replace('\r\n', '\n') | |
logging.debug("capture() len(data): %s" % len(data)) | |
# Write the data to disk in a temporary location | |
self.file_obj = tempfile.TemporaryFile() | |
self.file_obj.write(data) | |
self.file_obj.flush() | |
# Leave it open | |
return self.file_obj | |
def close(self): | |
""" | |
Closes :attr:`self.file_obj` | |
""" | |
self.file_obj.close() | |
class ImageFile(FileType): | |
""" | |
A subclass of :class:`FileType` for images (specifically to override | |
:meth:`self.html` and :meth:`self.capture`). | |
""" | |
def capture(self, data, term_instance): | |
""" | |
Captures the image contained within *data*. Will use *term_instance* | |
to make room for the image in the terminal screen. | |
.. note:: Unlike :class:`FileType`, *term_instance* is mandatory. | |
""" | |
logging.debug('ImageFile.capture()') | |
# Image file formats don't usually like carriage returns: | |
data = str(data).replace('\r\n', '\n') | |
if Image: # PIL is loaded--try to guess how many lines the image takes | |
i = StringIO.StringIO(data) | |
try: | |
im = Image.open(i) | |
except IOError: | |
# i.e. PIL couldn't identify the file | |
logging.error(_("PIL couldn't process the image")) | |
return # Don't do anything--bad image | |
else: # No PIL means no images. Don't bother wasting memory. | |
return | |
# Resize the image to be small enough to fit within a typical terminal | |
if im.size[0] > 640 or im.size[1] > 480: | |
im.thumbnail((640, 480), Image.ANTIALIAS) | |
# Get the current image location and reference so we can move it around | |
img_Y = term_instance.cursorY | |
img_X = term_instance.cursorX | |
ref = term_instance.screen[img_Y][img_X] | |
if term_instance.em_dimensions: | |
# Make sure the image will fit properly in the screen | |
width = im.size[0] | |
height = im.size[1] | |
if height <= term_instance.em_dimensions['height']: | |
# Fits within a line. No need for a newline | |
num_chars = int(width/term_instance.em_dimensions['width']) | |
# Move the cursor an equivalent number of characters | |
term_instance.cursor_right(num_chars) | |
else: | |
# This is how many newlines the image represents: | |
newlines = int(height/term_instance.em_dimensions['height']) | |
term_instance.screen[img_Y][img_X] = u' ' # Empty old location | |
term_instance.cursorX = 0 | |
term_instance.newline() # Start with a newline | |
if newlines > term_instance.cursorY: | |
for line in xrange(newlines): | |
term_instance.newline() | |
# Save the new image location | |
term_instance.screen[ | |
term_instance.cursorY][term_instance.cursorX] = ref | |
term_instance.newline() | |
else: | |
# No way to calculate the number of lines the image will take | |
term_instance.screen[img_Y][img_X] = u' ' # Empty old location | |
term_instance.cursorY = term_instance.rows - 1 # Move to the end | |
# ... so it doesn't get cut off at the top | |
# Save the new image location | |
term_instance.screen[ | |
term_instance.cursorY][term_instance.cursorX] = ref | |
# Make some space at the bottom too just in case | |
term_instance.newline() | |
term_instance.newline() | |
# Write the captured image to disk | |
if self.path: | |
if os.path.exists(self.path): | |
if os.path.isdir(self.path): | |
# Assume that a path was given for a reason and use a | |
# NamedTemporaryFile instead of TemporaryFile. | |
self.file_obj = tempfile.NamedTemporaryFile( | |
suffix=self.suffix, dir=self.path) | |
# Update self.path to use the new, actual file path | |
self.path = self.file_obj.name | |
else: | |
self.file_obj = open(self.path, 'rb+') | |
else: | |
self.file_obj = tempfile.TemporaryFile() | |
try: | |
im.save(self.file_obj, im.format) | |
except IOError: | |
# PIL was compiled without (complete) support for this format | |
logging.error(_( | |
"PIL is missing support for this image type (%s). You probably" | |
" need to install zlib-devel and libjpeg-devel then re-install " | |
"it with 'pip install --upgrade PIL' or 'pip install " | |
"--upgrade Pillow'" % self.name)) | |
self.file_obj.close() # Can't do anything with it | |
return None | |
self.file_obj.flush() | |
self.file_obj.seek(0) # Go back to the start | |
return self.file_obj | |
def html(self): | |
""" | |
Returns :attr:`self.file_obj` as an <img> tag with the src set to a | |
data::URI. | |
""" | |
if not self.file_obj: | |
return u"" | |
self.file_obj.seek(0) | |
try: | |
im = Image.open(self.file_obj) | |
except IOError: | |
# i.e. PIL couldn't identify the file | |
return u"<i>Error displaying image</i>" | |
self.file_obj.seek(0) | |
# Need to encode base64 to create a data URI | |
encoded = base64.b64encode(self.file_obj.read()) | |
data_uri = "data:image/%s;base64,%s" % ( | |
im.format.lower(), encoded) | |
if self.thumbnail: | |
return self.html_icon_template.format( | |
src=data_uri, width=im.size[0], height=im.size[1]) | |
return self.html_template.format( | |
src=data_uri, width=im.size[0], height=im.size[1]) | |
class PNGFile(ImageFile): | |
""" | |
An override of :class:`ImageFile` for PNGs to hard-code the name, regular | |
expressions, mimetype, and suffix. | |
""" | |
name = _("PNG Image") | |
mimetype = "image/png" | |
suffix = ".png" | |
re_header = re.compile('.*\x89PNG\r', re.DOTALL) | |
re_capture = re.compile('(\x89PNG\r.+IEND\xaeB`\x82)', re.DOTALL) | |
html_template = '<img src="{src}" width="{width}" height="{height}">' | |
def __init__(self, path="", **kwargs): | |
""" | |
**path:** (optional) The path to a file or directory where the file should be stored. If *path* is a directory a random filename will be chosen. | |
""" | |
self.path = path | |
self.file_obj = None | |
# Images will be displayed inline so no icons unless overridden: | |
self.html_icon_template = self.html_template | |
class JPEGFile(ImageFile): | |
""" | |
An override of :class:`ImageFile` for JPEGs to hard-code the name, regular | |
expressions, mimetype, and suffix. | |
""" | |
name = _("JPEG Image") | |
mimetype = "image/jpeg" | |
suffix = ".jpeg" | |
re_header = re.compile( | |
'.*\xff\xd8\xff.+JFIF\x00|.*\xff\xd8\xff.+Exif\x00', re.DOTALL) | |
re_capture = re.compile( | |
'(\xff\xd8\xff.+\xff\xd9)', re.DOTALL | |
) | |
html_template = '<img src="{src}" width="{width}" height="{height}">' | |
def __init__(self, path="", **kwargs): | |
""" | |
**path:** (optional) The path to a file or directory where the file should be stored. If *path* is a directory a random filename will be chosen. | |
""" | |
self.path = path | |
self.file_obj = None | |
# Images will be displayed inline so no icons unless overridden: | |
self.html_icon_template = self.html_template | |
class PDFFile(FileType): | |
""" | |
A subclass of :class:`FileType` for PDFs (specifically to override | |
:meth:`self.html`). Has hard-coded name, mimetype, suffix, and regular | |
expressions. This class will also utilize :attr:`self.icondir` to look for | |
an icon named, 'pdf.svg'. If found it will be utilized by | |
:meth:`self.html` when generating output. | |
""" | |
name = _("PDF Document") | |
mimetype = "application/pdf" | |
suffix = ".pdf" | |
re_header = re.compile(r'.*%PDF-[0-9]\.[0-9]{1,2}.+?obj', re.DOTALL) | |
re_capture = re.compile(r'(%PDF-[0-9]\.[0-9]{1,2}.+%%EOF)', re.DOTALL) | |
icon = "pdf.svg" # Name of the file inside of self.icondir | |
# NOTE: Using two separate links below so the whitespace doesn't end up | |
# underlined. Looks much nicer this way. | |
html_icon_template = ( | |
'<span class="pdfcontainer"><a class="pdflink" target="_blank" ' | |
'href="{link}">{icon}</a><br>' | |
' <a class="pdflink" href="{link}">{name}</a></span>') | |
html_template = ( | |
'<span class="pdfcontainer"><a target="_blank" href="{link}">{name}</a>' | |
'</span>') | |
def __init__(self, path="", linkpath="", icondir=None): | |
""" | |
**path:** (optional) The path to the file. | |
**linkpath:** (optional) The path to use when generating a link in HTML output. | |
**icondir:** (optional) A path to look for a relevant icon to display when generating HTML output. | |
""" | |
self.path = path | |
self.linkpath = linkpath | |
self.icondir = icondir | |
self.file_obj = None | |
self.thumbnail = None | |
def generate_thumbnail(self): | |
""" | |
If available, will use ghostscript (gs) to generate a thumbnail of this | |
PDF in the form of an <img> tag with the src set to a data::URI. | |
""" | |
from commands import getstatusoutput | |
thumb = tempfile.NamedTemporaryFile() | |
params = [ | |
'gs', # gs must be in your path | |
'-dPDFFitPage', | |
'-dPARANOIDSAFER', | |
'-dBATCH', | |
'-dNOPAUSE', | |
'-dNOPROMPT', | |
'-dMaxBitmap=500000000', | |
'-dAlignToPixels=0', | |
'-dGridFitTT=0', | |
'-dDEVICEWIDTH=90', | |
'-dDEVICEHEIGHT=120', | |
'-dORIENT1=true', | |
'-sDEVICE=jpeg', | |
'-dTextAlphaBits=4', | |
'-dGraphicsAlphaBits=4', | |
'-sOutputFile=%s' % thumb.name, | |
self.path | |
] | |
retcode, output = getstatusoutput(" ".join(params)) | |
if retcode == 0: | |
# Success | |
data = None | |
with open(thumb.name) as f: | |
data = f.read() | |
thumb.close() # Make sure it gets removed now we've read it | |
if data: | |
encoded = base64.b64encode(data) | |
data_uri = "data:image/jpeg;base64,%s" % encoded | |
return '<img src="%s">' % data_uri | |
def capture(self, data, term_instance): | |
""" | |
Stores *data* as a temporary file and returns that file's object. | |
*term_instance* can be used by overrides of this function to make | |
adjustments to the terminal emulator after the *data* is captured e.g. | |
to make room for an image. | |
""" | |
logging.debug("PDFFile.capture()") | |
# Remove the extra \r's that the terminal adds: | |
data = str(data).replace('\r\n', '\n') | |
# Write the data to disk in a temporary location | |
if self.path: | |
if os.path.exists(self.path): | |
if os.path.isdir(self.path): | |
# Assume that a path was given for a reason and use a | |
# NamedTemporaryFile instead of TemporaryFile. | |
self.file_obj = tempfile.NamedTemporaryFile( | |
suffix=self.suffix, dir=self.path) | |
# Update self.path to use the new, actual file path | |
self.path = self.file_obj.name | |
else: | |
self.file_obj = open(self.path, 'rb+') | |
else: | |
# Use the terminal emulator's temppath | |
self.file_obj = tempfile.NamedTemporaryFile( | |
suffix=self.suffix, dir=term_instance.temppath) | |
self.path = self.file_obj.name | |
self.file_obj.write(data) | |
self.file_obj.flush() | |
# Ghostscript-based thumbnail generation disabled due to its slow, | |
# blocking nature. Works great though! | |
#self.thumbnail = self.generate_thumbnail() | |
# TODO: Figure out a way to do non-blocking thumbnail generation | |
if self.icondir: | |
pdf_icon = os.path.join(self.icondir, self.icon) | |
if os.path.exists(pdf_icon): | |
with open(pdf_icon) as f: | |
self.thumbnail = f.read() | |
if self.thumbnail: | |
# Make room for our link | |
img_Y = term_instance.cursorY | |
img_X = term_instance.cursorX | |
ref = term_instance.screen[img_Y][img_X] | |
term_instance.screen[img_Y][img_X] = u' ' # No longer at this loc | |
if term_instance.cursorY < 8: # Icons are about ~8 newlines high | |
for line in xrange(8 - term_instance.cursorY): | |
term_instance.newline() | |
# Save the new location | |
term_instance.screen[ | |
term_instance.cursorY][term_instance.cursorX] = ref | |
term_instance.newline() | |
else: | |
# Make room for the characters in the name, "PDF Document" | |
for i in xrange(len(self.name)): | |
term_instance.screen[term_instance.cursorY].pop() | |
# Leave it open | |
return self.file_obj | |
def html(self): | |
""" | |
Returns a link to download the PDF using :attr:`self.linkpath` for the | |
href attribute. Will use :attr:`self.html_icon_template` if | |
:attr:`self.icon` can be found. Otherwise it will just output | |
:attr:`self.name` as a clickable link. | |
""" | |
link = "%s/%s" % (self.linkpath, os.path.split(self.path)[1]) | |
if self.thumbnail: | |
return self.html_icon_template.format( | |
link=link, icon=self.thumbnail, name=self.name) | |
return self.html_template.format( | |
link=link, icon=self.thumbnail, name=self.name) | |
class NotFoundError(Exception): | |
""" | |
Raised by :meth:`Terminal.remove_magic` if a given filetype was not found in | |
:attr:`Terminal.supported_magic`. | |
""" | |
pass | |
class Terminal(object): | |
""" | |
Terminal controller class. | |
""" | |
ASCII_NUL = 0 # Null | |
ASCII_BEL = 7 # Bell (BEL) | |
ASCII_BS = 8 # Backspace | |
ASCII_HT = 9 # Horizontal Tab | |
ASCII_LF = 10 # Line Feed | |
ASCII_VT = 11 # Vertical Tab | |
ASCII_FF = 12 # Form Feed | |
ASCII_CR = 13 # Carriage Return | |
ASCII_SO = 14 # Ctrl-N; Shift out (switches to the G0 charset) | |
ASCII_SI = 15 # Ctrl-O; Shift in (switches to the G1 charset) | |
ASCII_XON = 17 # Resume Transmission | |
ASCII_XOFF = 19 # Stop Transmission or Ignore Characters | |
ASCII_CAN = 24 # Cancel Escape Sequence | |
ASCII_SUB = 26 # Substitute: Cancel Escape Sequence and replace with ? | |
ASCII_ESC = 27 # Escape | |
ASCII_CSI = 155 # Control Sequence Introducer (that nothing uses) | |
ASCII_HTS = 210 # Horizontal Tab Stop (HTS) | |
charsets = { | |
'B': {}, # Default is USA (aka 'B') | |
'0': { # Line drawing mode | |
95: u' ', | |
96: u'◆', | |
97: u'▒', | |
98: u'\t', | |
99: u'\x0c', | |
100: u'\r', | |
101: u'\n', | |
102: u'°', | |
103: u'±', | |
104: u'\n', | |
105: u'\x0b', | |
106: u'┘', | |
107: u'┐', | |
108: u'┌', | |
109: u'└', | |
110: u'┼', | |
111: u'⎺', # All these bars and not a drink! | |
112: u'⎻', | |
113: u'─', | |
114: u'⎼', | |
115: u'⎽', | |
116: u'├', | |
117: u'┤', | |
118: u'┴', | |
119: u'┬', | |
120: u'│', | |
121: u'≤', | |
122: u'≥', | |
123: u'π', | |
124: u'≠', | |
125: u'£', | |
126: u'·' # Centered dot--who comes up with this stuff?!? | |
} | |
} | |
RE_CSI_ESC_SEQ = re.compile(r'\x1B\[([?A-Za-z0-9>;@:\!]*)([A-Za-z@_])') | |
RE_ESC_SEQ = re.compile(r'\x1b(.*\x1b\\|[ABCDEFGHIJKLMNOQRSTUVWXYZa-z0-9=<>]|[()# %*+].)') | |
RE_TITLE_SEQ = re.compile(r'\x1b\][0-2]\;(.*?)(\x07|\x1b\\)') | |
# The below regex is used to match our optional (non-standard) handler | |
RE_OPT_SEQ = re.compile(r'\x1b\]_\;(.+?)(\x07|\x1b\\)') | |
RE_NUMBERS = re.compile('\d*') # Matches any number | |
RE_SIGINT = re.compile('.*\^C', re.MULTILINE|re.DOTALL) | |
def __init__(self, rows=24, cols=80, em_dimensions=None, temppath='/tmp', | |
linkpath='/tmp', icondir=None, encoding='utf-8', debug=False): | |
""" | |
Initializes the terminal by calling *self.initialize(rows, cols)*. This | |
is so we can have an equivalent function in situations where __init__() | |
gets overridden. | |
If *em_dimensions* are provided they will be used to determine how many | |
lines images will take when they're drawn in the terminal. This is to | |
prevent images that are written to the top of the screen from having | |
their tops cut off. *em_dimensions* should be a dict in the form of:: | |
{'height': <px>, 'width': <px>} | |
The *temppath* will be used to store files that are captured/saved by | |
the terminal emulator. In conjunction with this is the *linkpath* which | |
will be used when creating links to these temporary files. For example, | |
a web-based application may wish to have the terminal emulator store | |
temporary files in /tmp but give clients a completely unrelated URL to | |
retrieve these files (for security or convenience reasons). Here's a | |
real world example of how it works:: | |
>>> term = Terminal(rows=10, cols=40, temppath='/var/tmp', linkpath='/terminal') | |
>>> term.write('About to write a PDF\\n') | |
>>> pdf = open('/path/to/somefile.pdf').read() | |
>>> term.write(pdf) | |
>>> term.dump_html() | |
([u'About to write a PDF ', | |
# <unnecessary lines of whitespace have been removed for this example> | |
u'<a target="_blank" href="/terminal/tmpZoOKVM.pdf">PDF Document</a>']) | |
The PDF file in question will reside in `/var/tmp` but the link was | |
created as `href="/terminal/tmpZoOKVM.pdf"`. As long as your web app | |
knows to look in /var/tmp for incoming '/terminal' requests users should | |
be able to retrieve their documents. | |
http://yourapp.company.com/terminal/tmpZoOKVM.pdf | |
The *icondir* parameter, if given, will be used to provide a relevant | |
icon when outputing a link to a file. When a supported | |
:class:`FileType` is captured the instance will be given the *icondir* | |
as a parameter in a manner similar to this:: | |
filetype_instance = filetype_class(icondir=self.icondir) | |
That way when filetype_instance.html() is called it can display a nice | |
icon to the user... if that particular :class:`FileType` supports icons | |
and the icon it is looking for happens to be available at *icondir*. | |
If *debug* is True, the root logger will have its level set to DEBUG. | |
""" | |
if debug: | |
logger = logging.getLogger() | |
logger.level = logging.DEBUG | |
self.temppath = temppath | |
self.linkpath = linkpath | |
self.icondir = icondir | |
self.encoding = encoding | |
# This controls how often we send a message to the client when capturing | |
# a special file type. The default is to update the user of progress | |
# once every 1.5 seconds. | |
self.message_interval = timedelta(seconds=1.5) | |
self.notified = False # Used to tell if we have notified the user before | |
self.cancel_capture = False | |
# Used by cursor_left() and cursor_right() to handle double-width chars: | |
self.double_width_right = False | |
self.double_width_left = False | |
self.initialize(rows, cols, em_dimensions) | |
def initialize(self, rows=24, cols=80, em_dimensions=None): | |
""" | |
Initializes the terminal (the actual equivalent to :meth:`__init__`). | |
""" | |
self.cols = cols | |
self.rows = rows | |
self.em_dimensions = em_dimensions | |
self.scrollback_buf = [] | |
self.scrollback_renditions = [] | |
self.title = "Gate One" | |
# This variable can be referenced by programs implementing Terminal() to | |
# determine if anything has changed since the last dump*() | |
self.modified = False | |
self.local_echo = True | |
self.insert_mode = False | |
self.esc_buffer = '' # For holding escape sequences as they're typed. | |
self.cursor_home = 0 | |
self.cur_rendition = unichr(1000) # Should always be reset ([0]) | |
self.init_screen() | |
self.init_renditions() | |
self.current_charset = 0 | |
self.set_G0_charset('B') | |
self.set_G1_charset('B') | |
self.use_g0_charset() | |
# Set the default window margins | |
self.top_margin = 0 | |
self.bottom_margin = self.rows - 1 | |
self.timeout_capture = None | |
self.specials = { | |
self.ASCII_NUL: self.__ignore, | |
self.ASCII_BEL: self.bell, | |
self.ASCII_BS: self.backspace, | |
self.ASCII_HT: self.horizontal_tab, | |
self.ASCII_LF: self.newline, | |
self.ASCII_VT: self.newline, | |
self.ASCII_FF: self.newline, | |
self.ASCII_CR: self.carriage_return, | |
self.ASCII_SO: self.use_g1_charset, | |
self.ASCII_SI: self.use_g0_charset, | |
self.ASCII_XON: self._xon, | |
self.ASCII_CAN: self._cancel_esc_sequence, | |
self.ASCII_XOFF: self._xoff, | |
#self.ASCII_ESC: self._sub_esc_sequence, | |
self.ASCII_ESC: self._escape, | |
self.ASCII_CSI: self._csi, | |
} | |
self.esc_handlers = { | |
# TODO: Make a different set of these for each respective emulation mode (VT-52, VT-100, VT-200, etc etc) | |
'#': self._set_line_params, # Varies | |
'\\': self._string_terminator, # ST | |
'c': self.clear_screen, # Reset terminal | |
'D': self.__ignore, # Move/scroll window up one line IND | |
'M': self.reverse_linefeed, # Move/scroll window down one line RI | |
'E': self.next_line, # Move to next line NEL | |
'F': self.__ignore, # Enter Graphics Mode | |
'G': self.next_line, # Exit Graphics Mode | |
'6': self._dsr_get_cursor_position, # Get cursor position DSR | |
'7': self.save_cursor_position, # Save cursor position and attributes DECSC | |
'8': self.restore_cursor_position, # Restore cursor position and attributes DECSC | |
'H': self._set_tabstop, # Set a tab at the current column HTS | |
'I': self.reverse_linefeed, | |
'(': self.set_G0_charset, # Designate G0 Character Set | |
')': self.set_G1_charset, # Designate G1 Character Set | |
'N': self.__ignore, # Set single shift 2 SS2 | |
'O': self.__ignore, # Set single shift 3 SS3 | |
'5': self._device_status_report, # Request: Device status report DSR | |
'0': self.__ignore, # Response: terminal is OK DSR | |
'P': self._dcs_handler, # Device Control String DCS | |
# NOTE: = and > are ignored because the user can override/control | |
# them via the numlock key on their keyboard. To do otherwise would | |
# just confuse people. | |
'=': self.__ignore, # Application Keypad DECPAM | |
'>': self.__ignore, # Exit alternate keypad mode | |
'<': self.__ignore, # Exit VT-52 mode | |
'Z': self._csi_device_identification, | |
} | |
self.csi_handlers = { | |
'A': self.cursor_up, | |
'B': self.cursor_down, | |
'C': self.cursor_right, | |
'D': self.cursor_left, | |
'E': self.cursor_next_line, # NOTE: Not the same as next_line() | |
'F': self.cursor_previous_line, | |
'G': self.cursor_horizontal_absolute, | |
'H': self.cursor_position, | |
'L': self.insert_line, | |
'M': self.delete_line, | |
#'b': self.repeat_last_char, # TODO | |
'c': self._csi_device_identification, # Device status report (DSR) | |
'g': self.__ignore, # TODO: Tab clear | |
'h': self.set_expanded_mode, | |
'i': self.__ignore, # ESC[5i is "redirect to printer", ESC[4i ends it | |
'l': self.reset_expanded_mode, | |
'f': self.cursor_position, | |
'd': self.cursor_position_vertical, # Vertical Line Position Absolute (VPA) | |
#'e': self.cursor_position_vertical_relative, # VPR TODO | |
'J': self.clear_screen_from_cursor, | |
'K': self.clear_line_from_cursor, | |
'S': self.scroll_up, | |
'T': self.scroll_down, | |
's': self.save_cursor_position, | |
'u': self.restore_cursor_position, | |
'm': self._set_rendition, | |
'n': self._csi_device_status_report, # <ESC>[6n is the only one I know of (request cursor position) | |
'p': self.reset, # TODO: "!p" is "Soft terminal reset". Also, "Set conformance level" (VT100, VT200, or VT300) | |
'r': self._set_top_bottom, # DECSTBM (used by many apps) | |
'q': self.set_led_state, # Seems a bit silly but you never know | |
'P': self.delete_characters, # DCH Deletes the specified number of chars | |
'X': self._erase_characters, # ECH Same as DCH but also deletes renditions | |
'Z': self.insert_characters, # Inserts the specified number of chars | |
'@': self.insert_characters, # Inserts the specified number of chars | |
#'`': self._char_position_row, # Position cursor (row only) | |
#'t': self.window_manipulation, # TODO | |
#'z': self.locator, # TODO: DECELR "Enable locator reporting" | |
} | |
# Used to store what expanded modes are active | |
self.expanded_modes = { | |
# Important defaults | |
'1': False, # Application Cursor Keys | |
'7': False, # Autowrap | |
'25': True, # Show Cursor | |
} | |
self.expanded_mode_handlers = { | |
# Expanded modes take a True/False argument for set/reset | |
'1': partial(self.expanded_mode_toggle, '1'), | |
'2': self.__ignore, # DECANM and set VT100 mode (and lock keyboard) | |
'3': self.__ignore, # 132 Column Mode (DECCOLM) | |
'4': self.__ignore, # Smooth (Slow) Scroll (DECSCLM) | |
'5': self.__ignore, # Reverse video (might support in future) | |
'6': self.__ignore, # Origin Mode (DECOM) | |
# Wraparound Mode (DECAWM): | |
'7': partial(self.expanded_mode_toggle, '7'), | |
'8': self.__ignore, # Auto-repeat Keys (DECARM) | |
# Send Mouse X & Y on button press: | |
'9': partial(self.expanded_mode_toggle, '9'), | |
'12': self.__ignore, # SRM or Start Blinking Cursor (att610) | |
'18': self.__ignore, # Print form feed (DECPFF) | |
'19': self.__ignore, # Set print extent to full screen (DECPEX) | |
'25': partial(self.expanded_mode_toggle, '25'), | |
'38': self.__ignore, # Enter Tektronix Mode (DECTEK) | |
'41': self.__ignore, # more(1) fix (whatever that is) | |
'42': self.__ignore, # Enable Nation Replacement Character sets (DECNRCM) | |
'44': self.__ignore, # Turn On Margin Bell | |
'45': self.__ignore, # Reverse-wraparound Mode | |
'46': self.__ignore, # Start Logging | |
'47': self.toggle_alternate_screen_buffer, # Use Alternate Screen Buffer | |
'66': self.__ignore, # Application keypad (DECNKM) | |
'67': self.__ignore, # Backarrow key sends delete (DECBKM) | |
# Send Mouse X/Y on button press and release: | |
'1000': partial(self.expanded_mode_toggle, '1000'), | |
# Use Hilite Mouse Tracking: | |
'1001': partial(self.expanded_mode_toggle, '1001'), | |
# Use Cell Motion Mouse Tracking: | |
'1002': partial(self.expanded_mode_toggle, '1002'), | |
# Use All Motion Mouse Tracking: | |
'1003': partial(self.expanded_mode_toggle, '1003'), | |
# Send FocusIn/FocusOut events: | |
'1004': partial(self.expanded_mode_toggle, '1004'), | |
# Enable UTF-8 Mouse Mode: | |
'1005': partial(self.expanded_mode_toggle, '1005'), | |
# Enable SGR Mouse Mode: | |
'1006': partial(self.expanded_mode_toggle, '1006'), | |
'1010': self.__ignore, # Scroll to bottom on tty output | |
'1011': self.__ignore, # Scroll to bottom on key press | |
'1035': self.__ignore, # Enable special modifiers for Alt and NumLock keys | |
'1036': self.__ignore, # Send ESC when Meta modifies a key | |
'1037': self.__ignore, # Send DEL from the editing-keypad Delete key | |
'1047': self.__ignore, # Use Alternate Screen Buffer | |
'1048': self.__ignore, # Save cursor as in DECSC | |
# Save cursor as in DECSC and use Alternate Screen Buffer, | |
# clearing it first: | |
'1049': self.toggle_alternate_screen_buffer_cursor, | |
'1051': self.__ignore, # Set Sun function-key mode | |
'1052': self.__ignore, # Set HP function-key mode | |
'1060': self.__ignore, # Set legacy keyboard emulation (X11R6) | |
'1061': self.__ignore, # Set Sun/PC keyboard emulation of VT220 keyboard | |
} | |
self.callbacks = { | |
CALLBACK_SCROLL_UP: {}, | |
CALLBACK_CHANGED: {}, | |
CALLBACK_CURSOR_POS: {}, | |
CALLBACK_DSR: {}, | |
CALLBACK_TITLE: {}, | |
CALLBACK_BELL: {}, | |
CALLBACK_OPT: {}, | |
CALLBACK_MODE: {}, | |
CALLBACK_RESET: {}, | |
CALLBACK_LEDS: {}, | |
CALLBACK_MESSAGE: {}, | |
} | |
self.leds = { | |
1: False, | |
2: False, | |
3: False, | |
4: False | |
} | |
# supported_magic gets assigned via self.add_magic() below | |
self.supported_magic = [] | |
# Dict for magic "numbers" so we can tell when a particular type of | |
# file begins and ends (so we can capture it in binary form and | |
# later dump it out via dump_html()) | |
# The format is 'beginning': 'whole' | |
self.magic = OrderedDict() | |
# magic_map is like magic except it is in the format of: | |
# 'beginning': <filetype class> | |
self.magic_map = {} | |
# Supported magic (defaults) | |
self.add_magic(PDFFile) | |
self.add_magic(PNGFile) | |
self.add_magic(JPEGFile) | |
# NOTE: The order matters! Some file formats are containers that can | |
# hold other file formats. For example, PDFs can contain JPEGs. So if | |
# we match JPEGs before PDFs we might make a match when we really wanted | |
# to match the overall container (the PDF). | |
self.matched_header = None | |
# These are for saving self.screen and self.renditions so we can support | |
# an "alternate buffer" | |
self.alt_screen = None | |
self.alt_renditions = None | |
self.alt_cursorX = 0 | |
self.alt_cursorY = 0 | |
self.saved_cursorX = 0 | |
self.saved_cursorY = 0 | |
self.saved_rendition = [None] | |
self.capture = "" | |
self.captured_files = {} | |
self.file_counter = pua_counter() | |
# This is for creating a new point of reference every time there's a new | |
# unique rendition at a given coordinate | |
self.rend_counter = unicode_counter() | |
# Used for mapping unicode chars to acutal renditions (to save memory): | |
self.renditions_store = { | |
u' ': [], # Nada, nothing, no rendition. Not the same as below | |
self.rend_counter.next(): [0] # Default is actually reset | |
} | |
self.prev_dump = [] # A cache to speed things up | |
self.prev_dump_rend = [] # Ditto | |
self.html_cache = [] # Ditto | |
self.watcher = None # Placeholder for the file watcher thread (if used) | |
def add_magic(self, filetype): | |
""" | |
Adds the given *filetype* to :attr:`self.supported_magic` and generates | |
the necessary bits in :attr:`self.magic` and :attr:`self.magic_map`. | |
*filetype* is expected to be a subclass of :class:`FileType`. | |
""" | |
#logging.debug("add_magic(%s)" % filetype) | |
if filetype in self.supported_magic: | |
return # Nothing to do; it's already there | |
self.supported_magic.append(filetype) | |
# Wand ready... | |
for Type in self.supported_magic: | |
self.magic.update({Type.re_header: Type.re_capture}) | |
# magic_map is just a convenient way of performing magic, er, I | |
# mean referencing filetypes that match the supported magic numbers. | |
for Type in self.supported_magic: | |
self.magic_map.update({Type.re_header: Type}) | |
def remove_magic(self, filetype): | |
""" | |
Removes the given *filetype* from :attr:`self.supported_magic`, | |
:attr:`self.magic`, and :attr:`self.magic_map`. | |
*filetype* may be the specific filetype class or a string that can be | |
either a filetype.name or filetype.mimetype. | |
""" | |
found = None | |
if isinstance(filetype, basestring): | |
for Type in self.supported_magic: | |
if Type.name == filetype: | |
found = Type | |
break | |
elif Type.mimetype == filetype: | |
found = Type | |
break | |
else: | |
for Type in self.supported_magic: | |
if Type == filetype: | |
found = Type | |
break | |
if not found: | |
raise NotFoundError("%s not found in supported magic" % filetype) | |
self.supported_magic.remove(Type) | |
del self.magic[Type.re_header] | |
del self.magic_map[Type.re_header] | |
def update_magic(self, filetype, mimetype): | |
""" | |
Replaces an existing FileType with the given *mimetype* in | |
:attr:`self.supported_magic` with the given *filetype*. Example:: | |
>>> import terminal | |
>>> term = terminal.Terminal() | |
>>> class NewPDF = class(terminal.PDFile) | |
>>> # Open PDFs immediately in a new window | |
>>> NewPDF.html_template = "<script>window.open({link})</script>" | |
>>> NewPDF.html_icon_template = NewPDF.html_template # Ignore icon | |
>>> term.update_magic(NewPDF, mimetype="application/pdf") | |
""" | |
# Find the matching magic filetype | |
for i, Type in enumerate(self.supported_magic): | |
if Type.mimetype == mimetype: | |
break | |
# Replace self.magic and self.magic_map | |
del self.magic[Type.re_header] | |
del self.magic_map[Type.re_header] | |
self.magic.update({filetype.re_header: filetype.re_capture}) | |
self.magic_map.update({filetype.re_header: filetype}) | |
# Finally replace the existing filetype in supported_magic | |
self.supported_magic[i] = filetype | |
def init_screen(self): | |
""" | |
Fills :attr:`screen` with empty lines of (unicode) spaces using | |
:attr:`self.cols` and :attr:`self.rows` for the dimensions. | |
.. note:: Just because each line starts out with a uniform length does not mean it will stay that way. Processing of escape sequences is handled when an output function is called. | |
""" | |
logging.debug('init_screen()') | |
self.screen = [array('u', u' ' * self.cols) for a in xrange(self.rows)] | |
# Tabstops | |
self.tabstops = set(range(7, self.cols, 8)) | |
# Base cursor position | |
self.cursorX = 0 | |
self.cursorY = 0 | |
self.rendition_set = False | |
self.prev_dump = [] # Force a full dump with an init | |
self.prev_dump_rend = [] | |
self.html_cache = [] # Force this to be reset as well | |
def init_renditions(self, rendition=unichr(1000)): # Match unicode_counter | |
""" | |
Replaces :attr:`self.renditions` with arrays of *rendition* (characters) | |
using :attr:`self.cols` and :attr:`self.rows` for the dimenions. | |
""" | |
logging.debug("init_renditions(%s)" % repr(rendition)) | |
# The actual renditions at various coordinates: | |
self.renditions = [ | |
array('u', rendition * self.cols) for a in xrange(self.rows)] | |
def init_scrollback(self): | |
""" | |
Empties the scrollback buffers (:attr:`self.scrollback_buf` and | |
:attr:`self.scrollback_renditions`). | |
""" | |
self.scrollback_buf = [] | |
self.scrollback_renditions = [] | |
def add_callback(self, event, callback, identifier=None): | |
""" | |
Attaches the given *callback* to the given *event*. If given, | |
*identifier* can be used to reference this callback leter (e.g. when you | |
want to remove it). Otherwise an identifier will be generated | |
automatically. If the given *identifier* is already attached to a | |
callback at the given event that callback will be replaced with | |
*callback*. | |
:event: The numeric ID of the event you're attaching *callback* to. The :ref:`callback constants <callback_constants>` should be used as the numerical IDs. | |
:callback: The function you're attaching to the *event*. | |
:identifier: A string or number to be used as a reference point should you wish to remove or update this callback later. | |
Returns the identifier of the callback. to Example:: | |
>>> term = Terminal() | |
>>> def somefunc(): pass | |
>>> id = "myref" | |
>>> ref = term.add_callback(term.CALLBACK_BELL, somefunc, id) | |
.. note:: This allows the controlling program to have multiple callbacks for the same event. | |
""" | |
if not identifier: | |
identifier = callback.__hash__() | |
self.callbacks[event][identifier] = callback | |
return identifier | |
def remove_callback(self, event, identifier): | |
""" | |
Removes the callback referenced by *identifier* that is attached to the | |
given *event*. Example:: | |
>>> term.remove_callback(CALLBACK_BELL, "myref") | |
""" | |
del self.callbacks[event][identifier] | |
def remove_all_callbacks(self, identifier): | |
""" | |
Removes all callbacks associated with *identifier*. | |
""" | |
for event, identifiers in self.callbacks.items(): | |
try: | |
del self.callbacks[event][identifier] | |
except KeyError: | |
pass # No match, no biggie | |
def send_message(self, message): | |
""" | |
A convenience function for calling all CALLBACK_MESSAGE callbacks. | |
""" | |
logging.debug('send_message(%s)' % message) | |
try: | |
for callback in self.callbacks[CALLBACK_MESSAGE].values(): | |
callback(message) | |
except TypeError as e: | |
pass | |
def send_update(self): | |
""" | |
A convenience function for calling all CALLBACK_CHANGED callbacks. | |
""" | |
#logging.debug('send_update()') | |
try: | |
for callback in self.callbacks[CALLBACK_CHANGED].values(): | |
callback() | |
except TypeError as e: | |
pass | |
def send_cursor_update(self): | |
""" | |
A convenience function for calling all CALLBACK_CURSOR_POS callbacks. | |
""" | |
#logging.debug('send_cursor_update()') | |
try: | |
for callback in self.callbacks[CALLBACK_CURSOR_POS].values(): | |
callback() | |
except TypeError as e: | |
pass | |
def reset(self, *args, **kwargs): | |
""" | |
Resets the terminal back to an empty screen with all defaults. Calls | |
:meth:`Terminal.callbacks[CALLBACK_RESET]` when finished. | |
.. note:: If terminal output has been suspended (e.g. via ctrl-s) this will not un-suspend it (you need to issue ctrl-q to the underlying program to do that). | |
""" | |
logging.debug('reset()') | |
self.leds = { | |
1: False, | |
2: False, | |
3: False, | |
4: False | |
} | |
self.expanded_modes = { | |
# Important defaults | |
'1': False, | |
'7': False, | |
'25': True, | |
} | |
self.local_echo = True | |
self.title = "Gate One" | |
self.esc_buffer = '' | |
self.insert_mode = False | |
self.rendition_set = False | |
self.current_charset = 0 | |
self.set_G0_charset('B') | |
self.set_G1_charset('B') | |
self.use_g0_charset() | |
self.top_margin = 0 | |
self.bottom_margin = self.rows - 1 | |
self.alt_screen = None | |
self.alt_renditions = None | |
self.alt_cursorX = 0 | |
self.alt_cursorY = 0 | |
self.saved_cursorX = 0 | |
self.saved_cursorY = 0 | |
self.saved_rendition = [None] | |
self.init_screen() | |
self.init_renditions() | |
self.init_scrollback() | |
self.prev_dump = [] | |
self.prev_dump_rend = [] | |
self.html_cache = [] | |
try: | |
self.callbacks[CALLBACK_RESET]() | |
except TypeError: | |
pass | |
def __ignore(self, *args, **kwargs): | |
""" | |
Does nothing (on purpose!). Used as a placeholder for unimplemented | |
functions. | |
""" | |
pass | |
def resize(self, rows, cols, em_dimensions=None): | |
""" | |
Resizes the terminal window, adding or removing *rows* or *cols* as | |
needed. If *em_dimensions* are provided they will be stored in | |
*self.em_dimensions* (which is currently only used by image output). | |
""" | |
logging.debug("resize(%s, %s)" % (rows, cols)) | |
if em_dimensions: | |
self.em_dimensions = em_dimensions | |
if rows == self.rows and cols == self.cols: | |
return # Nothing to do--don't mess with the margins or the cursor | |
if rows < self.rows: # Remove rows from the top | |
for i in xrange(self.rows - rows): | |
line = self.screen.pop(0) | |
# Add it to the scrollback buffer so it isn't lost forever | |
self.scrollback_buf.append(line) | |
rend = self.renditions.pop(0) | |
self.scrollback_renditions.append(rend) | |
elif rows > self.rows: # Add rows at the bottom | |
for i in xrange(rows - self.rows): | |
line = array('u', u' ' * self.cols) | |
renditions = array('u', unichr(1000) * self.cols) | |
self.screen.append(line) | |
self.renditions.append(renditions) | |
self.rows = rows | |
self.top_margin = 0 | |
self.bottom_margin = self.rows - 1 | |
# Fix the cursor location: | |
if self.cursorY >= self.rows: | |
self.cursorY = self.rows - 1 | |
if cols > self.cols: # Add cols to the right | |
for i in xrange(self.rows): | |
for j in xrange(cols - self.cols): | |
self.screen[i].append(u' ') | |
self.renditions[i].append(unichr(1000)) | |
self.cols = cols | |
# Fix the cursor location: | |
if self.cursorX >= self.cols: | |
self.cursorX = self.cols - 1 | |
self.rendition_set = False | |
def _set_top_bottom(self, settings): | |
""" | |
DECSTBM - Sets :attr:`self.top_margin` and :attr:`self.bottom_margin` | |
using the provided settings in the form of '<top_margin>;<bottom_margin>'. | |
.. note:: This also handles restore/set "DEC Private Mode Values". | |
""" | |
#logging.debug("_set_top_bottom(%s)" % settings) | |
# NOTE: Used by screen and vi so this needs to work and work well! | |
if len(settings): | |
if settings.startswith('?'): | |
# This is a set/restore DEC PMV sequence | |
return # Ignore (until I figure out what this should do) | |
top, bottom = settings.split(';') | |
self.top_margin = max(0, int(top) - 1) # These are 0-based | |
if bottom: | |
self.bottom_margin = min(self.rows - 1, int(bottom) - 1) | |
else: | |
# Reset to defaults (full screen margins) | |
self.top_margin, self.bottom_margin = 0, self.rows - 1 | |
def get_cursor_position(self): | |
""" | |
Returns the current cursor positition as a tuple:: | |
(row, col) | |
""" | |
return (self.cursorY, self.cursorX) | |
def set_title(self, title): | |
""" | |
Sets :attr:`self.title` to *title* and executes | |
:meth:`Terminal.callbacks[CALLBACK_TITLE]` | |
""" | |
self.title = title | |
try: | |
for callback in self.callbacks[CALLBACK_TITLE].values(): | |
callback() | |
except TypeError as e: | |
logging.error(_("Got TypeError on CALLBACK_TITLE...")) | |
logging.error(repr(self.callbacks[CALLBACK_TITLE])) | |
logging.error(e) | |
def get_title(self): | |
"""Returns :attr:`self.title`""" | |
return self.title | |
# TODO: put some logic in these save/restore functions to walk the current | |
# rendition line to come up with a logical rendition for that exact spot. | |
def save_cursor_position(self, mode=None): | |
""" | |
Saves the cursor position and current rendition settings to | |
:attr:`self.saved_cursorX`, :attr:`self.saved_cursorY`, and | |
:attr:`self.saved_rendition` | |
.. note:: Also handles the set/restore "Private Mode Settings" sequence. | |
""" | |
if mode: # Set DEC private mode | |
# TODO: Need some logic here to save the current expanded mode | |
# so we can restore it in _set_top_bottom(). | |
self.set_expanded_mode(mode) | |
# NOTE: args and kwargs are here to make sure we don't get an exception | |
# when we're called via escape sequences. | |
self.saved_cursorX = self.cursorX | |
self.saved_cursorY = self.cursorY | |
self.saved_rendition = self.cur_rendition | |
def restore_cursor_position(self, *args, **kwargs): | |
""" | |
Restores the cursor position and rendition settings from | |
:attr:`self.saved_cursorX`, :attr:`self.saved_cursorY`, and | |
:attr:`self.saved_rendition` (if they're set). | |
""" | |
if self.saved_cursorX and self.saved_cursorY: | |
self.cursorX = self.saved_cursorX | |
self.cursorY = self.saved_cursorY | |
self.cur_rendition = self.saved_rendition | |
def _dsr_get_cursor_position(self): | |
""" | |
Returns the current cursor positition as a DSR response in the form of:: | |
'\x1b<self.cursorY>;<self.cursorX>R' | |
Also executes CALLBACK_DSR with the same output as the first argument. | |
Example:: | |
self.callbacks[CALLBACK_DSR]('\x1b20;123R') | |
""" | |
esc_cursor_pos = '\x1b%s;%sR' % (self.cursorY, self.cursorX) | |
try: | |
for callback in self.callbacks[CALLBACK_DSR].values(): | |
callback(esc_cursor_pos) | |
except TypeError: | |
pass | |
return esc_cursor_pos | |
def _dcs_handler(self, string=None): | |
""" | |
Handles Device Control String sequences. Unimplemented. Probablye not | |
appropriate for Gate One. If you believe this to be false please open | |
a ticket in the issue tracker. | |
""" | |
pass | |
#print("TODO: Handle this DCS: %s" % string) | |
def _set_line_params(self, param): | |
""" | |
This function handles the control sequences that set double and single | |
line heights and widths. It also handles the "screen alignment test" ( | |
fill the screen with Es). | |
.. note:: Double-line height text is currently unimplemented (does anything actually use it?). | |
""" | |
try: | |
param = int(param) | |
except ValueError: | |
logging.warning("Couldn't handle escape sequence #%s" % repr(param)) | |
if param == 8: | |
# Screen alignment test | |
self.init_renditions() | |
self.screen = [ | |
array('u', u'E' * self.cols) for a in xrange(self.rows)] | |
# TODO: Get this handling double line height stuff... For kicks | |
def set_G0_charset(self, char): | |
""" | |
Sets the terminal's G0 (default) charset to the type specified by | |
*char*. | |
Here's the possibilities:: | |
0 DEC Special Character and Line Drawing Set | |
A United Kingdom (UK) | |
B United States (USASCII) | |
4 Dutch | |
C Finnish | |
5 Finnish | |
R French | |
Q French Canadian | |
K German | |
Y Italian | |
E Norwegian/Danish | |
6 Norwegian/Danish | |
Z Spanish | |
H Swedish | |
7 Swedish | |
= Swiss | |
""" | |
#logging.debug("Setting G0 charset to %s" % repr(char)) | |
try: | |
self.G0_charset = self.charsets[char] | |
except KeyError: | |
self.G0_charset = self.charsets['B'] | |
if self.current_charset == 0: | |
self.charset = self.G0_charset | |
def set_G1_charset(self, char): | |
""" | |
Sets the terminal's G1 (alt) charset to the type specified by *char*. | |
Here's the possibilities:: | |
0 DEC Special Character and Line Drawing Set | |
A United Kingdom (UK) | |
B United States (USASCII) | |
4 Dutch | |
C Finnish | |
5 Finnish | |
R French | |
Q French Canadian | |
K German | |
Y Italian | |
E Norwegian/Danish | |
6 Norwegian/Danish | |
Z Spanish | |
H Swedish | |
7 Swedish | |
= Swiss | |
""" | |
#logging.debug("Setting G1 charset to %s" % repr(char)) | |
try: | |
self.G1_charset = self.charsets[char] | |
except KeyError: | |
self.G1_charset = self.charsets['B'] | |
if self.current_charset == 1: | |
self.charset = self.G1_charset | |
def use_g0_charset(self): | |
""" | |
Sets the current charset to G0. This should get called when ASCII_SO | |
is encountered. | |
""" | |
#logging.debug( | |
#"Switching to G0 charset (which is %s)" % repr(self.G0_charset)) | |
self.current_charset = 0 | |
self.charset = self.G0_charset | |
def use_g1_charset(self): | |
""" | |
Sets the current charset to G1. This should get called when ASCII_SI | |
is encountered. | |
""" | |
#logging.debug( | |
#"Switching to G1 charset (which is %s)" % repr(self.G1_charset)) | |
self.current_charset = 1 | |
self.charset = self.G1_charset | |
def abort_capture(self): | |
""" | |
A convenience function that takes care of canceling a file capture and | |
cleaning up the output. | |
""" | |
logging.debug('abort_capture()') | |
self.cancel_capture = True | |
self.write('\x00') # This won't actually get written | |
self.send_update() | |
self.send_message(_(u'File capture aborted.')) | |
def write(self, chars, special_checks=True): | |
""" | |
Write *chars* to the terminal at the current cursor position advancing | |
the cursor as it does so. If *chars* is not unicode, it will be | |
converted to unicode before being stored in self.screen. | |
if *special_checks* is True (default), Gate One will perform checks for | |
special things like image files coming in via *chars*. | |
""" | |
# NOTE: This is the slowest function in all of Gate One. All | |
# suggestions on how to speed it up are welcome! | |
# Speedups (don't want dots in loops if they can be avoided) | |
specials = self.specials | |
esc_handlers = self.esc_handlers | |
csi_handlers = self.csi_handlers | |
RE_ESC_SEQ = self.RE_ESC_SEQ | |
RE_CSI_ESC_SEQ = self.RE_CSI_ESC_SEQ | |
magic = self.magic | |
magic_map = self.magic_map | |
changed = False | |
# This is commented because of how noisy it is. Uncomment to debug the | |
# terminal emualtor: | |
#logging.debug('handling chars: %s' % repr(chars)) | |
if special_checks: | |
before_chars = "" | |
after_chars = "" | |
if not self.capture: | |
for magic_header in magic: | |
try: | |
if magic_header.match(str(chars)): | |
self.matched_header = magic_header | |
self.capture_regex = magic[magic_header] | |
self.timeout_capture = datetime.now() | |
self.progress_timer = datetime.now() | |
break | |
except UnicodeEncodeError: | |
# Gibberish; drop it and pretend it never happened | |
logging.debug(_( | |
"Got UnicodeEncodeError trying to check FileTypes")) | |
self.esc_buffer = "" | |
# Make it so it won't barf below | |
chars = chars.encode(self.encoding, 'ignore') | |
if self.capture or self.matched_header: | |
self.capture += chars | |
if self.cancel_capture: | |
# Try to split the garbage from the post-ctrl-c output | |
split_capture = self.RE_SIGINT.split(self.capture) | |
after_chars = split_capture[-1] | |
self.capture = '' | |
self.matched_header = None | |
self.cancel_capture = False | |
self.write(u'^C\r\n', special_checks=False) | |
self.write(after_chars, special_checks=False) | |
return | |
now = datetime.now() | |
if now - self.progress_timer > self.message_interval: | |
# Send an update of the progress to the user | |
# NOTE: This message will only get sent if it takes longer | |
# than self.message_interval to capture a file. So it is | |
# nice and user friendly: Small things output instantly | |
# without notifications while larger files that take longer | |
# to capture will keep the user abreast of the progress. | |
ft = magic_map[self.matched_header].name | |
indicator = 'K' | |
size = float(len(self.capture))/1024 # Kb | |
if size > 1024: # Switch to Mb | |
size = size/1024 | |
indicator = 'M' | |
message = _( | |
"%s: %.2f%s captured..." % (ft, size, indicator)) | |
self.notified = True | |
self.send_message(message) | |
self.progress_timer = datetime.now() | |
match = self.capture_regex.search(self.capture) | |
if match: | |
logging.debug( | |
"Matched %s format (%s, %s). Capturing..." % ( | |
self.magic_map[self.matched_header].name, | |
self.cursorY, self.cursorX)) | |
split_capture = self.capture_regex.split(self.capture, 1) | |
before_chars = split_capture[0] | |
capture_length = len(split_capture[1]) | |
self.capture = split_capture[1] | |
after_chars = "".join(split_capture[2:]) | |
if after_chars: | |
if len(after_chars) > 500: | |
# Could be more to this file. Let's wait until output | |
# slows down before attempting to perform a match | |
return | |
else: | |
# These needs to be written before the capture so that | |
# the FileType.capture() method can position things | |
# appropriately. | |
if before_chars: | |
# Empty out self.capture temporarily so these chars | |
# get handled properly | |
cap_temp = self.capture | |
self.capture = "" | |
self.write(before_chars, special_checks=False) | |
# Put it back for the rest of the processing | |
self.capture = cap_temp | |
# Perform the capture and start anew | |
self._capture_file() | |
if self.notified: | |
# Send a final notice of how big the file was (just | |
# to keep things consistent). | |
ft = magic_map[self.matched_header].name | |
indicator = 'K' | |
size = float(len(self.capture))/1024 # Kb | |
if size > 1024: # Switch to Mb | |
size = size/1024 | |
indicator = 'M' | |
message = _( | |
"%s: Capture complete (%.2f%s)" % ( | |
ft, size, indicator)) | |
self.notified = False | |
self.send_message(message) | |
self.capture = "" # Empty it now that is is captured | |
self.matched_header = None # Ditto | |
self.write(after_chars, special_checks=True) | |
return | |
return | |
# Have to convert to unicode | |
try: | |
chars = chars.decode(self.encoding, "handle_special") | |
except UnicodeDecodeError: | |
# Just in case | |
try: | |
chars = chars.decode(self.encoding, "ignore") | |
except UnicodeDecodeError: | |
logging.error( | |
_("Double UnicodeDecodeError in terminal.Terminal.")) | |
return | |
except AttributeError: | |
# In Python 3 strings don't have .decode() | |
pass # Already Unicode | |
backspaced = False | |
for char in chars: | |
charnum = ord(char) | |
if charnum in specials: | |
specials[charnum]() | |
else: | |
# Now handle the regular characters and escape sequences | |
if self.esc_buffer: # We've got an escape sequence going on... | |
try: | |
self.esc_buffer += char | |
# First try to handle non-CSI ESC sequences (the basics) | |
match_obj = RE_ESC_SEQ.match(self.esc_buffer) | |
if match_obj: | |
seq_type = match_obj.group(1) # '\x1bA' -> 'A' | |
# Call the matching ESC handler | |
#logging.debug('ESC seq: %s' % seq_type) | |
if len(seq_type) == 1: # Single-character sequnces | |
esc_handlers[seq_type]() | |
else: # Multi-character stuff like '\x1b)B' | |
esc_handlers[seq_type[0]](seq_type[1:]) | |
self.esc_buffer = '' # All done with this one | |
continue | |
# Next try to handle CSI ESC sequences | |
match_obj = RE_CSI_ESC_SEQ.match(self.esc_buffer) | |
if match_obj: | |
csi_values = match_obj.group(1) # e.g. '0;1;37' | |
csi_type = match_obj.group(2) # e.g. 'm' | |
#logging.debug( | |
#'CSI: %s, %s' % (csi_type, csi_values)) | |
# Call the matching CSI handler | |
try: | |
csi_handlers[csi_type](csi_values) | |
except ValueError: | |
# Commented this out because it can be super noisy | |
#logging.error(_( | |
#"CSI Handler Error: Type: %s, Values: %s" % | |
#(csi_type, csi_values) | |
#)) | |
pass | |
self.esc_buffer = '' | |
continue | |
except KeyError: | |
# No handler for this, try some alternatives | |
if self.esc_buffer.endswith('\x1b\\'): | |
self._osc_handler() | |
else: | |
logging.warning(_( | |
"Warning: No ESC sequence handler for %s" | |
% `self.esc_buffer` | |
)) | |
self.esc_buffer = '' | |
continue # We're done here | |
changed = True | |
if self.cursorX >= self.cols: | |
# Non-autowrap has been disabled due to issues with browser | |
# wrapping. | |
#if self.expanded_modes['7']: | |
self.cursorX = 0 | |
self.newline() | |
#else: | |
#self.screen[self.cursorY].append(u' ') # Make room | |
#self.renditions[self.cursorY].append(u' ') | |
try: | |
self.renditions[self.cursorY][ | |
self.cursorX] = self.cur_rendition | |
if self.insert_mode: | |
# Insert mode dictates that we move everything to the | |
# right for every character we insert. Normally the | |
# program itself will take care of this but older | |
# programs and shells will simply set call ESC[4h, | |
# insert the character, then call ESC[4i to return the | |
# terminal to its regular state. | |
self.insert_characters(1) | |
if charnum in self.charset: | |
char = self.charset[charnum] | |
self.screen[self.cursorY][self.cursorX] = char | |
else: | |
# Double check this isn't a unicode diacritic (accent) | |
# which simply modifies the character before it | |
if unicodedata.combining(char): | |
# This is a diacritic. Combine it with existing: | |
current = self.screen[self.cursorY][self.cursorX] | |
combined = unicodedata.normalize( | |
'NFC', u'%s%s' % (current, char)) | |
# Sometimes a joined combining char can still result | |
# a string of length > 1. So we need to handle that | |
if len(combined) > 1: | |
for i, c in enumerate(combined): | |
self.screen[self.cursorY][ | |
self.cursorX] = c | |
if i < len(combined) - 1: | |
self.cursorX += 1 | |
else: | |
self.screen[self.cursorY][ | |
self.cursorX] = combined | |
else: | |
# Normal character | |
self.screen[self.cursorY][self.cursorX] = char | |
except IndexError as e: | |
# This can happen when escape sequences go haywire | |
logging.error(_( | |
"IndexError in write(): %s" % e)) | |
import traceback, sys | |
traceback.print_exc(file=sys.stdout) | |
self.cursorX += 1 | |
if changed: | |
self.modified = True | |
# Execute our callbacks | |
self.send_update() | |
self.send_cursor_update() | |
def flush(self): | |
""" | |
Only here to make Terminal compatible with programs that want to use | |
file-like methods. | |
""" | |
pass | |
def scroll_up(self, n=1): | |
""" | |
Scrolls up the terminal screen by *n* lines (default: 1). The callbacks | |
CALLBACK_CHANGED and CALLBACK_SCROLL_UP are called after scrolling the | |
screen. | |
.. note:: This will only scroll up the region within self.top_margin and self.bottom_margin (if set). | |
""" | |
#logging.debug("scroll_up(%s)" % n) | |
empty_line = array('u', u' ' * self.cols) # Line full of spaces | |
empty_rend = array('u', unichr(1000) * self.cols) | |
for x in xrange(int(n)): | |
line = self.screen.pop(self.top_margin) # Remove the top line | |
self.scrollback_buf.append(line) # Add it to the scrollback buffer | |
if len(self.scrollback_buf) > 1000: | |
# 1000 lines ought to be enough for anybody | |
self.init_scrollback() | |
# NOTE: This would only be if 1000 lines piled up before the | |
# next dump_html() or dump(). | |
# Add it to the bottom of the window: | |
self.screen.insert(self.bottom_margin, empty_line[:]) # A copy | |
# Remove top line's rendition information | |
rend = self.renditions.pop(self.top_margin) | |
self.scrollback_renditions.append(rend) | |
# Insert a new empty rendition as well: | |
self.renditions.insert(self.bottom_margin, empty_rend[:]) | |
# Execute our callback indicating lines have been updated | |
try: | |
for callback in self.callbacks[CALLBACK_CHANGED].values(): | |
callback() | |
except TypeError: | |
pass | |
# Execute our callback to scroll up the screen | |
try: | |
for callback in self.callbacks[CALLBACK_SCROLL_UP].values(): | |
callback() | |
except TypeError: | |
pass | |
def scroll_down(self, n=1): | |
""" | |
Scrolls down the terminal screen by *n* lines (default: 1). The | |
callbacks CALLBACK_CHANGED and CALLBACK_SCROLL_DOWN are called after | |
scrolling the screen. | |
""" | |
#logging.debug("scroll_down(%s)" % n) | |
for x in xrange(int(n)): | |
self.screen.pop(self.bottom_margin) # Remove the bottom line | |
empty_line = array('u', u' ' * self.cols) # Line full of spaces | |
self.screen.insert(self.top_margin, empty_line) # Add it to the top | |
# Remove bottom line's style information: | |
self.renditions.pop(self.bottom_margin) | |
# Insert a new empty one: | |
empty_line = array('u', unichr(1000) * self.cols) | |
self.renditions.insert(self.top_margin, empty_line) | |
# Execute our callback indicating lines have been updated | |
try: | |
for callback in self.callbacks[CALLBACK_CHANGED].values(): | |
callback() | |
except TypeError: | |
pass | |
# Execute our callback to scroll up the screen | |
try: | |
for callback in self.callbacks[CALLBACK_SCROLL_UP].values(): | |
callback() | |
except TypeError: | |
pass | |
def insert_line(self, n=1): | |
""" | |
Inserts *n* lines at the current cursor position. | |
""" | |
#logging.debug("insert_line(%s)" % n) | |
if not n: # Takes care of an empty string | |
n = 1 | |
n = int(n) | |
for i in xrange(n): | |
self.screen.pop(self.bottom_margin) # Remove the bottom line | |
# Remove bottom line's style information as well: | |
self.renditions.pop(self.bottom_margin) | |
empty_line = array('u', u' ' * self.cols) # Line full of spaces | |
self.screen.insert(self.cursorY, empty_line) # Insert at cursor | |
# Insert a new empty rendition as well: | |
empty_rend = array('u', unichr(1000) * self.cols) | |
self.renditions.insert(self.cursorY, empty_rend) # Insert at cursor | |
def delete_line(self, n=1): | |
""" | |
Deletes *n* lines at the current cursor position. | |
""" | |
#logging.debug("delete_line(%s)" % n) | |
if not n: # Takes care of an empty string | |
n = 1 | |
n = int(n) | |
for i in xrange(n): | |
self.screen.pop(self.cursorY) # Remove the line at the cursor | |
# Remove the line's style information as well: | |
self.renditions.pop(self.cursorY) | |
# Now add an empty line and empty set of renditions to the bottom of | |
# the view | |
empty_line = array('u', u' ' * self.cols) # Line full of spaces | |
# Add it to the bottom of the view: | |
self.screen.insert(self.bottom_margin, empty_line) # Insert at bottom | |
# Insert a new empty rendition as well: | |
empty_rend = array('u', unichr(1000) * self.cols) | |
self.renditions.insert(self.bottom_margin, empty_rend) | |
def backspace(self): | |
"""Execute a backspace (\\x08)""" | |
self.cursor_left(1) | |
def horizontal_tab(self): | |
"""Execute horizontal tab (\\x09)""" | |
for stop in sorted(self.tabstops): | |
if self.cursorX < stop: | |
self.cursorX = stop + 1 | |
break | |
else: | |
self.cursorX = self.columns - 1 | |
def _set_tabstop(self): | |
"""Sets a tabstop at the current position of :attr:`self.cursorX`.""" | |
if self.cursorX not in self.tabstops: | |
for tabstop in self.tabstops: | |
if self.cursorX > tabstop: | |
self.tabstops.add(self.cursorX) | |
self.tabstops.sort() # Put them in order :) | |
break | |
def linefeed(self): | |
""" | |
LF - Executes a line feed. | |
.. note:: This actually just calls :meth:`Terminal.newline`. | |
""" | |
self.newline() | |
def next_line(self): | |
""" | |
CNL - Moves the cursor down one line to the home position. Will not | |
result in a scrolling event like newline() does. | |
.. note:: This is not the same thing as :meth:`Terminal.cursor_next_line` which preserves the cursor's column position. | |
""" | |
self.cursorX = self.cursor_home | |
if self.cursorY < self.rows -1: | |
self.cursorY += 1 | |
def reverse_linefeed(self): | |
""" | |
RI - Executes a reverse line feed: Move the cursor up one line to the | |
home position. If the cursor move would result in going past the top | |
margin of the screen (upwards) this will execute a scroll_down() event. | |
""" | |
self.cursorX = 0 | |
self.cursorY -= 1 | |
if self.cursorY < self.top_margin: | |
self.scroll_down() | |
self.cursorY = self.top_margin | |
def newline(self): | |
""" | |
Increases :attr:`self.cursorY` by 1 and calls :meth:`Terminal.scroll_up` | |
if that action will move the curor past :attr:`self.bottom_margin` | |
(usually the bottom of the screen). | |
""" | |
cols = self.cols | |
self.cursorY += 1 | |
if self.cursorY > self.bottom_margin: | |
self.scroll_up() | |
self.cursorY = self.bottom_margin | |
self.clear_line() | |
# Shorten the line if it is longer than the number of columns | |
# NOTE: This lets us keep the width of existing lines even if the number | |
# of columns is reduced while at the same time accounting for apps like | |
# 'top' that merely overwrite existing lines. If we didn't do this | |
# the output from 'top' would get all messed up from leftovers at the | |
# tail end of every line when self.cols had a larger value. | |
if len(self.screen[self.cursorY]) >= cols: | |
self.screen[self.cursorY] = self.screen[self.cursorY][:cols] | |
self.renditions[self.cursorY] = self.renditions[self.cursorY][:cols] | |
# NOTE: The above logic is placed inside of this function instead of | |
# inside self.write() in order to reduce CPU utilization. There's no | |
# point in performing a conditional check for every incoming character | |
# when the only time it will matter is when a newline is being written. | |
def carriage_return(self): | |
""" | |
Executes a carriage return (sets :attr:`self.cursorX` to 0). In other | |
words it moves the cursor back to position 0 on the line. | |
""" | |
# This autowrap magic stuff doesn't work so well so autowrap is now | |
# explicit until I figure out a way to differentiate between the type of | |
# long line output by something like 'ps axwww' and the type that gets | |
# output by, say, bash when you type a really long command. | |
#if self.cursorX >= self.cols and self.cursorX != self.cols: | |
#self.newline() | |
if not self.capture: | |
self.cursorX = 0 | |
def _xon(self): | |
""" | |
Handles the XON character (stop ignoring). | |
.. note:: Doesn't actually do anything (this feature was probably meant for the underlying terminal program). | |
""" | |
logging.debug('_xon()') | |
self.local_echo = True | |
def _xoff(self): | |
""" | |
Handles the XOFF character (start ignoring) | |
.. note:: Doesn't actually do anything (this feature was probably meant for the underlying terminal program). | |
""" | |
logging.debug('_xoff()') | |
self.local_echo = False | |
def _cancel_esc_sequence(self): | |
""" | |
Cancels any escape sequence currently being processed. In other words | |
it empties :attr:`self.esc_buffer`. | |
""" | |
self.esc_buffer = '' | |
def _sub_esc_sequence(self): | |
""" | |
Cancels any escape sequence currently in progress and replaces | |
:attr:`self.esc_buffer` with single question mark (?). | |
.. note:: Nothing presently uses this function and I can't remember what it was supposed to be part of (LOL!). Obviously it isn't very important. | |
""" | |
self.esc_buffer = '' | |
self.write('?') | |
def _escape(self): | |
""" | |
Handles the escape character as well as escape sequences that may end | |
with an escape character. | |
""" | |
buf = self.esc_buffer | |
if buf.startswith('\x1bP') or buf.startswith('\x1b]'): | |
# CSRs and OSCs are special | |
self.esc_buffer += '\x1b' | |
else: | |
# Get rid of whatever's there since we obviously didn't know what to | |
# do with it | |
self.esc_buffer = '\x1b' | |
def _csi(self): | |
""" | |
Marks the start of a CSI escape sequence (which is itself a character) | |
by setting :attr:`self.esc_buffer` to '\\\\x1b[' (which is the CSI | |
escape sequence). | |
""" | |
self.esc_buffer = '\x1b[' | |
def _capture_file(self): | |
""" | |
This function gets called by :meth:`Terminal.write` when the incoming | |
character stream matches a value in :attr:`self.magic`. It will call | |
whatever function is associated with the matching regex in | |
:attr:`self.magic_map`. | |
""" | |
logging.debug("_capture_file()") | |
for magic_header in self.magic.keys(): | |
if magic_header.match(self.capture): | |
# Create a reference point we can use to retrieve the file later | |
ref = self.file_counter.next() | |
# Before doing anything else we need to mark the current cursor | |
# location as belonging to our file | |
self.screen[self.cursorY][self.cursorX] = ref | |
# Create an instance of the filetype we can reference | |
filetype_instance = self.magic_map[magic_header]( | |
path=self.temppath, | |
linkpath=self.linkpath, | |
icondir=self.icondir) | |
self.captured_files[ref] = filetype_instance | |
filetype_instance.capture(self.capture, self) | |
# Start up an open file watcher so leftover file objects get | |
# closed when they're no longer being used | |
if not self.watcher or not self.watcher.isAlive(): | |
import threading | |
self.watcher = threading.Thread( | |
name='watcher', target=self._captured_fd_watcher) | |
self.watcher.setDaemon(True) | |
self.watcher.start() | |
return | |
def _captured_fd_watcher(self): | |
""" | |
Meant to be run inside of a thread, calls close_captured_fds() until there | |
are no more open image file descriptors. | |
""" | |
logging.debug("starting _captured_fd_watcher()") | |
import time | |
quitting = False | |
while not quitting: | |
if self.captured_files: | |
self.close_captured_fds() | |
time.sleep(5) | |
else: | |
quitting = True | |
logging.debug('_captured_fd_watcher() quitting: No more images.') | |
def close_captured_fds(self): | |
""" | |
Closes the file descriptors of any captured files that are no longer on | |
the screen. | |
""" | |
#logging.debug('close_captured_fds()') # Commented because it's kinda noisy | |
if self.captured_files: | |
for ref in self.captured_files.keys(): | |
found = False | |
for line in self.screen: | |
if ref in line: | |
found = True | |
break | |
if self.alt_screen: | |
for line in self.alt_screen: | |
if ref in line: | |
found = True | |
break | |
if not found: | |
#self.captured_files[ref].close() | |
del self.captured_files[ref] | |
def _string_terminator(self): | |
""" | |
Handle the string terminator (ST). | |
.. note:: Doesn't actually do anything at the moment. Probably not needed since :meth:`Terminal._escape` and/or :meth:`Terminal.bell` will end up handling any sort of sequence that would end in an ST anyway. | |
""" | |
# NOTE: Might this just call _cancel_esc_sequence? I need to double-check. | |
pass | |
def _osc_handler(self): | |
""" | |
Handles Operating System Command (OSC) escape sequences which need | |
special care since they are of indeterminiate length and end with | |
either a bell (\\\\x07) or a sequence terminator (\\\\x9c aka ST). This | |
will usually be called from :meth:`Terminal.bell` to set the title of | |
the terminal (just like an xterm) but it is also possible to be called | |
directly whenever an ST is encountered. | |
""" | |
# Try the title sequence first | |
match_obj = self.RE_TITLE_SEQ.match(self.esc_buffer) | |
if match_obj: | |
self.esc_buffer = '' | |
title = match_obj.group(1) | |
self.set_title(title) # Sets self.title | |
return | |
# Next try our special optional handler sequence | |
match_obj = self.RE_OPT_SEQ.match(self.esc_buffer) | |
if match_obj: | |
self.esc_buffer = '' | |
text = match_obj.group(1) | |
self._opt_handler(text) | |
return | |
# At this point we've encountered something unusual | |
logging.warning(_("Warning: No special ESC sequence handler for %s" % | |
`self.esc_buffer`)) | |
self.esc_buffer = '' | |
def bell(self): | |
""" | |
Handles the bell character and executes | |
:meth:`Terminal.callbacks[CALLBACK_BELL]` (if we are not in the middle | |
of an escape sequence that ends with a bell character =). If we *are* | |
in the middle of an escape sequence, calls :meth:`self._osc_handler` | |
since we can be nearly certain that we're simply terminating an OSC | |
sequence. Isn't terminal emulation grand? ⨀_⨀ | |
""" | |
# NOTE: A little explanation is in order: The bell character (\x07) by | |
# itself should play a bell (pretty straighforward). However, if | |
# the bell character is at the tail end of a particular escape | |
# sequence (string starting with \x1b]0;) this indicates an xterm | |
# title (everything between \x1b]0;...\x07). | |
if not self.esc_buffer: # We're not in the middle of an esc sequence | |
logging.debug('Regular bell') | |
try: | |
for callback in self.callbacks[CALLBACK_BELL].values(): | |
callback() | |
except TypeError: | |
pass | |
else: # We're (likely) setting a title | |
self.esc_buffer += '\x07' # Add the bell char so we don't lose it | |
self._osc_handler() | |
def _device_status_report(self, n=None): | |
""" | |
Returns '\\\\x1b[0n' (terminal OK) and executes: | |
.. code-block:: python | |
self.callbacks[CALLBACK_DSR]("\\x1b[0n") | |
""" | |
logging.debug("_device_status_report()") | |
response = u"\x1b[0n" | |
try: | |
for callback in self.callbacks[CALLBACK_DSR].values(): | |
callback(response) | |
except TypeError: | |
pass | |
return response | |
def _csi_device_identification(self, request=None): | |
""" | |
If we're responding to ^[Z, ^[c, or ^[0c, returns '\\\\x1b[1;2c' | |
(Meaning: I'm a vt220 terminal, version 1.0) and | |
executes: | |
.. code-block:: python | |
self.callbacks[self.CALLBACK_DSR]("\\x1b[1;2c") | |
If we're responding to ^[>c or ^[>0c, executes: | |
.. code-block:: python | |
self.callbacks[self.CALLBACK_DSR]("\\x1b[>0;271;0c") | |
""" | |
logging.debug("_csi_device_identification(%s)" % request) | |
if request and u">" in request: | |
response = u"\x1b[>0;271;0c" | |
else: | |
response = u"\x1b[?1;2c" | |
try: | |
for callback in self.callbacks[CALLBACK_DSR].values(): | |
callback(response) | |
except TypeError: | |
pass | |
return response | |
def _csi_device_status_report(self, request=None): | |
""" | |
Calls :meth:`self.callbacks[self.CALLBACK_DSR]` with an appropriate | |
response to the given *request*. | |
.. code-block:: python | |
self.callbacks[self.CALLBACK_DSR](response) | |
Supported requests and their responses: | |
============================= ================== | |
Request Response | |
============================= ================== | |
^[5n (Status Report) ^[[0n | |
^[6n (Report Cursor Position) ^[[<row>;<column>R | |
^[15n (Printer Ready?) ^[[10n (Ready) | |
============================= ================== | |
""" | |
logging.debug("_csi_device_status_report(%s)" % request) | |
supported_requests = [ | |
u"5", | |
u"6", | |
u"15", | |
] | |
if not request: | |
return # Nothing to do | |
response = u"" | |
if request.startswith('?'): | |
# Get rid of it | |
request = request[1:] | |
if request in supported_requests: | |
if request == u"5": | |
response = u"\x1b[0n" | |
elif request == u"6": | |
rows = self.cursorY + 1 | |
cols = self.cursorX + 1 | |
response = u"\x1b[%s;%sR" % (rows, cols) | |
elif request == u"15": | |
response = u"\x1b[10n" | |
try: | |
for callback in self.callbacks[CALLBACK_DSR].values(): | |
callback(response) | |
except TypeError: | |
pass | |
return response | |
def set_expanded_mode(self, setting): | |
""" | |
Accepts "standard mode" settings. Typically '\\\\x1b[?25h' to hide cursor. | |
Notes on modes:: | |
'?1h' - Application Cursor Keys | |
'?5h' - DECSCNM (default off): Set reverse-video mode | |
'?7h' - DECAWM: Autowrap mode | |
'?12h' - Local echo (SRM or Send Receive Mode) | |
'?25h' - Hide cursor | |
'?1000h' - Send Mouse X/Y on button press and release | |
'?1001h' - Use Hilite Mouse Tracking | |
'?1002h' - Use Cell Motion Mouse Tracking | |
'?1003h' - Use All Motion Mouse Tracking | |
'?1004h' - Send focus in/focus out events | |
'?1005h' - Enable UTF-8 Mouse Mode | |
'?1006h' - Enable SGR Mouse Mode | |
'?1015h' - Enable urxvt Mouse Mode | |
'?1049h' - Save cursor and screen | |
""" | |
# TODO: Add support for the following: | |
# * 3: 132 column mode (might be "or greater") | |
# * 4: Smooth scroll (for animations and also makes things less choppy) | |
# * 5: Reverse video (should be easy: just need some extra CSS) | |
# * 6: Origin mode | |
# * 7: Wraparound mode | |
logging.debug("set_expanded_mode(%s)" % setting) | |
if setting.startswith('?'): | |
# DEC Private Mode Set | |
setting = setting[1:] # Don't need the ? | |
settings = setting.split(';') | |
for setting in settings: | |
try: | |
self.expanded_mode_handlers[setting](True) | |
except (KeyError, TypeError): | |
pass # Unsupported expanded mode | |
try: | |
for callback in self.callbacks[CALLBACK_MODE].values(): | |
callback(setting, True) | |
except TypeError: | |
pass | |
else: | |
# There's a couple mode settings that are just "[Nh" where N==number | |
# [2h Keyboard Action Mode (AM) | |
# [4h Insert Mode | |
# [12h Send/Receive Mode (SRM) | |
# [24h Automatic Newline (LNM) | |
if setting == '4': | |
self.insert_mode = True | |
def reset_expanded_mode(self, setting): | |
""" | |
Accepts "standard mode" settings. Typically '\\\\x1b[?25l' to show | |
cursor. | |
""" | |
logging.debug("reset_expanded_mode(%s)" % setting) | |
if setting.startswith('?'): | |
setting = setting[1:] # Don't need the ? | |
settings = setting.split(';') | |
for setting in settings: | |
try: | |
self.expanded_mode_handlers[setting](False) | |
except (KeyError, TypeError): | |
pass # Unsupported expanded mode | |
try: | |
for callback in self.callbacks[CALLBACK_MODE].values(): | |
callback(setting, False) | |
except TypeError: | |
pass | |
else: | |
# There's a couple mode settings that are just "[Nh" where N==number | |
# [2h Keyboard Action Mode (AM) | |
# [4h Insert Mode | |
# [12h Send/Receive Mode (SRM) | |
# [24h Automatic Newline (LNM) | |
# The only one we care about is 4 (insert mode) | |
if setting == '4': | |
self.insert_mode = False | |
def toggle_alternate_screen_buffer(self, alt): | |
""" | |
If *alt* is True, copy the current screen and renditions to | |
:attr:`self.alt_screen` and :attr:`self.alt_renditions` then re-init | |
:attr:`self.screen` and :attr:`self.renditions`. | |
If *alt* is False, restore the saved screen buffer and renditions then | |
nullify :attr:`self.alt_screen` and :attr:`self.alt_renditions`. | |
""" | |
#logging.debug('toggle_alternate_screen_buffer(%s)' % alt) | |
if alt: | |
# Save the existing screen and renditions | |
self.alt_screen = self.screen[:] | |
self.alt_renditions = self.renditions[:] | |
# Make a fresh one | |
self.clear_screen() | |
else: | |
# Restore the screen | |
if self.alt_screen and self.alt_renditions: | |
self.screen = self.alt_screen[:] | |
self.renditions = self.alt_renditions[:] | |
# Empty out the alternate buffer (to save memory) | |
self.alt_screen = None | |
self.alt_renditions = None | |
# These all need to be reset no matter what | |
self.cur_rendition = unichr(1000) | |
self.prev_dump = [] | |
self.prev_dump_rend = [] | |
self.html_cache = [] | |
def toggle_alternate_screen_buffer_cursor(self, alt): | |
""" | |
Same as :meth:`Terminal.toggle_alternate_screen_buffer` but also | |
saves/restores the cursor location. | |
""" | |
#logging.debug('toggle_alternate_screen_buffer_cursor(%s)' % alt) | |
if alt: | |
self.alt_cursorX = self.cursorX | |
self.alt_cursorY = self.cursorY | |
else: | |
self.cursorX = self.alt_cursorX | |
self.cursorY = self.alt_cursorY | |
self.toggle_alternate_screen_buffer(alt) | |
def expanded_mode_toggle(self, mode, boolean): | |
""" | |
Meant to be used with (simple) expanded mode settings that merely set or | |
reset attributes for tracking purposes; sets `self.expanded_modes[mode]` | |
to *boolean*. Example usage:: | |
>>> self.expanded_mode_handlers['1000'] = partial(self.expanded_mode_toggle, 'mouse_button_events') | |
""" | |
self.expanded_modes[mode] = boolean | |
def insert_characters(self, n=1): | |
""" | |
Inserts the specified number of characters at the cursor position. | |
Overwriting whatever is already present. | |
""" | |
#logging.debug("insert_characters(%s)" % n) | |
n = int(n) | |
for i in xrange(n): | |
self.screen[self.cursorY].pop() # Take one down, pass it around | |
self.screen[self.cursorY].insert(self.cursorX, u' ') | |
def delete_characters(self, n=1): | |
""" | |
DCH - Deletes (to the left) the specified number of characters at the | |
cursor position. As characters are deleted, the remaining characters | |
between the cursor and right margin move to the left. Character | |
attributes (renditions) move with the characters. The terminal adds | |
blank spaces with no visual character attributes at the right margin. | |
DCH has no effect outside the scrolling margins. | |
.. note:: Deletes renditions too. You'd *think* that would be in one of the VT-* manuals... Nope! | |
""" | |
#logging.debug("delete_characters(%s)" % n) | |
if not n: # e.g. n == '' | |
n = 1 | |
else: | |
n = int(n) | |
for i in xrange(n): | |
try: | |
self.screen[self.cursorY].pop(self.cursorX) | |
self.screen[self.cursorY].append(u' ') | |
self.renditions[self.cursorY].pop(self.cursorX) | |
self.renditions[self.cursorY].append(unichr(1000)) | |
except IndexError: | |
# At edge of screen, ignore | |
#print('IndexError in delete_characters(): %s' % e) | |
pass | |
def _erase_characters(self, n=1): | |
""" | |
Erases (to the right) the specified number of characters at the cursor | |
position. | |
.. note:: Deletes renditions too. | |
""" | |
#logging.debug("_erase_characters(%s)" % n) | |
if not n: # e.g. n == '' | |
n = 1 | |
else: | |
n = int(n) | |
distance = self.cols - self.cursorX | |
n = min(n, distance) | |
for i in xrange(n): | |
self.screen[self.cursorY][self.cursorX+i] = u' ' | |
self.renditions[self.cursorY][self.cursorX+i] = unichr(1000) | |
def cursor_left(self, n=1): | |
"""ESCnD CUB (Cursor Back)""" | |
# Commented out to save CPU (and the others below too) | |
#logging.debug('cursor_left(%s)' % n) | |
n = int(n) | |
# This logic takes care of double-width unicode characters | |
if self.double_width_left: | |
self.double_width_left = False | |
return | |
self.cursorX = max(0, self.cursorX - n) | |
char = self.screen[self.cursorY][self.cursorX] | |
if unicodedata.east_asian_width(char) == 'W': | |
# This lets us skip the next call (get called 2x for 2x width) | |
self.double_width_left = True | |
try: | |
for callback in self.callbacks[CALLBACK_CURSOR_POS].values(): | |
callback() | |
except TypeError: | |
pass | |
def cursor_right(self, n=1): | |
"""ESCnC CUF (Cursor Forward)""" | |
#logging.debug('cursor_right(%s)' % n) | |
if not n: | |
n = 1 | |
n = int(n) | |
# This logic takes care of double-width unicode characters | |
if self.double_width_right: | |
self.double_width_right = False | |
return | |
self.cursorX += n | |
char = self.screen[self.cursorY][self.cursorX] | |
if unicodedata.east_asian_width(char) == 'W': | |
# This lets us skip the next call (get called 2x for 2x width) | |
self.double_width_right = True | |
try: | |
for callback in self.callbacks[CALLBACK_CURSOR_POS].values(): | |
callback() | |
except TypeError: | |
pass | |
def cursor_up(self, n=1): | |
"""ESCnA CUU (Cursor Up)""" | |
#logging.debug('cursor_up(%s)' % n) | |
if not n: | |
n = 1 | |
n = int(n) | |
self.cursorY = max(0, self.cursorY - n) | |
try: | |
for callback in self.callbacks[CALLBACK_CURSOR_POS].values(): | |
callback() | |
except TypeError: | |
pass | |
def cursor_down(self, n=1): | |
"""ESCnB CUD (Cursor Down)""" | |
#logging.debug('cursor_down(%s)' % n) | |
if not n: | |
n = 1 | |
n = int(n) | |
self.cursorY = min(self.rows, self.cursorY + n) | |
try: | |
for callback in self.callbacks[CALLBACK_CURSOR_POS].values(): | |
callback() | |
except TypeError: | |
pass | |
def cursor_next_line(self, n): | |
"""ESCnE CNL (Cursor Next Line)""" | |
#logging.debug("cursor_next_line(%s)" % n) | |
if not n: | |
n = 1 | |
n = int(n) | |
self.cursorY = min(self.rows, self.cursorY + n) | |
self.cursorX = 0 | |
try: | |
for callback in self.callbacks[CALLBACK_CURSOR_POS].values(): | |
callback() | |
except TypeError: | |
pass | |
def cursor_previous_line(self, n): | |
"""ESCnF CPL (Cursor Previous Line)""" | |
#logging.debug("cursor_previous_line(%s)" % n) | |
if not n: | |
n = 1 | |
n = int(n) | |
self.cursorY = max(0, self.cursorY - n) | |
self.cursorX = 0 | |
try: | |
for callback in self.callbacks[CALLBACK_CURSOR_POS].values(): | |
callback() | |
except TypeError: | |
pass | |
def cursor_horizontal_absolute(self, n): | |
"""ESCnG CHA (Cursor Horizontal Absolute)""" | |
if not n: | |
n = 1 | |
n = int(n) | |
self.cursorX = n - 1 # -1 because cols is 0-based | |
try: | |
for callback in self.callbacks[CALLBACK_CURSOR_POS].values(): | |
callback() | |
except TypeError: | |
pass | |
def cursor_position(self, coordinates): | |
""" | |
ESCnH CUP (Cursor Position). Move the cursor to the given coordinates. | |
:coordinates: Should be something like, 'row;col' (1-based) but, 'row', 'row;', and ';col' are also valid (assumes 1 on missing value). | |
.. note:: If coordinates is '' (an empty string), the cursor will be moved to the top left (1;1). | |
""" | |
# NOTE: Since this is 1-based we have to subtract 1 from everything to | |
# match how we store these values internally. | |
if not coordinates: | |
row, col = 0, 0 | |
elif ';' in coordinates: | |
row, col = coordinates.split(';') | |
else: | |
row = coordinates | |
col = 0 | |
try: | |
row = int(row) | |
except ValueError: | |
row = 0 | |
try: | |
col = int(col) | |
except ValueError: | |
col = 0 | |
# These ensure a positive integer while reducing row and col by 1: | |
row = max(0, row - 1) | |
col = max(0, col - 1) | |
self.cursorY = row | |
# The column needs special attention in case there's double-width | |
# characters. | |
double_width = 0 | |
if self.cursorY < self.rows: | |
for i, char in enumerate(self.screen[self.cursorY]): | |
if i == col - double_width: | |
# No need to continue further | |
break | |
if unicodedata.east_asian_width(char) == 'W': | |
double_width += 1 | |
if double_width: | |
col = col - double_width | |
self.cursorX = col | |
try: | |
for callback in self.callbacks[CALLBACK_CURSOR_POS].values(): | |
callback() | |
except TypeError: | |
pass | |
def cursor_position_vertical(self, n): | |
""" | |
Vertical Line Position Absolute (VPA) - Moves the cursor to given line. | |
""" | |
n = int(n) | |
self.cursorY = n - 1 | |
def clear_screen(self): | |
""" | |
Clears the screen. Also used to emulate a terminal reset. | |
.. note:: The current rendition (self.cur_rendition) will be applied to all characters on the screen when this function is called. | |
""" | |
logging.debug('clear_screen()') | |
self.init_screen() | |
self.init_renditions(self.cur_rendition) | |
self.cursorX = 0 | |
self.cursorY = 0 | |
def clear_screen_from_cursor_down(self): | |
""" | |
Clears the screen from the cursor down (ESC[J or ESC[0J). | |
.. note:: This method actually erases from the cursor position to the end of the screen. | |
""" | |
#logging.debug('clear_screen_from_cursor_down()') | |
self.clear_line_from_cursor_right() | |
if self.cursorY == self.rows - 1: | |
# Bottom of screen; nothing to do | |
return | |
self.screen[self.cursorY+1:] = [ | |
array('u', u' ' * self.cols) for a in self.screen[self.cursorY+1:] | |
] | |
c = self.cur_rendition # Just to save space below | |
self.renditions[self.cursorY+1:] = [ | |
array('u', c * self.cols) for a in self.renditions[self.cursorY+1:] | |
] | |
def clear_screen_from_cursor_up(self): | |
""" | |
Clears the screen from the cursor up (ESC[1J). | |
""" | |
#logging.debug('clear_screen_from_cursor_up()') | |
self.screen[:self.cursorY+1] = [ | |
array('u', u' ' * self.cols) for a in self.screen[:self.cursorY] | |
] | |
c = self.cur_rendition | |
self.renditions[:self.cursorY+1] = [ | |
array('u', c * self.cols) for a in self.renditions[:self.cursorY] | |
] | |
self.cursorY = 0 | |
def clear_screen_from_cursor(self, n): | |
""" | |
CSI *n* J ED (Erase Data). This escape sequence uses the following rules: | |
====== ============================= === | |
Esc[J Clear screen from cursor down ED0 | |
Esc[0J Clear screen from cursor down ED0 | |
Esc[1J Clear screen from cursor up ED1 | |
Esc[2J Clear entire screen ED2 | |
====== ============================= === | |
""" | |
#logging.debug('clear_screen_from_cursor(%s)' % n) | |
try: | |
n = int(n) | |
except ValueError: # Esc[J | |
n = 0 | |
clear_types = { | |
0: self.clear_screen_from_cursor_down, | |
1: self.clear_screen_from_cursor_up, | |
2: self.clear_screen | |
} | |
try: | |
clear_types[n]() | |
except KeyError: | |
logging.error(_("Error: Unsupported number for escape sequence J")) | |
# Execute our callbacks | |
try: | |
for callback in self.callbacks[CALLBACK_CHANGED].values(): | |
callback() | |
except TypeError: | |
pass | |
try: | |
for callback in self.callbacks[CALLBACK_CURSOR_POS].values(): | |
callback() | |
except TypeError: | |
pass | |
def clear_line_from_cursor_right(self): | |
""" | |
Clears the screen from the cursor right (ESC[K or ESC[0K). | |
""" | |
#logging.debug("clear_line_from_cursor_right()") | |
saved = self.screen[self.cursorY][:self.cursorX] | |
saved_renditions = self.renditions[self.cursorY][:self.cursorX] | |
spaces = array('u', u' '*len(self.screen[self.cursorY][self.cursorX:])) | |
renditions = array('u', | |
self.cur_rendition * len(self.screen[self.cursorY][self.cursorX:])) | |
self.screen[self.cursorY] = saved + spaces | |
# Reset the cursor position's rendition to the end of the line | |
self.renditions[self.cursorY] = saved_renditions + renditions | |
def clear_line_from_cursor_left(self): | |
""" | |
Clears the screen from the cursor left (ESC[1K). | |
""" | |
#logging.debug("clear_line_from_cursor_left()") | |
saved = self.screen[self.cursorY][self.cursorX:] | |
saved_renditions = self.renditions[self.cursorY][self.cursorX:] | |
spaces = array('u', u' '*len(self.screen[self.cursorY][:self.cursorX])) | |
renditions = array('u', | |
self.cur_rendition * len(self.screen[self.cursorY][:self.cursorX])) | |
self.screen[self.cursorY] = spaces + saved | |
self.renditions[self.cursorY] = renditions + saved_renditions | |
def clear_line(self): | |
""" | |
Clears the entire line (ESC[2K). | |
""" | |
#logging.debug("clear_line()") | |
self.screen[self.cursorY] = array('u', u' ' * self.cols) | |
c = self.cur_rendition | |
self.renditions[self.cursorY] = array('u', c * self.cols) | |
self.cursorX = 0 | |
def clear_line_from_cursor(self, n): | |
""" | |
CSI*n*K EL (Erase in Line). This escape sequence uses the following | |
rules: | |
====== ============================== === | |
Esc[K Clear screen from cursor right EL0 | |
Esc[0K Clear screen from cursor right EL0 | |
Esc[1K Clear screen from cursor left EL1 | |
Esc[2K Clear entire line ED2 | |
====== ============================== === | |
""" | |
#logging.debug('clear_line_from_cursor(%s)' % n) | |
try: | |
n = int(n) | |
except ValueError: # Esc[J | |
n = 0 | |
clear_types = { | |
0: self.clear_line_from_cursor_right, | |
1: self.clear_line_from_cursor_left, | |
2: self.clear_line | |
} | |
try: | |
clear_types[n]() | |
except KeyError: | |
logging.error(_( | |
"Error: Unsupported number for CSI escape sequence K")) | |
# Execute our callbacks | |
try: | |
for callback in self.callbacks[CALLBACK_CHANGED].values(): | |
callback() | |
except TypeError: | |
pass | |
try: | |
for callback in self.callbacks[CALLBACK_CURSOR_POS].values(): | |
callback() | |
except TypeError: | |
pass | |
def set_led_state(self, n): | |
""" | |
Sets the values the dict, self.leds depending on *n* using the following | |
rules: | |
====== ====================== ====== | |
Esc[0q Turn off all four leds DECLL0 | |
Esc[1q Turn on LED #1 DECLL1 | |
Esc[2q Turn on LED #2 DECLL2 | |
Esc[3q Turn on LED #3 DECLL3 | |
Esc[4q Turn on LED #4 DECLL4 | |
====== ====================== ====== | |
.. note:: These aren't implemented in Gate One's GUI (yet) but they certainly kept track of! | |
""" | |
logging.debug("set_led_state(%s)" % n) | |
leds = n.split(';') | |
for led in leds: | |
led = int(led) | |
if led == 0: | |
self.leds[1] = False | |
self.leds[2] = False | |
self.leds[3] = False | |
self.leds[4] = False | |
else: | |
self.leds[led] = True | |
try: | |
for callback in self.callbacks[CALLBACK_LEDS].values(): | |
callback(led) | |
except TypeError: | |
pass | |
def _set_rendition(self, n): | |
""" | |
Sets :attr:`self.renditions[self.cursorY][self.cursorX]` equal to | |
*n.split(';')*. | |
*n* is expected to be a string of ECMA-48 rendition numbers separated by | |
semicolons. Example:: | |
'0;1;31' | |
...will result in:: | |
[0, 1, 31] | |
Note that the numbers were converted to integers and the order was | |
preserved. | |
""" | |
#logging.debug("_set_rendition(%s)" % n) | |
cursorY = self.cursorY | |
cursorX = self.cursorX | |
if cursorX >= self.cols: # We're at the end of the row | |
try: | |
if len(self.renditions[cursorY]) <= cursorX: | |
# Make it all longer | |
self.renditions[cursorY].append(u' ') # Make it longer | |
self.screen[cursorY].append(u'\x00') # This needs to match | |
except IndexError: | |
# This can happen if the rate limiter kicks in and starts | |
# cutting off escape sequences at random. | |
return # Don't bother attempting to process anything else | |
if cursorY >= self.rows: | |
logging.error(_( | |
"cursorY >= self.rows! This is either a bug or just a symptom " | |
"of the rate limiter kicking in.")) | |
return # Don't bother setting renditions past the bottom | |
if not n: # or \x1b[m (reset) | |
# First char in PUA Plane 16 is always the default: | |
self.cur_rendition = unichr(1000) # Should be reset (e.g. [0]) | |
return # No need for further processing; save some CPU | |
# Convert the string (e.g. '0;1;32') to a list (e.g. [0,1,32] | |
new_renditions = [int(a) for a in n.split(';') if a != ''] | |
# Handle 256-color renditions by getting rid of the (38|48);5 part and | |
# incrementing foregrounds by 1000 and backgrounds by 10000 so we can | |
# tell them apart in _spanify_screen(). | |
try: | |
if 38 in new_renditions: | |
foreground_index = new_renditions.index(38) | |
if len(new_renditions[foreground_index:]) >= 2: | |
if new_renditions[foreground_index+1] == 5: | |
# This is a valid 256-color rendition (38;5;<num>) | |
new_renditions.pop(foreground_index) # Goodbye 38 | |
new_renditions.pop(foreground_index) # Goodbye 5 | |
new_renditions[foreground_index] += 1000 | |
if 48 in new_renditions: | |
background_index = new_renditions.index(48) | |
if len(new_renditions[background_index:]) >= 2: | |
if new_renditions[background_index+1] == 5: | |
# This is a valid 256-color rendition (48;5;<num>) | |
new_renditions.pop(background_index) # Goodbye 48 | |
new_renditions.pop(background_index) # Goodbye 5 | |
new_renditions[background_index] += 10000 | |
except IndexError: | |
# Likely that the rate limiter has caused all sorts of havoc with | |
# escape sequences. Just ignore it and halt further processing | |
return | |
out_renditions = [] | |
for rend in new_renditions: | |
if rend == 0: | |
out_renditions = [0] | |
else: | |
out_renditions.append(rend) | |
if out_renditions[0] == 0: | |
# If it starts with 0 there's no need to combine it with the | |
# previous rendition... | |
reduced = _reduce_renditions(out_renditions) | |
if reduced not in self.renditions_store.values(): | |
new_ref_point = self.rend_counter.next() | |
self.renditions_store.update({new_ref_point: reduced}) | |
self.cur_rendition = new_ref_point | |
else: # Find the right reference point to use | |
for k, v in self.renditions_store.items(): | |
if reduced == v: | |
self.cur_rendition = k | |
return | |
new_renditions = out_renditions | |
cur_rendition_list = self.renditions_store[self.cur_rendition] | |
reduced = _reduce_renditions(cur_rendition_list + new_renditions) | |
if reduced not in self.renditions_store.values(): | |
new_ref_point = self.rend_counter.next() | |
self.renditions_store.update({new_ref_point: reduced}) | |
self.cur_rendition = new_ref_point | |
else: # Find the right reference point to use | |
for k, v in self.renditions_store.items(): | |
if reduced == v: | |
self.cur_rendition = k | |
def _opt_handler(self, chars): | |
""" | |
Optional special escape sequence handler for sequences matching | |
RE_OPT_SEQ. If CALLBACK_OPT is defined it will be called like so:: | |
self.callbacks[CALLBACK_OPT](chars) | |
Applications can use this escape sequence to define whatever special | |
handlers they like. It works like this: If an escape sequence is | |
encountered matching RE_OPT_SEQ this method will be called with the | |
inbetween *chars* (e.g. \x1b]_;<chars>\x07) as the argument. | |
Applications can then do what they wish with *chars*. | |
.. note:: I added this functionality so that plugin authors would have a mechanism to communicate with terminal applications. See the SSH plugin for an example of how this can be done (there's channels of communication amongst ssh_connect.py, ssh.js, and ssh.py). | |
""" | |
try: | |
for callback in self.callbacks[CALLBACK_OPT].values(): | |
callback(chars) | |
except TypeError as e: | |
# High likelyhood that nothing is defined. No biggie. | |
pass | |
def _spanify_screen(self): | |
""" | |
Iterates over the lines in *screen* and *renditions*, applying HTML | |
markup (span tags) where appropriate and returns the result as a list of | |
lines. It also marks the cursor position via a <span> tag at the | |
appropriate location. | |
""" | |
#logging.debug("_spanify_screen()") | |
results = [] | |
# NOTE: Why these duplicates of self.* and globals? Local variable | |
# lookups are faster--especially in loops. | |
special = SPECIAL | |
rendition_classes = RENDITION_CLASSES | |
screen = self.screen | |
renditions = self.renditions | |
renditions_store = self.renditions_store | |
cursorX = self.cursorX | |
cursorY = self.cursorY | |
show_cursor = self.expanded_modes['25'] | |
if len(self.prev_dump) != len(screen): | |
# Fix it to be equal--assume first time/screen reset/resize/etc | |
# Just fill it with empty strings (only the length matters here) | |
self.prev_dump = [[] for a in screen] | |
self.prev_dump_rend = [[] for a in screen] | |
# The html_cache may need to be fixed as well | |
if len(self.html_cache) != len(screen): | |
self.html_cache = [u'' for a in screen] # Essentially a reset | |
spancount = 0 | |
current_classes = set() | |
prev_rendition = None | |
foregrounds = ('f0','f1','f2','f3','f4','f5','f6','f7') | |
backgrounds = ('b0','b1','b2','b3','b4','b5','b6','b7') | |
html_entities = {"&": "&", '<': '<', '>': '>'} | |
for linecount, line_rendition in enumerate(izip(screen, renditions)): | |
line = line_rendition[0] | |
rendition = line_rendition[1] | |
if linecount != cursorY and self.prev_dump[linecount] == line: | |
if '<span class="cursor">' not in self.html_cache[linecount]: | |
if self.prev_dump_rend[linecount] == rendition: | |
# No change since the last dump. Use the cache... | |
results.append(self.html_cache[linecount]) | |
continue # Nothing changed so move on to the next line | |
outline = "" | |
if current_classes: | |
outline += '<span class="%s">' % " ".join(current_classes) | |
charcount = 0 | |
# TODO: Figure out if there's a faster way to process each character | |
for char, rend in izip(line, rendition): | |
rend = renditions_store[rend] # Get actual rendition | |
if ord(char) >= special: # Special stuff =) | |
# Obviously, not really a single character | |
if char in self.captured_files: | |
outline += self.captured_files[char].html() | |
continue | |
changed = True | |
if char in "&<>": | |
# Have to convert ampersands and lt/gt to HTML entities | |
char = html_entities[char] | |
if rend == prev_rendition: | |
# Shortcut... So we can skip all the logic below | |
changed = False | |
else: | |
prev_rendition = rend | |
if changed and rend: | |
classes = imap(rendition_classes.get, rend) | |
for _class in classes: | |
if _class and _class not in current_classes: | |
# Something changed... Start a new span | |
if spancount: | |
outline += "</span>" | |
spancount -= 1 | |
if 'reset' in _class: | |
if _class == 'reset': | |
current_classes = set() | |
if spancount: | |
for i in xrange(spancount): | |
outline += "</span>" | |
spancount = 0 | |
else: | |
reset_class = _class.split('reset')[0] | |
if reset_class == 'foreground': | |
# Remove any foreground classes | |
[current_classes.pop(i) for i, a in | |
enumerate(current_classes) if a in | |
foregrounds | |
] | |
elif reset_class == 'background': | |
[current_classes.pop(i) for i, a in | |
enumerate(current_classes) if a in | |
backgrounds | |
] | |
else: | |
try: | |
current_classes.remove(reset_class) | |
except KeyError: | |
# Trying to reset something that was | |
# never set. Ignore | |
pass | |
else: | |
if _class in foregrounds: | |
[current_classes.pop(i) for i, a in | |
enumerate(current_classes) if a in | |
foregrounds | |
] | |
elif _class in backgrounds: | |
[current_classes.pop(i) for i, a in | |
enumerate(current_classes) if a in | |
backgrounds | |
] | |
current_classes.add(_class) | |
if current_classes: | |
outline += '<span class="%s">' % " ".join(current_classes) | |
spancount += 1 | |
if linecount == cursorY and charcount == cursorX: # Cursor position | |
if show_cursor: | |
outline += '<span class="cursor">%s</span>' % char | |
else: | |
outline += char | |
else: | |
outline += char | |
charcount += 1 | |
self.prev_dump[linecount] = line[:] | |
self.prev_dump_rend[linecount] = rendition[:] | |
if outline: | |
# Make sure all renditions terminate at the end of the line | |
for whatever in xrange(spancount): | |
outline += "</span>" | |
results.append(outline) | |
self.html_cache[linecount] = outline | |
else: | |
results.append(None) # 'null' is shorter than 4 spaces | |
self.html_cache[linecount] = None | |
# NOTE: The client has been programmed to treat None (aka null in | |
# JavaScript) as blank lines. | |
for whatever in xrange(spancount): # Bit of cleanup to be safe | |
results[-1] += "</span>" | |
return results | |
def _spanify_scrollback(self): | |
""" | |
Spanifies (turns renditions into `<span>` elements) everything inside | |
`self.scrollback` using `self.renditions`. This differs from | |
`_spanify_screen` in that it doesn't apply any logic to detect the | |
location of the cursor (to make it just a tiny bit faster). | |
""" | |
# NOTE: See the comments in _spanify_screen() for details on this logic | |
results = [] | |
special = SPECIAL | |
screen = self.scrollback_buf | |
renditions = self.scrollback_renditions | |
rendition_classes = RENDITION_CLASSES | |
renditions_store = self.renditions_store | |
spancount = 0 | |
current_classes = set() | |
prev_rendition = None | |
foregrounds = ('f0','f1','f2','f3','f4','f5','f6','f7') | |
backgrounds = ('b0','b1','b2','b3','b4','b5','b6','b7') | |
html_entities = {"&": "&", '<': '<', '>': '>'} | |
for line, rendition in izip(screen, renditions): | |
outline = "" | |
if current_classes: | |
outline += '<span class="%s">' % " ".join(current_classes) | |
for char, rend in izip(line, rendition): | |
rend = renditions_store[rend] # Get actual rendition | |
if ord(char) >= special: # Special stuff =) | |
# Obviously, not really a single character | |
if char in self.captured_files: | |
outline += self.captured_files[char].html() | |
continue | |
changed = True | |
if char in "&<>": | |
# Have to convert ampersands and lt/gt to HTML entities | |
char = html_entities[char] | |
if rend == prev_rendition: | |
changed = False | |
else: | |
prev_rendition = rend | |
if changed and rend != None: | |
classes = imap(rendition_classes.get, rend) | |
for _class in classes: | |
if _class and _class not in current_classes: | |
if spancount: | |
outline += "</span>" | |
spancount -= 1 | |
if 'reset' in _class: | |
if _class == 'reset': | |
current_classes = set() | |
else: | |
reset_class = _class.split('reset')[0] | |
if reset_class == 'foreground': | |
[current_classes.pop(i) for i, a in | |
enumerate(current_classes) if a in | |
foregrounds | |
] | |
elif reset_class == 'background': | |
[current_classes.pop(i) for i, a in | |
enumerate(current_classes) if a in | |
backgrounds | |
] | |
else: | |
try: | |
current_classes.remove(reset_class) | |
except KeyError: | |
pass | |
else: | |
if _class in foregrounds: | |
[current_classes.pop(i) for i, a in | |
enumerate(current_classes) if a in | |
foregrounds | |
] | |
elif _class in backgrounds: | |
[current_classes.pop(i) for i, a in | |
enumerate(current_classes) if a in | |
backgrounds | |
] | |
current_classes.add(_class) | |
if current_classes: | |
outline += '<span class="%s">' % " ".join(current_classes) | |
spancount += 1 | |
outline += char | |
if outline: | |
# Make sure all renditions terminate at the end of the line | |
for whatever in xrange(spancount): | |
outline += "</span>" | |
results.append(outline) | |
else: | |
results.append(None) | |
for whatever in xrange(spancount): # Bit of cleanup to be safe | |
results[-1] += "</span>" | |
return results | |
def dump_html(self, renditions=True): | |
""" | |
Dumps the terminal screen as a list of HTML-formatted lines. If | |
*renditions* is True (default) then terminal renditions will be | |
converted into HTML <span> elements so they will be displayed properly | |
in a browser. Otherwise only the cursor <span> will be added to mark | |
its location. | |
.. note:: This places <span class="cursor">(current character)</span> around the cursor location. | |
""" | |
if renditions: # i.e. Use stylized text (the default) | |
screen = self._spanify_screen() | |
scrollback = [] | |
if self.scrollback_buf: | |
scrollback = self._spanify_scrollback() | |
else: | |
cursorX = self.cursorX | |
cursorY = self.cursorY | |
screen = [] | |
for y, row in enumerate(self.screen): | |
if y == cursorY: | |
cursor_row = "" | |
for x, c in enumerate(row): | |
if x == cursorX: | |
cursor_row += '<span class="cursor">%s</span>' % c | |
else: | |
cursor_row += char | |
screen.append(cursor_row) | |
else: | |
screen.append("".join(row)) | |
# Empty the scrollback buffer: | |
self.init_scrollback() | |
self.modified = False | |
return (scrollback, screen) | |
def dump_plain(self): | |
""" | |
Dumps the screen and the scrollback buffer as-is then empties the | |
scrollback buffer. | |
""" | |
screen = self.screen | |
scrollback = self.scrollback_buf | |
# Empty the scrollback buffer: | |
self.init_scrollback() | |
self.modified = False | |
return (scrollback, screen) | |
def dump_components(self): | |
""" | |
Dumps the screen and renditions as-is, the scrollback buffer as HTML, | |
and the current cursor coordinates. Also, empties the scrollback buffer | |
.. note:: This was used in some performance-related experiments but might be useful for other patterns in the future so I've left it here. | |
""" | |
screen = [a.tounicode() for a in self.screen] | |
scrollback = [] | |
if self.scrollback_buf: | |
# Process the scrollback buffer into HTML | |
scrollback = self._spanify_scrollback( | |
self.scrollback_buf, self.scrollback_renditions) | |
# Empty the scrollback buffer: | |
self.init_scrollback() | |
self.modified = False | |
return (scrollback, screen, self.renditions, self.cursorY, self.cursorX) | |
def dump(self): | |
""" | |
Returns self.screen as a list of strings with no formatting. | |
No scrollback buffer. No renditions. It is meant to be used to get a | |
quick glance of what is being displayed (when debugging). | |
.. note:: This method does not empty the scrollback buffer. | |
""" | |
out = [] | |
for line in self.screen: | |
line_out = "" | |
for char in line: | |
if len(char) > 1: # This is an image (or similar) | |
line_out += u'⬚' # Use a dotted square as a placeholder | |
else: | |
line_out += char | |
out.append(line_out) | |
self.modified = False | |
return out | |
# This is here to make it easier for someone to produce an HTML app that uses | |
# terminal.py | |
def css_renditions(selector=None): | |
""" | |
Returns a (long) string containing all the CSS styles in order to support | |
terminal text renditions (different colors, bold, etc) in an HTML terminal | |
using the dump_html() function. If *selector* is provided, all styles will | |
be prefixed with said selector like so:: | |
${selector} span.f0 { color: #5C5C5C; } | |
Example:: | |
>>> css_renditions("#gateone").splitlines()[7] | |
'#gateone span.f0 { color: #5C5C5C; } /* Black */' | |
""" | |
from string import Template | |
# Try looking for the fallback CSS template in two locations: | |
# * The same directory that holds terminal.py | |
# * A 'templates' directory in the same location as terminal.py | |
template_name = 'terminal_renditions_fallback.css' | |
template_path = os.path.join(os.path.split(__file__)[0], template_name) | |
if not os.path.exists(template_path): | |
# Try looking in a 'templates' directory | |
template_path = os.path.join( | |
os.path.split(__file__)[0], 'templates', template_name) | |
if not os.path.exists(template_path): | |
raise IOError("File not found: %s" % template_name) | |
with open(template_path) as f: | |
css = f.read() | |
renditions_template = Template(css) | |
return renditions_template.substitute(selector=selector) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment