Skip to content

Instantly share code, notes, and snippets.

@bebraw
Created January 21, 2010 17:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bebraw/282976 to your computer and use it in GitHub Desktop.
Save bebraw/282976 to your computer and use it in GitHub Desktop.
from file import PluginDirectory
from interpreter import Interpreter
from plugin_loader import PluginLoader
from threads import InputThread, Poller
class KeyboardInput(InputThread):
    """Input thread that obtains its data by prompting on the console."""

    def get_data(self):
        """Show the ``>>> `` prompt and return the line that was typed."""
        typed_line = raw_input('>>> ')
        return typed_line
class Application:
    """Connect loaded plugin commands to an interpreter and evaluate input.

    ``input_source`` is the thread class handed to the ``Poller``; a subclass
    can replace it to take input from somewhere other than the keyboard.
    """

    input_source = KeyboardInput

    def run(self):
        """Load the plugin commands, then poll for input and evaluate it."""
        commands = PluginLoader().load(PluginDirectory())
        self.interpreter = Interpreter(commands)

        input_poller = Poller(self.input_source, self.input_evaluator)
        try:
            input_poller.poll()
        except SystemExit:
            # A SystemExit escaping the poller ends the run quietly.
            pass

    def input_evaluator(self, user_input):
        """Interpret *user_input* and hand any result to ``output``.

        When *user_input* is ``None`` the text is taken from ``self.input()``
        instead.  NOTE(review): ``input`` is not defined on this class; it is
        presumably supplied by a subclass or a test harness -- confirm.

        A ``str`` result that contains newlines is emitted one line per
        ``output`` call; any other non-``None`` result is emitted unchanged.
        A ``None`` result produces no output.
        """
        text = self.input() if user_input is None else user_input
        result = self.interpreter.interpret(text)

        if result is None:
            return

        if isinstance(result, str) and '\n' in result:
            for line in result.split('\n'):
                self.output(line)
        else:
            self.output(result)

    def output(self, result):
        """Print *result* to standard output."""
        # A single parenthesised argument prints exactly like ``print result``
        # under the Python 2 print statement.
        print(result)
from mock import patch
from placidity.application import Application
from placidity.scenario_tester import ScenarioTester
from placidity.threads import InputThread
class InputTester(InputThread):
    """Input thread stub whose ``get_data`` always yields ``None``."""

    def get_data(self):
        """Return ``None`` instead of reading any real input."""
        return None
class ApplicationTester(Application):
    """``Application`` variant that takes its input from ``InputTester``."""

    # Overrides the class-level input source with the test stub.
    input_source = InputTester
class TestApplication:
    """Tests for the application, driven through a ``ScenarioTester``."""

    # Created once, at class-definition time, around the tester application.
    scenario_tester = ScenarioTester(ApplicationTester)

    # The rest of the class is elided in this listing.
    ...
import threading
class Poller:
    """Repeatedly run an input thread and pass its data to a callback.

    Each round creates a fresh ``thread_class()`` instance, starts it, waits
    until the thread's ``data_ready`` event is set and then calls
    ``interpreter(thread.data)``.  The loop stops when the callback raises
    ``SystemExit``.

    ``thread_class`` must be callable without arguments and return an object
    that offers ``start()``, a ``data_ready`` event and, once that event is
    set, a ``data`` attribute.
    """

    # Seconds to block in each wait on ``data_ready``.  Waiting in short,
    # bounded slices avoids spinning the CPU while still returning to Python
    # code regularly, so the waiting thread is not parked indefinitely in a
    # single blocking call.
    _WAIT_TIMEOUT = 0.05

    def __init__(self, thread_class, interpreter):
        """Store the thread factory and the callback.

        ``interpreter`` must be callable; this is checked with an assertion.
        """
        assert hasattr(interpreter, '__call__')

        self.thread_class = thread_class
        self.interpreter = interpreter

    def poll(self):
        """Process input until the callback raises ``SystemExit``.

        Any other exception raised by the callback propagates to the caller.
        """
        self._set_up_thread()

        while True:
            self._wait_for_data()

            try:
                self.interpreter(self.thread.data)
            except SystemExit:
                break
            else:
                # Only ask for more input when the callback did not exit.
                self._set_up_thread()

    def _wait_for_data(self):
        """Block until the current thread signals that its data is ready."""
        ready = self.thread.data_ready

        # Sleep on the event instead of busy-looping on its flag.
        while not ready.is_set():
            ready.wait(self._WAIT_TIMEOUT)

    def _set_up_thread(self):
        """Create and start a new input thread for the next round."""
        self.thread = self.thread_class()
        self.thread.start()
class InputThread(threading.Thread):
    """Thread that fetches one piece of data and signals when it is done.

    Subclasses override ``get_data``.  After the thread has run, ``data``
    holds the value ``get_data`` returned and the ``data_ready`` event is set.
    """

    def __init__(self):
        # ``data`` exists from the start so that it can always be read, even
        # when ``get_data`` fails before assigning it.
        self.data = None
        self.data_ready = threading.Event()

        super(InputThread, self).__init__()

    def run(self):
        """Store the result of ``get_data`` and set ``data_ready``.

        ``data_ready`` is set even when ``get_data`` raises, so that code
        waiting on the event is not left blocked forever; in that case
        ``data`` keeps its initial value of ``None`` and the exception still
        propagates out of ``run``.
        """
        try:
            self.data = self.get_data()
        finally:
            self.data_ready.set()

    def get_data(self):
        """Return the data for this thread; the base version returns ``None``."""
        pass
class RepeatingTimer:
    """Call ``callback`` every ``interval`` seconds until cancelled.

    Each run is driven by a one-shot ``threading.Timer``; after the callback
    returns, a new timer is scheduled for as long as the instance is still
    running.  If the callback raises, no further run is scheduled.

    Calling ``start`` again while already running starts an additional chain
    of timers; ``cancel`` stops all chains but only cancels the most recently
    scheduled timer early.
    """

    def __init__(self, interval, callback):
        """Remember ``interval`` (seconds) and ``callback`` (no arguments)."""
        def repeat_callback():
            callback()

            # Re-arm under the lock so that a concurrent cancel() either
            # prevents the new timer or is able to cancel it.
            with self._lock:
                if self.running:
                    self._timer = self._create_timer()
                    self._timer.start()

        self.interval = interval
        self.repeat_callback = repeat_callback

        # Defined up front so cancel() is safe before start().
        self.running = False
        self._timer = None
        self._lock = threading.Lock()

    def start(self):
        """Schedule the first callback run ``interval`` seconds from now."""
        with self._lock:
            self._timer = self._create_timer()
            self.running = True
            self._timer.start()

    def _create_timer(self):
        """Build an unstarted daemon timer that fires ``repeat_callback``."""
        timer = threading.Timer(self.interval, self.repeat_callback)

        # make sure thread gets killed on app quit
        timer.daemon = True

        return timer

    def cancel(self):
        """Stop repeating and cancel the timer that is currently pending.

        Clearing the flag alone would still let the already scheduled timer
        fire the callback one more time, so the pending timer is cancelled
        as well.  Safe to call before ``start`` and more than once.
        """
        with self._lock:
            self.running = False

            if self._timer is not None:
                self._timer.cancel()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment