Skip to content

Instantly share code, notes, and snippets.

@alukach
Last active June 1, 2016 23:16
Show Gist options
  • Save alukach/3861d63a81c41840056aeaceffe7649b to your computer and use it in GitHub Desktop.
Save alukach/3861d63a81c41840056aeaceffe7649b to your computer and use it in GitHub Desktop.
An example of using Twisted to write asynchronous code while using blocking functions.
from twisted.internet import inotify, reactor, threads
from twisted.python import filepath
# Resources:
# Introduction to Deferreds:
# http://twisted.readthedocs.io/en/twisted-16.1.1/core/howto/defer-intro.html
# Returning Deferreds from synchronous functions:
# https://twistedmatrix.com/documents/current/core/howto/gendefer.html#returning-deferreds-from-synchronous-functions
# addCallbacks Docs:
# https://twistedmatrix.com/documents/16.1.1/api/twisted.internet.defer.Deferred.html#addCallbacks
# filepath Docs:
# http://twistedmatrix.com/documents/current/api/twisted.python.filepath.FilePath.html
def some_python_operation(filename):
    """
    Stand-in for a long-running, blocking function. Imagine that this
    is calling a function in Django's ORM.

    The last character of ``filename`` selects how long to block:
    'a' sleeps for 1 second, 'b' for 2, and so on up to 'e' (5 seconds).

    @param filename: name whose final character selects the sleep length.
    @return: the number of seconds slept.
    @raise Exception: if the final character maps to more than 5 seconds.
    @raise ValueError: if the final character is not an ASCII lowercase
        letter.
    @raise IndexError: if ``filename`` is empty.
    """
    import string
    import time

    # string.ascii_lowercase is available on both Python 2 and Python 3
    # and does not vary with locale; string.lowercase is gone in Python 3.
    sleep_length = string.ascii_lowercase.index(filename[-1]) + 1
    if sleep_length > 5:
        raise Exception("Way too long for sleeping: %s" % filename)
    time.sleep(sleep_length)
    return sleep_length
def file_delete(filepath):
    """
    Remove ``filepath`` after a one-second pause.

    The pause imitates a slow deletion. Because this call blocks, it is
    meant to be run in a separate thread.

    @param filepath: object exposing a ``remove()`` method.
    """
    from time import sleep

    sleep(1)  # simulate the slow part of the deletion
    filepath.remove()
def callback(results, filepath):
    """
    Success handler: report how long the operation slept, then hand the
    deletion of ``filepath`` to a worker thread.

    @param results: value produced by the previous step of the chain.
    @param filepath: the file to delete.
    @return: the Deferred returned by ``threads.deferToThread``.
    """
    message = "Slept for %s seconds, now deleting %s" % (results, filepath)
    print(message)
    deletion = threads.deferToThread(file_delete, filepath)
    return deletion
def errback(failure, filepath):
    """
    Failure handler: report the error, then hand the deletion of
    ``filepath`` to a worker thread.

    @param failure: the Failure passed in by the Deferred chain.
    @param filepath: the file to delete.
    @return: a Deferred for the deletion whose callback returns the
        original ``failure``.
    """
    print("Failure", failure.getErrorMessage())

    # This shows how to re-raise a failure while still returning a
    # Deferred; simply handling/logging the error (as done above) may be
    # the simpler choice.
    def reraise_original(_deletion_result):
        # Returning a Failure from a callback is equivalent to raising
        # an exception.
        return failure

    deletion = threads.deferToThread(file_delete, filepath)
    deletion.addCallback(reraise_original)
    return deletion
def verify_deletion(results, filepath):
    """
    Print whether ``filepath`` no longer exists.

    @param results: outcome of the preceding step in the chain; unused.
    @param filepath: object exposing ``basename()`` and ``exists()``.
    """
    name = filepath.basename()
    is_gone = not filepath.exists()
    print("File %s was deleted: %s" % (name, is_gone))
def new_file_handler(_, filepath, mask):
    """
    inotify callback that starts the processing chain for a file.

    For historical reasons, an opaque handle is passed as the first
    parameter. This object should never be used.

    @param filepath: FilePath on which the event happened.
    @param mask: inotify event as hexadecimal masks
    """
    # Only 'modify' and 'create' events are handled; the decision is
    # based on the first name reported for the mask.
    kind = inotify.humanReadableMask(mask)[0]
    if kind != 'modify' and kind != 'create':
        return

    # Build the chain of callbacks.
    print("Scheduling callback for %s" % filepath.path)
    extra_args = (filepath,)
    pending = threads.deferToThread(some_python_operation, filepath.basename())
    # Success and failure of the blocking operation are handled separately...
    pending.addCallbacks(
        callback,
        errback,
        callbackArgs=extra_args,
        errbackArgs=extra_args,
    )
    # ...while the outcome of whichever handler ran is handled uniformly.
    pending.addBoth(verify_deletion, filepath)
# Create the inotify watcher, register the handler on each directory,
# then start the reactor.
notifier = inotify.INotify()
notifier.startReading()
for watched_dir in ('/tmp/foo', '/tmp/bar'):
    target = filepath.FilePath(watched_dir)
    notifier.watch(target, callbacks=[new_file_handler])
print("Starting watcher...")
reactor.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment