Created
May 2, 2017 05:22
-
-
Save n3fariox/5e09bbbdb8c0293a1651b36e5ea8e16a to your computer and use it in GitHub Desktop.
Non-Blocking Cmd w/Readline Completion
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cmd
import errno
import readline
import select
import sys
import threading
import time
def setup_readline():
    """Expose GNU readline's callback interface on the ``readline`` module.

    The standard ``readline`` module does not wrap the ``rl_callback_*``
    functions, so they are looked up in the shared library through ctypes
    and attached to the module as:

    * ``readline.callback_handler_install(prompt, func)``
    * ``readline.callback_read_char()``
    * ``readline.callback_handler_remove()``

    ``func`` is invoked with the completed line as a byte string, or with
    ``None`` when readline reports EOF.
    """
    import ctypes
    from ctypes.util import find_library

    rl_lib = ctypes.cdll.LoadLibrary(find_library("readline"))

    # The C handler is ``void handler(char *line)``, hence restype None.
    # NOTE(review): c_char_p hands the callback a copied byte string, so the
    # buffer readline allocated for the line is never freed -- confirm whether
    # that per-line leak matters for this tool.
    handler_type = ctypes.CFUNCTYPE(None, ctypes.c_char_p)

    install = rl_lib.rl_callback_handler_install
    install.argtypes = [ctypes.c_char_p, handler_type]
    install.restype = None
    rl_lib.rl_callback_handler_remove.argtypes = []
    rl_lib.rl_callback_handler_remove.restype = None
    rl_lib.rl_callback_read_char.argtypes = []
    rl_lib.rl_callback_read_char.restype = None

    # ctypes does not keep callback objects alive on behalf of C code; if the
    # wrapper is garbage-collected while readline still holds its address the
    # process crashes on the next completed line.  Hold on to the most
    # recently installed wrapper here (a list because the closure must be
    # able to rebind it without ``nonlocal``).
    live_handler = [None]

    def callback_handler_install(prompt, cfunc):
        """Install ``cfunc`` as the line handler and display ``prompt``."""
        if not isinstance(prompt, bytes):
            # The C API expects ``const char *``; a text prompt must be
            # encoded rather than passed through as a wide string.
            prompt = prompt.encode()
        c_handler = handler_type(cfunc)
        live_handler[0] = c_handler
        install(prompt, c_handler)

    readline.callback_handler_install = callback_handler_install
    readline.callback_handler_remove = rl_lib.rl_callback_handler_remove
    readline.callback_read_char = rl_lib.rl_callback_read_char


# Patch the module once at import time; CIN.readline() relies on it.
setup_readline()
class CIN(object):
    """File-like stdin replacement whose ``readline()`` can be woken up.

    ``readline()`` polls ``sys.stdin`` with a short ``select`` timeout and
    feeds available input to GNU readline's callback interface one
    character at a time.  Between polls it checks two flags that another
    thread may set:

    * ``interrupt`` -- ``readline()`` returns the string ``"interrupt"``
    * ``shutdown``  -- ``readline()`` returns the string ``"shutdown"``

    Each flag is cleared again once it has been reported.  Requires
    ``readline.callback_handler_install``, ``callback_read_char`` and
    ``callback_handler_remove`` to be available on the ``readline`` module.
    """

    def __init__(self):
        self.shutdown = False
        self.interrupt = False

    @staticmethod
    def _to_text(raw):
        """Turn the value readline passes to the line handler into a string.

        ``None`` (EOF) becomes ``"EOF"``; a byte string that is not already
        ``str`` is decoded with stdin's encoding (UTF-8 when unknown) so the
        caller always receives text.
        """
        if raw is None:
            return "EOF"
        if not isinstance(raw, str):
            encoding = getattr(sys.stdin, "encoding", None) or "utf-8"
            return raw.decode(encoding, "replace")
        return raw

    def readline(self):
        """Return the next input line, ``"interrupt"`` or ``"shutdown"``.

        A set ``interrupt`` flag wins over ``shutdown``, and either wins over
        a line that was completed in the same poll.  Ctrl-C is reported to
        the caller as a blank line (``"\\n"``).  An empty input line is not
        returned; the loop simply keeps waiting.
        """
        ctx = {'line': None}

        def line_hack(line):
            # Called by readline once a full line (or EOF) is available.
            ctx['line'] = self._to_text(line)

        try:
            readline.callback_handler_install("", line_hack)
            while not ctx['line']:
                try:
                    r, w, x = select.select([sys.stdin], [], [], .1)
                except select.error as err:
                    # A signal interrupted the wait: just poll again.
                    # ``args[0]`` works for both the tuple-like select.error
                    # and OSError.
                    if err.args[0] == errno.EINTR:
                        continue
                    raise
                except KeyboardInterrupt:
                    ctx['line'] = '\n'
                    print('')
                    break
                if sys.stdin in r:
                    readline.callback_read_char()
                if self.shutdown or self.interrupt:
                    break
            if self.interrupt:
                line = "interrupt"
                self.interrupt = False
            elif self.shutdown:
                line = "shutdown"
                self.shutdown = False
            else:
                line = ctx['line']
        finally:
            # Always uninstall the handler, even when an exception escapes.
            readline.callback_handler_remove()
        return line
class MyCmd(cmd.Cmd):
    """Small demo shell that reads its commands from ``self.stdin``.

    The ``do_*`` methods are documented with comments rather than
    docstrings so the output of the built-in ``help`` command is unchanged.
    """

    prompt = "CP>"
    # Read lines via self.stdin.readline() instead of input()/raw_input().
    use_rawinput = False

    def cmdloop(self, *args, **kwargs):
        """Run the command loop with tab completion enabled.

        ``cmd.Cmd.cmdloop`` only installs its completer when
        ``use_rawinput`` is true, so it is installed here instead and the
        previous completer is restored when the loop ends, including when it
        ends with an exception.  All arguments (e.g. ``intro``) are passed
        through to ``cmd.Cmd.cmdloop``.
        """
        if not self.completekey:
            # Nothing was installed, so there is nothing to restore.
            return cmd.Cmd.cmdloop(self, *args, **kwargs)

        self.old_completer = readline.get_completer()
        readline.set_completer(self.complete)
        readline.parse_and_bind(self.completekey + ": complete")
        try:
            return cmd.Cmd.cmdloop(self, *args, **kwargs)
        finally:
            readline.set_completer(self.old_completer)

    # Handles the "interrupt" line; the loop keeps running.
    def do_interrupt(self, line):
        print("Interrupting goat!")

    # Handles the "shutdown" line; returning True ends the loop.
    def do_shutdown(self, line):
        print("Shutting down")
        return True

    # Ends the loop on user request.
    def do_quit(self, line):
        print("Quitting")
        return True

    # Ends the loop on end-of-file.
    def do_EOF(self, line):
        print("Bye!!")
        return True
def shutdown_timer(cin, interrupt_after=10, shutdown_after=20):
    """Set ``cin.interrupt`` and later ``cin.shutdown`` after fixed delays.

    Blocks the calling thread while waiting, so it is meant to run in a
    background thread.

    Args:
        cin: Object whose ``interrupt`` and ``shutdown`` attributes are set
            to ``True``.
        interrupt_after: Seconds to wait before setting ``cin.interrupt``.
        shutdown_after: Further seconds to wait, counted from the interrupt,
            before setting ``cin.shutdown``.
    """
    time.sleep(interrupt_after)
    cin.interrupt = True
    time.sleep(shutdown_after)
    cin.shutdown = True
if __name__ == "__main__":
    # Demo: run the shell on an interruptible stdin while a background
    # timer raises the interrupt and shutdown flags.
    console_in = CIN()
    shell = MyCmd(stdin=console_in)

    # Daemon thread, so a still-sleeping timer never keeps the process alive
    # once the command loop has returned.
    timer_thread = threading.Thread(target=shutdown_timer, args=(console_in,))
    timer_thread.daemon = True
    timer_thread.start()

    shell.cmdloop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment