Skip to content

Instantly share code, notes, and snippets.

@ba1dr
Created February 29, 2016 20:38
Show Gist options
  • Save ba1dr/5524797cd84b19706d19 to your computer and use it in GitHub Desktop.
Save ba1dr/5524797cd84b19706d19 to your computer and use it in GitHub Desktop.
Extending Flower
from flower.app import Flower
from .urls import handlers
class MyFlower(Flower):
@classmethod
def get_handlers(cls):
orig_handlers = super(MyFlower, cls).get_handlers()
error_handler = orig_handlers.pop() # always the last
orig_handlers.extend(handlers)
orig_handlers.append(error_handler)
return orig_handlers
from flower.command import FlowerCommand
from .app import MyFlower
class MyFlowerCommand(FlowerCommand):
@classmethod
def get_flower_class(cls):
return MyFlower
# located in api/tasks.py
from tornado import web
from flower.api.tasks import BaseTaskHandler
class KnownTasks(BaseTaskHandler):
@web.authenticated
def get(self):
"""
Get a list of known tasks
"""
known_tasks = {}
for tname, task in self.capp.tasks.items():
if tname.startswith('celery.'):
continue
known_tasks[tname] = {
'name': tname,
}
# self.write({'task-names': sorted(self.capp.tasks.keys())})
self.write(known_tasks)
from .api import tasks
from . import views
handlers = [
(r"/api/tasks/known", tasks.KnownTasks),
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment