Skip to content

Instantly share code, notes, and snippets.

@stuaxo
Last active August 29, 2015 13:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save stuaxo/9300565 to your computer and use it in GitHub Desktop.
Save stuaxo/9300565 to your computer and use it in GitHub Desktop.
# Superseded by this: https://gist.github.com/stuaxo/9308767
import Queue
import sys
import time
import select
import tty
import termios
from threading import Thread
"""
Non blocking readline implementation.
Based on some non-blocking code found online (TODO: re-find the source and attribute it).
A somewhat horrible workaround that reimplements readline as a non-blocking call.
commands:
hello - test command
cancel - tell the other thread to quit, then we automatically die
Use case:
main thread runs cmdline
secondary thread can run a GUI mainloop,
when that quits it can make the main thread quit,
by sending it a message and aborting any running readlines.
"""
# Control characters the reader thread and readline() give special meaning to.
EOF = '\x04'        # ASCII EOT (Ctrl-D); treated as end-of-input
BACKSPACE = '\x7f'  # ASCII DEL; treated as "erase the previous character"
# Check for a terminating condition and stop the program when it occurs,
# from a separate thread.
class SelectHelper(Thread):
    """Thread that polls stdin and forwards each typed character to a queue.

    While ``run`` is active the terminal is put into cbreak mode so that
    characters can be read one at a time; the previous terminal settings
    are restored when ``run`` exits. Every character read is put on
    ``char_q``. Printable input is optionally echoed to ``echo_dev``.

    Collection stops when the enter key, the cancel key or ``EOF`` is read,
    when stdin reports end-of-file, or when ``stop()``/``close()`` clears
    ``collect``.
    """

    def __init__(self, char_q=None, cancel_key=None, enter_key='\n',
                 echo_dev=sys.stdout):
        """Set up the reader.

        char_q     -- queue receiving the characters; a new one is created
                      when omitted.
        cancel_key -- extra character that ends collection (besides the
                      enter key and EOF).
        enter_key  -- character that completes a line.
        echo_dev   -- file-like object used for echo, or None for no echo.
        """
        Thread.__init__(self)
        self.collect = True
        # Keep the caller's cancel key; it used to be discarded and the
        # attribute was always None.
        self.cancel_key = cancel_key
        self.enter_key = enter_key
        self.open = True
        self.char_q = Queue.Queue() if char_q is None else char_q
        self.echo_dev = echo_dev

    def has_data(self):
        """Return True if stdin is readable right now (zero-timeout select)."""
        readable, _, _ = select.select([sys.stdin], [], [], 0)
        return bool(readable)

    def run(self):
        """Read characters from stdin until a terminator or ``collect`` clears."""
        old_settings = termios.tcgetattr(sys.stdin)
        try:
            tty.setcbreak(sys.stdin.fileno())
            while self.collect:
                if not self.has_data():
                    # Nothing typed yet: back off briefly instead of spinning.
                    time.sleep(0.1)
                    continue

                ch = sys.stdin.read(1)
                if not ch:
                    # An empty read means stdin hit end-of-file. Report it as
                    # EOF rather than letting ord('') raise further down.
                    self.char_q.put(EOF)
                    self.collect = False
                    break

                self.char_q.put(ch)
                if ch in (self.enter_key, self.cancel_key, EOF):
                    # Terminator: finish the echoed line and stop collecting.
                    if self.echo_dev is not None:
                        self.echo_dev.write('\n')
                        self.echo_dev.flush()
                    self.collect = False
                    break

                # Characters with ordinal <= 10 are never echoed.
                if self.echo_dev is not None and ord(ch) > 10:
                    if ch == BACKSPACE:
                        # Step back, overwrite with a space, step back again.
                        self.echo_dev.write('\b \b')
                    else:
                        self.echo_dev.write(ch)
                    self.echo_dev.flush()
        finally:
            self.collect = False
            termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)

    def stop(self):
        """Ask ``run`` to leave its loop at the next iteration."""
        self.collect = False

    def close(self):
        """Stop collecting and mark this helper as no longer open."""
        self.stop()
        self.open = False
class StdinCancellable(object):
    """stdin stand-in whose ``readline`` can be aborted via ``cancel``.

    Each ``readline`` call starts a ``SelectHelper`` thread that feeds typed
    characters into a queue, and assembles the line from that queue while
    polling every ``wait`` seconds.
    """

    def __init__(self, wait=0.1):
        """wait -- seconds to sleep between polls of the character queue."""
        self.wait = wait
        self.open = True
        self.input_thread_timeout = 0.01
        # No reader thread exists until the first readline(); cancel() must
        # cope with that.
        self.t = None
        self.char_q = None

    def readline(self):
        """Return the next line of input, or '' once cancelled or on EOF.

        Returns '' immediately if ``cancel`` was called earlier, and '' when
        the EOF character is read. Otherwise returns the collected text
        terminated by a single newline. Re-raises KeyboardInterrupt after
        telling the reader thread to stop.
        """
        if not self.open:
            return ''

        char_q = self.char_q = Queue.Queue()
        reader = self.t = SelectHelper(char_q=char_q, cancel_key=EOF)
        reader.start()
        # Give the reader a brief head start before polling.
        reader.join(self.input_thread_timeout)

        line = ''
        while reader.collect or not char_q.empty():
            try:
                while not char_q.empty():
                    ch = char_q.get(block=False)
                    if ch == EOF:
                        reader.collect = False
                        return ''
                    if ch == BACKSPACE:
                        line = line[:-1]
                    elif ch is not None:
                        line += ch
                time.sleep(self.wait)
            except Queue.Empty:
                time.sleep(self.wait)
            except KeyboardInterrupt:
                reader.collect = False
                # Bare raise keeps the original traceback.
                raise

        # The enter key is queued like any other character, so the line may
        # already end with a newline; avoid returning a doubled one.
        if line.endswith('\n'):
            return line
        return line + '\n'

    def cancel(self):
        """Abort any running ``readline`` and make later calls return ''."""
        if self.t is not None:
            self.t.close()
        self.open = False
# Module-level cancellable stdin; passed to the Test shell further down.
s = StdinCancellable()
# cmd supplies the Cmd base class used by the demo shell.
import cmd
class Test(cmd.Cmd):
    """Demo command shell that reads its input through ``self.stdin``.

    Commands: ``hello`` (test command), ``cancel`` (cancel the input
    source) and EOF (leave the loop).
    """
    # The do_* methods carry '#' comments rather than docstrings on purpose:
    # cmd.Cmd shows a do_* docstring as that command's help text.

    def __init__(self, s=None, **kwargs):
        """Create the shell.

        s      -- optional stdin replacement; overrides any ``stdin`` keyword.
        kwargs -- forwarded unchanged to ``cmd.Cmd.__init__``.
        """
        cmd.Cmd.__init__(self, **kwargs)
        if s is not None:
            self.stdin = s
        # Make cmdloop() read via self.stdin.readline() rather than the
        # built-in prompt function.
        self.use_rawinput = False

    def do_cancel(self, line):
        # Cancel the input source. A plain file object (e.g. sys.stdin) has
        # no cancel(), so skip it instead of raising AttributeError.
        cancel = getattr(self.stdin, 'cancel', None)
        if cancel is not None:
            cancel()

    def do_hello(self, line):
        # Single-argument print call: same output as the former
        # `print 'hello ', line` statement, and also valid on Python 3.
        print('%s %s' % ('hello ', line))

    def do_EOF(self, line):
        # Returning a true value tells cmdloop() to stop.
        return True
# Demo entry point: build the shell on the cancellable stdin and run it.
# Alternative without a custom stdin: t = Test()
# NOTE(review): this runs whenever the module is loaded; consider an
# `if __name__ == "__main__":` guard if the file is ever imported.
t = Test(stdin=s)
t.cmdloop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment