Skip to content

Instantly share code, notes, and snippets.

@crooksey
Created April 5, 2018 10:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save crooksey/2faebf0570f4cd3e5224cc2a26964259 to your computer and use it in GitHub Desktop.
Save crooksey/2faebf0570f4cd3e5224cc2a26964259 to your computer and use it in GitHub Desktop.
from celery import Celery
from celery.schedules import crontab
from pyramid_mailer.message import Message
from intranet_backend_api.celery_bundle.celery_settings import mailer
# Define app
app = Celery()
## Broker settings.
broker_url = 'amqp://app_rmq:pass@localhost:5672/intranet'
# Set timezone, for DST
timezone = 'Europe/London'
# Modules to import when the Celery worker starts.
imports = ('intranet_backend_api.celery_bundle.tasks.time_attendance')
## Using the database to store task state and results.
#result_backend = 'db+sqlite:///results.db'
def my_on_failure(self, exc, task_id, args, kwargs, einfo):
message_formatted = 'Oh no! Task failed: {0!r}'.format(exc)
message = Message(
subject="Failed celery task",
sender='admin@example.com',
recipients=['luke@example.com'],
body=message_formatted)
mailer.send_immediately(message)
task_annotations = {'*': {'on_failure': my_on_failure}}
app.conf.beat_schedule = {
'users-not-clocked-in': {
'task': 'intranet_backend_api.celery_bundle.tasks.time_attendance.not_clocked_in',
'schedule': crontab(hour=11, minute=40, day_of_week="mon-fri")
},
}
@app.task
def users_not_clocked_in():
message = Message(
subject="Users not clocked in",
sender='admin@example.com',
recipients=['luke@example.com'],
html="something",
body=None)
mailer.send_immediately(message)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment