Skip to content

Instantly share code, notes, and snippets.

@aliles
Created January 16, 2014 12:31
Show Gist options
  • Save aliles/8454244 to your computer and use it in GitHub Desktop.
Save aliles/8454244 to your computer and use it in GitHub Desktop.
Twisted based cooperative processing of mixed iterators using deferreds. Mixed iterators produce either values, or deferreds that will produce values.
"""Cooperative, deferred based consumption and processing of mixed iterators.
Iterators may return values, or deferreds that will eventually return values.
After consuming a value from the iterator, control is voluntarily yielded to
other iterators waiting to be processed.
"""
from twisted.internet import defer, reactor, task
def iterate(iterator):
"""Consume next value from iterator
If the next value is not a deferred, process the value with process() and
return a new deferred for the next value.
If the next value is a deferred add process() as a callback followed by an
anonymous function for a new deferred to process the next value.
"""
try:
value = iterator.next()
new_deferred = task.deferLater(reactor, 0, iterate, iterator)
if not isinstance(value, defer.Deferred):
process(value)
return new_deferred
else:
value.addCallback(process)
value.addCallback(lambda _: new_deferred)
return value
except StopIteration:
return
def process(result):
"Process a return value for iterator or deferred"
print result
def create(start, stop):
"Create new iterator that returns both values and deferreds"
iterator = [defer.succeed("%d-%d" % (start, stop))] + range(start, stop)
return task.deferLater(reactor, 0, iterate, iter(iterator))
if __name__ == '__main__':
deferreds = defer.DeferredList([create(0, 3), create(100, 103)])
deferreds.addCallback(lambda result: reactor.stop())
reactor.run()
@aliles
Copy link
Author

aliles commented Jan 16, 2014

Questions I have about best practice with Twisted:

  1. Is task.deferLater(reactor, 0, ...) the best way to yield control to other waiting deferreds?
  2. Should iterate() be a broken into separate functions for processing, scheduling and halting?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment