Skip to content

Instantly share code, notes, and snippets.

@amvtek
Last active March 28, 2018 05:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save amvtek/0aa479fbfb6f83621b40 to your computer and use it in GitHub Desktop.
Save amvtek/0aa479fbfb6f83621b40 to your computer and use it in GitHub Desktop.
Making use of twisted coiterate...
# -*- coding: utf-8 -*-
"""
Making use of twisted coiterate
example 01
"""
from twisted.internet import reactor
from twisted.internet.task import coiterate
def make_iterator_to_sum_all_integers_until(N):
    """Return a generator that sums the integers 0..N-1 one step at a time.

    Each step adds one integer to the running total, prints a progress
    line and then yields ``None``, handing control back to whoever drives
    the generator before the next step runs.  After the last step the
    total is printed.

    :param N: exclusive upper bound of the integers to add up.
    """
    s = 0
    # ``range`` and the parenthesised ``print`` run on Python 2 and 3 alike.
    for i in range(N):
        s += i
        print("Adding %i to result" % i)
        yield None  # the event loop can look after other things...
    print("result is %i " % s)
def sum_all_integers_being_nice_to_reactor(N):
    """Sum the integers 0..N-1 cooperatively and return the Deferred.

    The summing generator is handed to ``coiterate``.  The Deferred that
    ``coiterate`` produces is returned rather than discarded, so a caller
    can attach callbacks/errbacks to learn when the iteration is over or
    that it failed.  Callers that ignore the return value (such as
    ``reactor.callLater``) are unaffected.

    :param N: exclusive upper bound of the integers to add up.
    :return: the Deferred returned by ``coiterate``.
    """
    all_sum_steps = make_iterator_to_sum_all_integers_until(N)
    return coiterate(all_sum_steps)
# Schedule the cooperative sum (N = 8) with a zero-second delay, then start
# the reactor so that the scheduled call actually runs.
reactor.callLater(0, sum_all_integers_being_nice_to_reactor, 8)
# NOTE(review): nothing in this example calls reactor.stop(), so the event
# loop presumably keeps running until the process is interrupted.
reactor.run()
# -*- coding: utf-8 -*-
"""
Making use of twisted coiterate
example 02 : Obtaining coiteration result...
"""
from twisted.internet import reactor
from twisted.internet.task import coiterate
def make_iterator_to_sum_all_integers_until(N, context):
    """Return a generator that sums the integers 0..N-1 one step at a time.

    Each step adds one integer to the running total, prints a progress
    line and then yields ``None``, handing control back to whoever drives
    the generator.  A generator cannot hand a final value to its driver
    here, so the total is stored in ``context['result']`` once the last
    step has run.

    :param N: exclusive upper bound of the integers to add up.
    :param context: mutable mapping that receives the total under the
        ``'result'`` key when the generator is exhausted.
    """
    s = 0
    # ``range`` and the parenthesised ``print`` run on Python 2 and 3 alike.
    for i in range(N):
        s += i
        print("Adding %i to result" % i)
        yield None  # the event loop can look after other things...
    context['result'] = s
def sum_all_integers_being_nice_to_reactor(N):
    """Return a Deferred firing with the sum of the integers 0..N-1.

    The summing generator writes its total into a shared ``context``
    dictionary.  A callback attached to the Deferred returned by
    ``coiterate`` reads that entry back, prints it and passes it on as the
    Deferred's result.

    :param N: exclusive upper bound of the integers to add up.
    :return: Deferred whose callback chain ends with the computed sum.
    """
    def extract_result_cb(ignored, context):
        """Print and return ``context['result']``.

        ``ignored`` is the value the Deferred fired with; it is not used.
        """
        rv = context['result']
        # Parenthesised form prints the same text on Python 2 and 3.
        print("Got result = %s" % rv)
        return rv

    context = {}
    all_sum_steps = make_iterator_to_sum_all_integers_until(N, context)
    deferred = coiterate(all_sum_steps)
    deferred.addCallback(extract_result_cb, context)
    return deferred
# Schedule the cooperative sum (N = 8) with a zero-second delay, then start
# the reactor.  The Deferred returned by the scheduled function is not kept.
reactor.callLater(0, sum_all_integers_being_nice_to_reactor, 8)
# NOTE(review): nothing in this example calls reactor.stop(), so the event
# loop presumably keeps running until the process is interrupted.
reactor.run()
# -*- coding: utf-8 -*-
"""
Making use of twisted coiterate
example 03 : Waiting for Deferred...
"""
from twisted.internet import reactor
from twisted.internet.task import coiterate, deferLater
def make_iterator_to_sum_all_integers_until(N, context):
    """Return a generator that sums 0..N-1, pausing one second per step.

    Each step adds one integer to the running total, prints a progress
    line and yields a Deferred built with ``deferLater`` that fires after
    one second.  Per the Twisted cooperator documentation, the driver waits
    for a yielded Deferred to fire before asking for the next step.  When
    the loop is done the total is stored in ``context['result']``.

    :param N: exclusive upper bound of the integers to add up.
    :param context: mutable mapping that receives the total under the
        ``'result'`` key when the generator is exhausted.
    """
    def wait_some_time(t):
        """Return a Deferred firing after ``t`` seconds with a message."""
        return deferLater(
            reactor, t, lambda: "I was paused %.02f seconds" % t
        )

    def print_pause_cb(msg):
        """Callback printing the message the pause Deferred fired with."""
        print(msg)

    s = 0
    # ``range`` and the parenthesised ``print`` run on Python 2 and 3 alike.
    for i in range(N):
        s += i
        print("Adding %i to result" % i)
        d = wait_some_time(1.0)
        d.addCallback(print_pause_cb)
        yield d  # we will be paused until d fires...
    context['result'] = s
def sum_all_integers_being_nice_to_reactor(N):
    """Return a Deferred firing with the sum of the integers 0..N-1.

    The summing generator writes its total into a shared ``context``
    dictionary.  A callback attached to the Deferred returned by
    ``coiterate`` reads that entry back, prints it and passes it on as the
    Deferred's result.

    :param N: exclusive upper bound of the integers to add up.
    :return: Deferred whose callback chain ends with the computed sum.
    """
    def extract_result_cb(ignored, context):
        """Print and return ``context['result']``.

        ``ignored`` is the value the Deferred fired with; it is not used.
        """
        rv = context['result']
        # Parenthesised form prints the same text on Python 2 and 3.
        print("Got result = %s" % rv)
        return rv

    context = {}
    all_sum_steps = make_iterator_to_sum_all_integers_until(N, context)
    deferred = coiterate(all_sum_steps)
    deferred.addCallback(extract_result_cb, context)
    return deferred
# Schedule the cooperative sum (N = 8) with a zero-second delay, then start
# the reactor.  The Deferred returned by the scheduled function is not kept.
reactor.callLater(0, sum_all_integers_being_nice_to_reactor, 8)
# NOTE(review): nothing in this example calls reactor.stop(), so the event
# loop presumably keeps running until the process is interrupted.
reactor.run()
# -*- coding: utf-8 -*-
"""
Making use of twisted coiterate
example 04 : Cancelling coiteration...
"""
from twisted.internet import reactor
from twisted.internet.defer import Deferred, CancelledError
from twisted.internet.task import coiterate, deferLater
def make_iterator_to_sum_all_integers_until(N, context):
    """Return a closable generator that sums 0..N-1, pausing between steps.

    Each step adds one integer to the running total, prints a progress
    line and yields a Deferred built with ``deferLater`` that fires after
    one second.  When the loop is done the total is stored in
    ``context['result']``.

    If the generator is closed while suspended at a ``yield``, the
    resulting ``GeneratorExit`` is caught: a notice is printed and the
    pause Deferred that is still pending is cancelled.  In that case
    ``context['result']`` is never set.

    :param N: exclusive upper bound of the integers to add up.
    :param context: mutable mapping that receives the total under the
        ``'result'`` key when the generator runs to completion.
    """
    def wait_some_time(t):
        """Return a Deferred firing after ``t`` seconds with a message."""
        return deferLater(
            reactor, t, lambda: "I was paused %.02f seconds" % t
        )

    def print_pause_cb(msg):
        """Callback printing the message the pause Deferred fired with."""
        print(msg)

    d = None  # most recent pause Deferred, kept so it can be cancelled
    s = 0
    try:
        # ``range`` and the parenthesised ``print`` run on Python 2 and 3.
        for i in range(N):
            s += i
            print("Adding %i to result" % i)
            d = wait_some_time(1.0)
            d.addCallback(print_pause_cb)
            yield d  # we will be paused until d fires...
        context['result'] = s
    except GeneratorExit:
        print("---")
        print("Early termination...")
        # Cancel the pending Deferred.  The explicit ``is not None`` test
        # says what is meant instead of relying on object truthiness.
        if d is not None and not d.called:
            d.cancel()
def sum_all_integers_being_nice_to_reactor(N):
    """Return a cancellable Deferred firing with the sum of 0..N-1.

    The Deferred handed back to the caller is created here with a
    canceller that closes the summing generator.  The Deferred produced by
    ``coiterate`` is chained into it, so a normal completion flows through
    to the result-extracting callback.  A ``CancelledError`` reaching the
    end of the chain is trapped so that it is not reported as unhandled.

    :param N: exclusive upper bound of the integers to add up.
    :return: Deferred firing with the sum, or with ``None`` when the
        errback trapped a ``CancelledError``.
    """
    context = {}
    all_sum_steps = make_iterator_to_sum_all_integers_until(N, context)

    def close_steps(_cancelled_deferred):
        """Canceller: stop the iteration by closing the generator."""
        all_sum_steps.close()

    def extract_result_cb(ignored, context):
        """Return ``context['result']``; the fired value is not used."""
        return context['result']

    def suppress_cancel_log_eb(error):
        """Trap ``CancelledError``; any other failure is re-raised."""
        # Trapping here avoids the unhandled-error warning on cancellation.
        error.trap(CancelledError)

    deferred = Deferred(close_steps)
    coiterate(all_sum_steps).chainDeferred(deferred)
    deferred.addCallback(extract_result_cb, context)
    deferred.addErrback(suppress_cancel_log_eb)
    return deferred
def main():
    """Start summing integers and cancel the computation after 3 seconds."""
    def print_result_cb(res):
        """Print the result, unless there is none (``res`` is ``None``)."""
        if res is not None:
            # Parenthesised form prints the same text on Python 2 and 3.
            print("Got result = %s" % res)

    # start sum calculation using coiteration...
    d = sum_all_integers_being_nice_to_reactor(8)
    d.addCallback(print_result_cb)

    # schedule cancellation after 3.00 seconds
    reactor.callLater(3.0, d.cancel)
# Schedule main() with a zero-second delay, then start the reactor so that
# the scheduled call actually runs.
reactor.callLater(0, main)
# NOTE(review): nothing in this example calls reactor.stop(), so the event
# loop presumably keeps running after the cancellation until interrupted.
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment