Last active
August 29, 2015 13:56
-
-
Save stuaxo/9300565 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Superceded by this: https://gist.github.com/stuaxo/9308767 | |
import Queue | |
import sys | |
import time | |
import select | |
import tty | |
import termios | |
from threading import Thread | |
""" | |
Non blocking readline implementation. | |
Based on some non blocking code I found on the net (TODO, refind - attribute) | |
Kind of horrible workaround to reimplement readline as nonblocking | |
commands: | |
hello - test command | |
cancel - tell the other thread to quit, then we automatically die | |
Use case: | |
main thread runs cmdline | |
secondary thread can run a GUI mainloop, | |
when that quits it can make the main thread quit, | |
by sending it a message and aborting any running readlines. | |
""" | |
EOF='\x04' | |
BACKSPACE='\x7f' | |
#check n terminate program on terminal condition, | |
#from a separate thread | |
class SelectHelper(Thread): | |
#check stdin for input... | |
def __init__(self, char_q = None, cancel_key=None, enter_key='\n', echo_dev=sys.stdout): | |
self.collect = True | |
self.cancel_key = None | |
self.enter_key = enter_key | |
self.open = True | |
if char_q is None: | |
self.char_q = Queue.Queue() | |
else: | |
self.char_q = char_q | |
self.echo_dev = echo_dev | |
Thread.__init__(self) | |
def has_data(self): | |
return select.select([sys.stdin], [], [], 0) == ([sys.stdin], [], []) | |
def run(self): | |
old_settings = termios.tcgetattr(sys.stdin) | |
try: | |
tty.setcbreak(sys.stdin.fileno()) | |
while self.collect: | |
if self.has_data(): | |
ch = sys.stdin.read(1) | |
self.char_q.put(ch) | |
if self.echo_dev is not None and \ | |
ch not in [self.enter_key, self.cancel_key, EOF] and \ | |
ord(ch) > 10: | |
if ch == BACKSPACE: | |
self.echo_dev.write('\b \b') | |
else: | |
self.echo_dev.write(ch) | |
self.echo_dev.flush() | |
elif ch in [self.enter_key, self.cancel_key, EOF]: | |
if self.echo_dev is not None: | |
self.echo_dev.write('\n') | |
self.echo_dev.flush() | |
pass | |
self.collect = False | |
break | |
else: | |
time.sleep(0.1) | |
finally: | |
self.collect = False | |
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings) | |
def stop(self): | |
self.collect = False | |
def close(self): | |
self.stop() | |
self.open = False | |
class StdinCancellable(object): | |
def __init__(self, wait=0.1): | |
self.wait = wait | |
self.open = True | |
self.input_thread_timeout=0.01 | |
def readline(self): | |
if self.open is False: | |
return '' | |
self.char_q = Queue.Queue() | |
char_q = self.char_q | |
self.t = SelectHelper(char_q = self.char_q, cancel_key=EOF) | |
if not self.t.is_alive(): | |
self.t.start() | |
self.t.join(self.input_thread_timeout) | |
line = '' | |
while self.t.collect or char_q.empty() == False: | |
try: | |
while char_q.empty() == False: | |
ch = char_q.get(block=False) | |
if ch == EOF: | |
self.t.collect = False | |
return '' | |
if ch == BACKSPACE: | |
line = line[:len(line)-1] | |
elif ch is not None: | |
line += ch | |
time.sleep(self.wait) | |
except Queue.Empty: | |
time.sleep(self.wait) | |
except KeyboardInterrupt as k: | |
self.t.collect = False | |
raise k | |
return line + '\n' | |
def cancel(self): | |
self.t.close() | |
self.open = False | |
s = StdinCancellable() | |
import cmd | |
class Test(cmd.Cmd): | |
def __init__(self, s=None, **kwargs): | |
cmd.Cmd.__init__(self, **kwargs) | |
if s is not None: | |
self.stdin = s | |
self.use_rawinput=False | |
def do_cancel(self, line): | |
self.stdin.cancel() | |
def do_hello(self, line): | |
print 'hello ', line | |
def do_EOF(self, line): | |
return True | |
#t = Test() | |
t = Test(stdin=s) | |
t.cmdloop() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment