@obowersa
Last active December 6, 2018 14:55
signals_thing
from flask import Flask, current_app
# Import Namespace from blinker and use it as the custom namespace for your signals. You can substitute your own,
# but most people just use the provided one.
from blinker import Namespace
# Just some Flask boilerplate
app = Flask(__name__)
app.secret_key = 'DND'
# Instantiate blinker's Namespace class and assign it to our variable sample_signal. This initialises the namespace
# that will hold our named signals
sample_signal = Namespace()
# The receiver: a plain function whose signature matches how the signal is sent (the sender first, then keyword arguments)
def foobar_signal(app, message, **extra):
    print(message)
# Use the sample_signal namespace created above to create a named signal called signal_handler_example. This signal object
# is the reference that senders and subscribers share; blinker calls the connected receivers synchronously when it is sent.
foobar = sample_signal.signal('signal_handler_example')
# The subscription. foobar_signal will be called whenever the signal stored in foobar (signal_handler_example) is sent
# with our Flask app object as the sender
foobar.connect(foobar_signal, app)
# Generic Flask stuff: register home() as the view for '/', accepting both POST and GET
@app.route('/', methods=['POST', 'GET'])
def home():
    # Send the signal we created above with the real app object as the sender, which triggers foobar_signal.
    # current_app is a proxy, so _get_current_object() unwraps it to the app that foobar_signal was connected with
    foobar.send(current_app._get_current_object(), message='Hi')
    foobar.send(current_app._get_current_object(), message='Hi')
    foobar.send(current_app._get_current_object(), message='Hi')
    return 'toot'
if __name__ == '__main__':
    app.run(debug=True, port=5002)
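
To make the connect/send wiring above easier to reason about, here is a rough, standard-library-only model of what a named signal does. It is a sketch, not blinker's implementation: `MiniSignal`, `MiniNamespace`, `ANY` and `FakeApp` are hypothetical names invented for this illustration, and real blinker adds things this toy leaves out (weak references, disconnecting, and so on). The point it illustrates is that sending is a direct, synchronous call to each receiver connected for that sender.

```python
# Sentinel meaning "accept any sender" (hypothetical name for this sketch).
ANY = object()


class MiniSignal:
    """Toy stand-in for a named signal; not blinker's real code."""

    def __init__(self, name):
        self.name = name
        # Maps id(sender), or ANY, to the list of receivers subscribed to it.
        self._receivers = {}

    def connect(self, receiver, sender=ANY):
        key = sender if sender is ANY else id(sender)
        self._receivers.setdefault(key, []).append(receiver)
        return receiver

    def send(self, sender=None, **kwargs):
        # Receivers run immediately, in the caller's thread: no queue involved.
        matched = self._receivers.get(ANY, []) + self._receivers.get(id(sender), [])
        return [(receiver, receiver(sender, **kwargs)) for receiver in matched]


class MiniNamespace(dict):
    """Maps signal names to MiniSignal objects, creating them on first use."""

    def signal(self, name):
        if name not in self:
            self[name] = MiniSignal(name)
        return self[name]


class FakeApp:
    """Placeholder sender standing in for the Flask app object."""


app = FakeApp()
other_app = FakeApp()
received = []


def foobar_signal(sender, message, **extra):
    # Same shape as the receiver in the gist: sender first, then keywords.
    received.append(message)
    return message.upper()


sample_signal = MiniNamespace()
foobar = sample_signal.signal('signal_handler_example')
foobar.connect(foobar_signal, app)

results = foobar.send(app, message='Hi')          # sender matches: receiver runs
ignored = foobar.send(other_app, message='Nope')  # different sender: nothing runs
```

In this model, `results` holds one `(receiver, return value)` pair and `ignored` is empty, which is why the view in the gist has to pass the real app object rather than some other sender.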