Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Touch reload event listener for Supervisor
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
from __future__ import (print_function, unicode_literals,
absolute_import, generators, division)
# The MIT License (MIT)
# Copyright (c) 2016 ymt2 (Tatsuya Yamamoto)
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the Software
# is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
# PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE
# FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT
# OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE
# OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# A supervisor config snippet that tells supervisor to use this script
# as a listener is below.
#
# [eventlistener:touch_reload]
# command=python touch_reload_event_listener.py -p proc -f /home/user/reload.trigger
# events=PROCESS_STATE
# Usage text printed by usage(); the placeholder is filled with this
# script's own path.
doc = "python2 {} [-p processname] [-f trigger_file_name]\n".format(__file__)
import os
import sys
import time
from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer
from supervisor import childutils
from superlance.compat import xmlrpclib
class TouchReload(object):
    """Restart a supervisor-managed process when a trigger file is modified.

    Attributes are public so that ``rpc`` can be attached after construction.
    """

    def __init__(self, process, trigger_file_path, rpc=None):
        """Store the listener configuration.

        Args:
            process: process name handed to the supervisor stop/start calls.
            trigger_file_path: path of the file whose modification triggers
                a restart.
            rpc: object exposing ``supervisor.stopProcess`` and
                ``supervisor.startProcess``; may be assigned later.
        """
        self.process = process
        self.trigger_file_path = trigger_file_path
        self.rpc = rpc

    def runforever(self):
        """Watch the trigger file's directory until interrupted.

        Blocks in a one-second sleep loop. A KeyboardInterrupt ends the loop
        quietly; the observer is stopped and joined on every exit path.
        """
        monitor_path = os.path.abspath(os.path.dirname(self.trigger_file_path))
        # The watch is registered on an absolute directory, so the handler is
        # given an absolute trigger path as well; a relative path would not
        # compare equal to paths reported under that watch.
        event_handler = ModifyHandler(self.process,
                                      os.path.abspath(self.trigger_file_path),
                                      self.restart)
        observer = Observer()
        observer.schedule(event_handler, monitor_path, recursive=False)
        observer.start()
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            pass
        finally:
            # Always shut the observer thread down, not only on Ctrl-C.
            observer.stop()
            observer.join()

    def restart(self, name):
        """Stop and then start ``name`` through the supervisor RPC interface.

        Args:
            name: process name passed to ``stopProcess`` / ``startProcess``.

        Raises:
            xmlrpclib.Fault: re-raised after being reported on stderr. If the
                stop call faults, the start call is not attempted.
        """
        try:
            self.rpc.supervisor.stopProcess(name)
            self.rpc.supervisor.startProcess(name)
        except xmlrpclib.Fault as e:
            # This class has no ``stderr`` attribute; report on the process's
            # stderr, and format the fault because write() needs a string.
            sys.stderr.write("{}\n".format(e))
            raise
class ModifyHandler(FileSystemEventHandler):
    """Invoke a callback when the trigger file is reported as modified."""

    def __init__(self, process, trigger_file_path, callback):
        """Store what to watch for and what to call.

        Args:
            process: value passed to ``callback`` when the trigger fires.
            trigger_file_path: path of the file whose modification matters;
                may be relative or absolute.
            callback: callable invoked as ``callback(process)``.
        """
        self.process = process
        self.trigger_file_path = trigger_file_path
        self.callback = callback

    def on_modified(self, event):
        """Run the callback for a modification of the trigger file.

        Directory events and events for any other path are ignored.
        """
        if event.is_directory:
            return
        # Normalize both sides so a relative trigger path (e.g. ``-f
        # reload.trigger``) still matches the path carried by the event.
        modified_path = os.path.abspath(event.src_path)
        if modified_path == os.path.abspath(self.trigger_file_path):
            self.callback(self.process)
def usage():
    """Print the usage text and terminate with exit status 255."""
    print(doc)
    raise SystemExit(255)
def touch_reload_from_args(arguments):
    """Build a TouchReload from command-line arguments.

    Args:
        arguments: argument strings without the program name, e.g.
            ``["-p", "proc", "-f", "/home/user/reload.trigger"]``.

    Returns:
        A TouchReload whose ``rpc`` is left unset, or None when the
        arguments are empty, cannot be parsed, or lack ``-p`` or ``-f``.
    """
    import getopt

    if not arguments:
        return None
    try:
        opts, _ = getopt.getopt(arguments, "p:f:")
    except getopt.GetoptError:
        # Only a parse failure means "bad arguments"; anything else
        # (e.g. KeyboardInterrupt) should propagate.
        return None

    process = None
    trigger_file = None
    for option, value in opts:
        # Compare with ==: ``option in ('-p')`` is a substring test against
        # the string '-p', not membership in a tuple.
        if option == "-p":
            process = value
        elif option == "-f":
            trigger_file = value

    if process is None or trigger_file is None:
        return None
    return TouchReload(process, trigger_file)
def main():
    """Script entry point: parse argv, attach the supervisor RPC, and watch.

    Prints usage and exits when the command-line arguments do not yield a
    listener.
    """
    cli_args = sys.argv[1:]
    listener = touch_reload_from_args(cli_args)
    if listener is None:
        usage()  # does not return: exits the process
    rpc_interface = childutils.getRPCInterface(os.environ)
    listener.rpc = rpc_interface
    listener.runforever()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment