Last active
June 1, 2016 23:16
-
-
Save alukach/3861d63a81c41840056aeaceffe7649b to your computer and use it in GitHub Desktop.
An example of using Twisted to write asynchronous code while using blocking functions.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from twisted.internet import inotify, reactor, threads | |
from twisted.python import filepath | |
# Resources: | |
# Introduction to Deferreds: | |
# http://twisted.readthedocs.io/en/twisted-16.1.1/core/howto/defer-intro.html | |
# Returning Deferreds from synchronous functions: | |
# https://twistedmatrix.com/documents/current/core/howto/gendefer.html#returning-deferreds-from-synchronous-functions | |
# addCallbacks Docs: | |
# https://twistedmatrix.com/documents/16.1.1/api/twisted.internet.defer.Deferred.html#addCallbacks | |
# filepath Docs: | |
# http://twistedmatrix.com/documents/current/api/twisted.python.filepath.FilePath.html | |
def some_python_operation(filename):
    """
    Stand in for a long-running blocking function. Imagine that this
    is calling a function in Django's ORM.

    The final character of ``filename`` selects the duration: 'a' sleeps
    for 1 second, 'b' for 2, and so on up to 'e' for 5.

    @param filename: name whose last character is a lowercase ASCII letter.
    @return: the number of seconds that were slept.
    @raise Exception: if the computed duration is greater than 5 seconds.
    @raise ValueError: if the last character is not a lowercase ASCII letter.
    @raise IndexError: if ``filename`` is empty.
    """
    import string
    import time

    # string.lowercase only exists on Python 2; string.ascii_lowercase is
    # available on both Python 2 and Python 3.
    sleep_length = string.ascii_lowercase.index(filename[-1]) + 1
    if sleep_length > 5:
        # Checked before sleeping so that rejected names fail immediately.
        raise Exception("Way too long for sleeping: %s" % filename)
    time.sleep(sleep_length)
    return sleep_length
def file_delete(filepath):
    """
    Remove ``filepath`` after a one-second pause.

    The pause stands in for a slow deletion, which is why this function is
    meant to be run in a separate thread.

    @param filepath: object whose ``remove()`` method performs the deletion.
    """
    from time import sleep

    sleep(1)  # simulated slowness
    filepath.remove()
def callback(results, filepath):
    """
    Success handler: report the result, then delete ``filepath`` in a thread.

    @param results: value reported as the number of seconds slept.
    @param filepath: object handed on to file_delete.
    @return: the Deferred produced by threads.deferToThread for the deletion.
    """
    message = "Slept for %s seconds, now deleting %s" % (results, filepath)
    print(message)
    deletion = threads.deferToThread(file_delete, filepath)
    return deletion
def errback(failure, filepath):
    """
    Failure handler: report the error, then delete ``filepath`` in a thread.

    @param failure: the failure being handled; its error message is printed
        and it is handed back once the deletion has succeeded.
    @param filepath: object handed on to file_delete.
    @return: the deletion Deferred, with a callback attached that returns
        ``failure``.
    """
    print("Failure", failure.getErrorMessage())

    # This shows how to rethrow a failure while still returning a deferred;
    # simply handling/logging the error as done above may be simpler.
    def _hand_back_failure(_deletion_result):
        # Inside callbacks, returning a Failure is equivalent to throwing
        # an exception.
        return failure

    deletion = threads.deferToThread(file_delete, filepath)
    deletion.addCallback(_hand_back_failure)
    return deletion
def verify_deletion(results, filepath):
    """
    Print whether ``filepath`` is gone.

    @param results: outcome of the preceding callbacks; not used.
    @param filepath: object providing ``basename()`` and ``exists()``.
    """
    name = filepath.basename()
    was_deleted = not filepath.exists()
    print("File %s was deleted: %s" % (name, was_deleted))
def new_file_handler(_, filepath, mask):
    """
    inotify callback that starts the processing chain for a changed file.

    For historical reasons, an opaque handle is passed as first
    parameter. This object should never be used.

    @param filepath: FilePath on which the event happened.
    @param mask: inotify event as hexadecimal masks
    """
    # Only 'modify' and 'create' events are handled; the decision is based
    # on the first name reported for the mask.
    first_flag = inotify.humanReadableMask(mask)[0]
    if first_flag != 'modify' and first_flag != 'create':
        return

    # Build the chain of callbacks.
    print("Scheduling callback for %s" % filepath.path)
    chain = threads.deferToThread(some_python_operation, filepath.basename())
    # Success and failure of the blocking operation are handled separately...
    chain.addCallbacks(
        callback,
        errback,
        callbackArgs=(filepath,),
        errbackArgs=(filepath,),
    )
    # ...while the outcome of those handlers' deferreds is handled uniformly.
    chain.addBoth(verify_deletion, filepath)
# Register new_file_handler for each watched directory, then start the reactor.
notifier = inotify.INotify()
notifier.startReading()
for watch_dir in ('/tmp/foo', '/tmp/bar'):
    watched_path = filepath.FilePath(watch_dir)
    notifier.watch(watched_path, callbacks=[new_file_handler])
print("Starting watcher...")
reactor.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment