Create a gist now

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A python script which starts celery worker and auto reload it when any code change happens.
'''
A python script which starts celery worker and auto reload it when any code change happens.
I did this because Celery worker's "--autoreload" option seems not working for a lot of people.
'''
import time
from watchdog.observers import Observer ##pip install watchdog
from watchdog.events import PatternMatchingEventHandler
import psutil ##pip install psutil
import os
import subprocess
code_dir_to_monitor = "/path/to/your/code/dir"
celery_working_dir = code_dir_to_monitor #happen to be the same. It may be different on your machine
celery_cmdline = 'celery worker -A some_project -l INFO'.split(" ")
class MyHandler(PatternMatchingEventHandler):
def on_any_event(self, event):
print("detected change. event = {}".format(event))
for proc in psutil.process_iter():
proc_cmdline = self._get_proc_cmdline(proc)
if not proc_cmdline or len(proc_cmdline) < len(celery_cmdline):
continue
is_celery_worker = 'python' in proc_cmdline[0].lower() \
and celery_cmdline[0] == proc_cmdline[1] \
and celery_cmdline[1] == proc_cmdline[2]
if not is_celery_worker:
continue
proc.kill()
print("Just killed {} on working dir {}".format(proc_cmdline, proc.cwd()))
run_worker()
def _get_proc_cmdline(self, proc):
try:
return proc.cmdline()
except Exception as e:
return []
def run_worker():
print("Ready to call {} ".format(celery_cmdline))
os.chdir(celery_working_dir)
subprocess.Popen(celery_cmdline)
print("Done callling {} ".format(celery_cmdline))
if __name__ == "__main__":
run_worker()
event_handler = MyHandler(patterns = ["*.py"])
observer = Observer()
observer.schedule(event_handler, code_dir_to_monitor, recursive=True)
observer.start()
print("file change observer started")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
@LutzSteinborn

This comment has been minimized.

Show comment
Hide comment
@LutzSteinborn

LutzSteinborn Jun 5, 2016

This made my day, Thanks Kent!

LutzSteinborn commented Jun 5, 2016

This made my day, Thanks Kent!

@elgartoinf

This comment has been minimized.

Show comment
Hide comment
@elgartoinf

elgartoinf Sep 29, 2017

that amazing super script

elgartoinf commented Sep 29, 2017

that amazing super script

@foarsitter

This comment has been minimized.

Show comment
Hide comment
@foarsitter

foarsitter Oct 4, 2017

Even you created this script two years ago, it still works like a charm! Thanks!

I adjusted line 28 to 'in' instead of '==' because i run it in a docker.

Cheers!

foarsitter commented Oct 4, 2017

Even you created this script two years ago, it still works like a charm! Thanks!

I adjusted line 28 to 'in' instead of '==' because i run it in a docker.

Cheers!

@francis2001tw

This comment has been minimized.

Show comment
Hide comment
@francis2001tw

francis2001tw Jun 9, 2018

Bravo !!! Thanks for sharing

francis2001tw commented Jun 9, 2018

Bravo !!! Thanks for sharing

@Heiner92

This comment has been minimized.

Show comment
Hide comment
@Heiner92

Heiner92 Jun 18, 2018

You are a genius! Thank you so so much! I was about to go crazy manually restarting my celery workers on windows everytime I changed my tasks.

I had to change the on_any_event function because I'm using winpython (need to open a seperate shell):
Had to start the new worker before killing the old process

    def on_any_event(self, event):
        print("detected change. event = {}".format(event))
    
        for proc in psutil.process_iter():
            proc_cmdline = self._get_proc_cmdline(proc)
            if not proc_cmdline or len(proc_cmdline) < len(celery_cmdline):
                continue
    
            is_celery_worker = 'python' in proc_cmdline[0].lower() \
                               and celery_cmdline[0] in proc_cmdline[1] \
                               and celery_cmdline[1] == proc_cmdline[2]
            
            if not is_celery_worker:
                continue
            try:
                run_worker()
                print("Just killed {} on working dir {}".format(proc_cmdline, proc.cwd()))
                proc.kill()
                break
            except Exception as e:
                print(str(e))

Heiner92 commented Jun 18, 2018

You are a genius! Thank you so so much! I was about to go crazy manually restarting my celery workers on windows everytime I changed my tasks.

I had to change the on_any_event function because I'm using winpython (need to open a seperate shell):
Had to start the new worker before killing the old process

    def on_any_event(self, event):
        print("detected change. event = {}".format(event))
    
        for proc in psutil.process_iter():
            proc_cmdline = self._get_proc_cmdline(proc)
            if not proc_cmdline or len(proc_cmdline) < len(celery_cmdline):
                continue
    
            is_celery_worker = 'python' in proc_cmdline[0].lower() \
                               and celery_cmdline[0] in proc_cmdline[1] \
                               and celery_cmdline[1] == proc_cmdline[2]
            
            if not is_celery_worker:
                continue
            try:
                run_worker()
                print("Just killed {} on working dir {}".format(proc_cmdline, proc.cwd()))
                proc.kill()
                break
            except Exception as e:
                print(str(e))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment