Reactor Framework

reactor.py
"""The reactor framework.
 
This module introduces the *reactor framework*, a collection of utilities to be
used in conjunction with the greenlet library to solve the problem of inversion
of control in event-driven code.
 
Traditionally, writing event-driven code typically consists of "connecting"
signals to handlers (i.e. callbacks), which are to be invoked by the framework
in use when a certain "event" occurs.
 
As long as the logic being implemented is simple enough to be easily expressed
in the form of "reactions" to specified events, this model is pretty natural
and straightforward to implement. An example is an application displaying
several buttons, each of which performs a different non-interactive operation.
 
However, more often than not, the desired reaction to user input depends not
only on the input itself, but also on whatever input was already provided,
either by the user or by some previous operation that has since concluded.
 
In that case, the logic detailing the desired behavior can usually be expressed
easily in a linear fashion, but that doesn't translate directly into the
traditional event-driven model. It often has to be rewritten as the equivalent
of a state machine, or something to that effect, before it can be implemented
correctly and reliably.
 
For example, suppose we want to implement a button which will display a message
only after it has been clicked 5 times, and do nothing otherwise.
 
In the usual event-driven style of programming, we need to introduce a global
state containing the number of times the button has been clicked, increment it,
and check if it reached the desired count at each invocation of the handler.
Using the Qt API as an example, the handler and initialization function would
probably look like this:
 
    def __init__(self):
        self.count = 0
        self.button.clicked.connect(self.on_button_clicked)

    def on_button_clicked(self):
        self.count += 1
        if self.count == 5:
            print("time is a face on the water")
            self.button.clicked.disconnect(self.on_button_clicked)
 
This is a simple enough example that it doesn't look too bad, but it's still
easy to spot the signs that make such code so hard to follow and maintain when
the complexity increases ever so slightly.
 
It has all the components of a straightforward for loop (initializing a
counter, incrementing it at each step, checking for termination), but the
iteration itself is hidden in the framework main loop, and we cannot just use
the normal python iteration techniques, because the execution of the loop is
not contained within a single function frame, but is spread throughout multiple
function invocations.
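
To make the scattering of state concrete, the callback version can be run
without Qt by substituting a toy signal. This is only a sketch: the Signal
class, its emit method, and the Window class are hypothetical stand-ins for a
Qt signal and widget, and are not part of the reactor framework.

```python
class Signal(object):
    # Hypothetical stand-in for a Qt signal: connect, disconnect, emit.
    def __init__(self):
        self.handlers = []

    def connect(self, handler):
        self.handlers.append(handler)

    def disconnect(self, handler):
        self.handlers.remove(handler)

    def emit(self, *args):
        # Iterate over a copy so that handlers may disconnect themselves.
        for handler in list(self.handlers):
            handler(*args)


class Window(object):
    # Hypothetical widget owning a single "clicked" signal.
    def __init__(self):
        self.clicked = Signal()
        self.count = 0       # loop counter, kept as object state
        self.messages = []   # collected instead of printed, for inspection
        self.clicked.connect(self.on_button_clicked)

    def on_button_clicked(self):
        self.count += 1      # loop increment, one step per invocation
        if self.count == 5:  # loop termination test
            self.messages.append("time is a face on the water")
            self.clicked.disconnect(self.on_button_clicked)


window = Window()
for _ in range(7):           # simulate seven clicks coming from a main loop
    window.clicked.emit()
print(window.messages)
```

After the seven simulated clicks the message has been recorded exactly once,
and the handler has disconnected itself. The counter, the increment, and the
termination test all live in different places from the loop that drives them.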
 
The reactor framework solves this problem with the use of greenlet "threads".
When you use wait_for to wait on a signal, for example, control is immediately
returned to the main loop, and the calling function is resumed from the point
where it left off as soon as the signal is emitted.
 
This makes the code look like a blocking wait on the signal but without any of
the disadvantages, like actually blocking the main loop, resulting in a frozen
GUI.
 
For comparison, here is how the previous example could have been implemented
using the reactor framework:
 
    def __init__(self):
        for i in range(5):
            wait_for(self.button.clicked)
        print("time is a face on the water")
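
The same linear shape can be tried out with the standard library alone by
letting a generator play the role of the greenlet. This is an analogy rather
than a description of how the reactor framework works: the Signal class, the
run helper, and the use of yield in place of wait_for are all assumptions made
for this sketch, and a real greenlet does not need the yield keyword to
suspend itself.

```python
class Signal(object):
    # Hypothetical stand-in for a Qt signal: connect, disconnect, emit.
    def __init__(self):
        self.handlers = []

    def connect(self, handler):
        self.handlers.append(handler)

    def disconnect(self, handler):
        self.handlers.remove(handler)

    def emit(self, *args):
        # Iterate over a copy so that handlers may disconnect themselves.
        for handler in list(self.handlers):
            handler(*args)


def run(task):
    # Drive a generator that yields the signal it wants to wait for.
    def step():
        try:
            signal = next(task)        # run the task until its next wait
        except StopIteration:
            return                     # the task has finished

        def resume(*args):
            signal.disconnect(resume)  # one-shot connection
            step()

        signal.connect(resume)

    step()


messages = []


def main(clicked):
    for i in range(5):
        yield clicked                  # plays the role of wait_for(clicked)
    messages.append("time is a face on the water")


clicked = Signal()
run(main(clicked))
for _ in range(7):                     # simulate seven clicks
    clicked.emit()
print(messages)
```

The counter is now an ordinary local variable of an ordinary for loop, and the
message is recorded once, after the fifth click. Greenlets give the same
effect without turning the waiting function into a generator.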
 
The reactor framework is inspired by similar libraries available for other
languages and frameworks [1] [2], as well as a number of research articles on
the subject of solving the inversion of control problem with cooperative
multitasking [3] [4].
 
References:
 
[1] http://lamp.epfl.ch/~imaier/
[2] http://thesynchronousblog.files.wordpress.com/2009/08/luagravity_sblp.pdf
[3] http://lamp.epfl.ch/~phaller/doc/haller06jmlc.pdf
[4] http://www.stanford.edu/class/cs240/readings/usenix2002-fibers.pdf
"""
 
from greenlet import greenlet, getcurrent as gself
from decorator import decorator
 
def react(f):
    """Run the given function f in a new greenlet.

    Convenience function to start a new reactive context.

    Since wait_for or reactive_handler cannot be used inside the same greenlet
    as the main loop, a basic use of this function is to execute the
    application main routine, so that it runs in its own greenlet and is able
    to take advantage of reactive event handling.

    More generally, spawning a new greenlet is useful to make sure handling of
    certain signals is performed independently of others.

    For example, code like the following:

        wait_for(button1.clicked)
        print("button1 clicked")

        wait_for(button2.clicked)
        print("button2 clicked")

    will only react to presses of button1 and button2 in that specific order,
    whereas

        wait_for(button1.clicked, button2.clicked)
        print("button1 or button2 clicked")

    will react equally to a click on either of the buttons. If we want to react
    to both buttons, in either order, we can use something like the following:

        @reactive
        def handle_button(button, name):
            wait_for(button.clicked)
            print(name, "clicked")

        handle_button(button1, "button1")
        handle_button(button2, "button2")

    """
    greenlet(f).switch()
 
@decorator
def reactive(f, *args, **kwargs):
    """Modify a function to spawn a new greenlet when called.

    Whenever the wrapped function is called, a new greenlet is spawned and the
    function is executed in it.

    Note that by using this decorator, you lose the ability to access the
    return value of the wrapped function.
    """
    react(lambda: f(*args, **kwargs))
 
def wait_for(*signals, **kwargs):
    """Block the current greenlet on the given signals.

    This function is used to interrupt the current greenlet execution until one
    of the specified signals is emitted. When that happens, execution is
    resumed immediately after the function call, which returns the arguments of
    the emitted signal: None if there are none, the argument itself if there is
    exactly one, and a tuple otherwise.

    It is possible to specify a list of extra signals that result in
    exceptions, by adding a named parameter 'exceptions' containing a list of
    tuples of the form

        (signal, exception_class).

    If a signal contained in that list is emitted first, the corresponding
    exception class is instantiated using the signal arguments as parameters,
    and raised in the greenlet of the caller after it resumes.
    """
    w = waiting_for(*signals, **kwargs)
    with w:
        pass
    # The handler has already normalized the result; normalizing it a second
    # time would fail on None or on a single non-sequence argument.
    return w.result
 
class waiting_for(object):
    """A variation of wait_for to be used in a with statement.

    Sometimes, we want to make sure we are listening to a certain signal before
    performing an operation, but wait_for doesn't allow the possibility of
    executing code between when the signal is connected and when it is
    emitted.

    This limitation is addressed by using waiting_for inside a with statement.

    For example:

        with waiting_for(message.sent):
            send(message)
        print("message sent")

    would set up a handler for the message.sent signal, then execute the code
    inside the with statement (i.e. send the message), and finally resume
    execution of the parent greenlet (e.g. the application main loop).

    Eventually, when the signal is emitted, the print call is executed.

    As with wait_for, optional 'exceptional' signals can be specified, which
    will result in a raised exception when emitted.
    """
    def __init__(self, *signals, **kwargs):
        self.signals = signals
        self.exceptions = kwargs.get("exceptions", [])
        self.exception_handlers = []
        self.result = None
        self.exception = None

    def __enter__(self):
        self.cc = gself()
        for signal in self.signals:
            signal.connect(self.handler)
        # One handler per exceptional signal, each bound to its own exception
        # class, so that the handler knows what to instantiate.
        self.exception_handlers = [
            (signal, self.make_exception_handler(exc))
            for signal, exc in self.exceptions]
        for signal, handler in self.exception_handlers:
            signal.connect(handler)
        return self

    def __exit__(self, type, value, traceback):
        self.cc.parent.switch()
        if self.exception is not None:
            raise self.exception

    def disconnect_all(self):
        for signal in self.signals:
            signal.disconnect(self.handler)
        for signal, handler in self.exception_handlers:
            signal.disconnect(handler)

    def handler(self, *args):
        self.disconnect_all()
        self.result = normalize_result(args)
        self.cc.parent = gself()
        self.cc.switch()

    def make_exception_handler(self, exc):
        def exception_handler(*args):
            self.disconnect_all()
            self.exception = exc(*args)
            # Resume the waiting greenlet so that __exit__ can raise.
            self.cc.parent = gself()
            self.cc.switch()
        return exception_handler
 
def events(signal):
    """Keep handling a signal indefinitely.

    A common signal handling scenario is the need to perform some operation
    every time the signal is emitted.

    Without the reactor framework, this is normally achieved by simply
    connecting the signal to a handler performing the desired operation.

    However, there is a simple way to achieve the same result within the
    reactor framework: iterate through the generator returned by the events
    function as though all events were already available. The reactor
    framework takes care of returning control to the main loop after each
    iteration, and resuming the loop whenever a new event is available.

    There are a number of reasons why this approach might be preferable to
    the direct one:

     - consistency
     - read/write access to the local frame
     - ability to yield from within the handler code
     - ability to manipulate the generator via combinators or comprehensions

    For example, the following code:

        def incoming_messages(self):
            return (e.message for e in events(self.message_received))

    will return a generator containing all future incoming messages.
    """
    event_args = []
    current = gself()

    def handler(*args):
        event_args[:] = args
        current.parent = gself()
        current.switch()

    signal.connect(handler)
    try:
        while True:
            current.parent.switch()
            yield normalize_result(event_args)
    finally:
        # Runs when the generator is closed or garbage collected; without the
        # try/finally the disconnect after the infinite loop is unreachable.
        signal.disconnect(handler)
 
class reactive_handler(object):
    """Adaptor to be used as a callback to non-reactive asynchronous calls.

    Many frameworks and libraries expose functions that take one or more
    callbacks as arguments, and use them to notify the caller when the
    operation is finished and what the result is.

    Using a reactive_handler instance inside a with statement allows such a
    function to be called within the reactor framework. The function will
    execute and return to the main loop, and execution will resume after the
    call only when the handler is actually invoked.

    The handler may be used multiple times within the with statement, but can
    only be called from a different greenlet.
    """

    def __init__(self, exception=None):
        self.exception = exception
        self.result = None
        self.args = ()
        self.base = self

    def __enter__(self):
        self.cc = gself()
        return self

    def __exit__(self, type, value, traceback):
        if type is not None:
            # The body of the with statement raised: let it propagate.
            return False
        if self.cc.parent is None:
            raise Exception(
                "reactive_handler cannot be used from the root greenlet")
        self.cc.parent.switch()

        if self.exception:
            # Use the raw callback arguments: the normalized result may be
            # None or a single value, neither of which can be unpacked.
            raise self.exception(*self.args)
        return False

    def __call__(self, *args):
        self.base.args = args
        self.base.result = normalize_result(args)
        self.base.exception = self.exception
        if gself() is self.base.cc:
            raise Exception(
                "reactive_handler invoked in the same greenlet where it was "
                "created")
        self.base.cc.parent = gself()
        self.base.cc.switch()

    def for_error(self, exception=None):
        """Create a special handler which raises an exception when invoked.

        When a function takes a handler to be invoked in case of error, the
        handler returned by this function can be used. The specified exception
        class will be instantiated by the handler and raised after the
        greenlet is resumed.
        """
        result = self.__class__(exception)
        result.base = self
        return result
 
class SignalAdaptor(object):
    """Adaptor for dbus-like signals.

    The signals passed to the reactor framework are assumed to have an
    interface compatible with that of Qt signals in PySide.

    This adaptor class allows the use of dbus signals in the reactor framework.
    """
    def __init__(self, obj, name):
        self.obj = obj
        self.name = name
        self.connection = None

    def connect(self, handler):
        if self.connection:
            raise Exception(
                "SignalAdaptor does not support more than 1 connection")
        self.connection = self.obj.connect_to_signal(self.name, handler)

    def disconnect(self, handler):
        if self.connection:
            self.connection.remove()
            # Forget the connection so that the adaptor can be connected
            # again, e.g. by a second wait_for on the same signal.
            self.connection = None
 
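def normalize_result(result):
    """Collapse a sequence of signal arguments into a convenient value.

    Return None for an empty sequence, the sole element for a sequence of
    length one, and the sequence itself otherwise.
    """
    if len(result) == 1:
        return result[0]
    elif len(result) == 0:
        return None
    else:
        return result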
