Skip to content

Instantly share code, notes, and snippets.

@mpontillo
Created October 19, 2016 00:36
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mpontillo/18b3709f12843f4ec18f457ae1460c60 to your computer and use it in GitHub Desktop.
Save mpontillo/18b3709f12843f4ec18f457ae1460c60 to your computer and use it in GitHub Desktop.
# This is a startup script for IPython. It adds a "line magic" helper that
# makes it very easy to run a Twisted reactor in a separate thread.
#
# This script has been tested in Python 2 and Python 3.
#
# Author: Mike Pontillo <pontillo@gmail.com>
#
# INSTRUCTIONS
# ============
# Place this file in: ~/.ipython/profile_default/startup/reactor.py
#
# USAGE
# =====
# To get a running reactor, assign it to %reactor. When you exit IPython, the
# reactor will be stopped automatically. For example:
#
# In [1]: reactor = %reactor
# Starting reactor...
#
# In [2]: from twisted.names.client import getResolver
#
# In [3]: resolver = getResolver()
#
# In [4]: from pprint import pprint
#
# In [5]: resolver.lookupAddress("launchpad.net").addBoth(lambda data: pprint([rr.payload for rr in data[0]]))
# Out[5]: <Deferred at 0x7f190e0997f0 waiting on Deferred at 0x7f190e0b4278>
# [<A address=91.189.89.223 ttl=351>, <A address=91.189.89.222 ttl=351>]
#
# In [6]: ^D
# Do you really want to exit ([y]/n)? y
# Reactor stopped.
from IPython.core.magic import (
Magics,
magics_class,
line_magic,
)
@magics_class
class ReactorMagic(Magics):
    """IPython magics that run the Twisted reactor in a background thread.

    The ``%reactor`` line magic starts the global Twisted reactor the first
    time it is used and returns it; later uses return the same reactor
    without starting it again. A shutdown hook registered on the shell asks
    the reactor to stop when IPython exits.
    """

    def __init__(self, shell=None, *args, **kwargs):
        """Initialise the magics and register the shutdown hook.

        :param shell: The IPython shell these magics are attached to. When
            it is ``None`` no shutdown hook can be registered, so the
            reactor will not be stopped automatically.
        """
        super(ReactorMagic, self).__init__(shell=shell, *args, **kwargs)
        # Stored under a private name so that the instance attribute does not
        # shadow the ``reactor`` line-magic method defined on this class.
        self._reactor = None
        # Note: I couldn't figure out how to both use atexit.register() here
        # and allow the reactor to cleanly shut down, so we're using this
        # deprecated hook.
        if shell is not None:
            shell.set_hook(
                'shutdown_hook', self.shutdown, _warn_deprecated=False)

    def shutdown(self, *args, **kwargs):
        """Ask the reactor (if one was started) to stop.

        Arguments are accepted and ignored so that this can be used directly
        as a hook callback.
        """
        if self._reactor is not None:
            # The reactor runs in another thread, so the stop request has to
            # be handed over with callFromThread rather than called directly.
            self._reactor.callFromThread(self._reactor.stop)

    @line_magic
    def reactor(self, line):
        """Start the Twisted reactor on first use and return it.

        :param line: The rest of the magic's command line; unused.
        :return: The ``twisted.internet.reactor`` module-level reactor.
        """
        if self._reactor is None:
            # Imported lazily so that IPython start-up does not pay for (or
            # fail on) the Twisted import unless the magic is actually used.
            import sys
            from threading import Thread

            import twisted.internet.reactor

            twisted_reactor = twisted.internet.reactor
            twisted_reactor.addSystemEventTrigger(
                "after", "shutdown", sys.stderr.write, "Reactor stopped.\n")
            twisted_reactor.addSystemEventTrigger(
                "before", "startup", sys.stderr.write, "Starting reactor...\n")
            # ``False`` is run()'s first positional argument
            # (installSignalHandlers in Twisted): signal handlers cannot be
            # installed from a thread other than the main one.
            runner = Thread(target=twisted_reactor.run, args=(False,))
            # Daemon thread, so a still-running reactor never blocks exit.
            runner.daemon = True
            runner.start()
            # Only remember the reactor once its thread has been started, so
            # a failure above does not leave a never-started reactor cached.
            self._reactor = twisted_reactor
        return self._reactor
# Register the %reactor magic with the running shell. ``get_ipython`` is not
# imported in this file; presumably IPython supplies it when it executes
# startup scripts -- confirm if this file is ever run any other way.
get_ipython().register_magics(ReactorMagic)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment