
When to use Celery

  1. To do any network call in a request-response cycle

    Sending an email is a network call and might take 2-3 seconds. The user should not be made to wait for these 2-3 seconds, so sending the activation email should be done outside the request-response cycle. This can be achieved using Celery.

  2. Breaking a large task consisting of several independent parts into smaller tasks

    If you write a single function to sequentially hit 5 endpoints provided by FB, and each network call takes 2 seconds on average, then your function will take 10 seconds to complete. Instead, you can split the work into 5 individual tasks (it's very easy to do, as we will soon see) and let Celery handle them. Celery can hit these 5 endpoints in parallel, so you get the responses from all the endpoints in roughly 2 seconds.

  3. Periodic tasks

Why use Celery

  1. We want web responses to be fast.

    So on user signup, the server should send the response immediately and the actual job of sending the email should be handed off to Celery.

  2. We can use Celery to make our scripts faster and to make better use of the CPU.

    In the FB example described earlier, we can go from 10 seconds to 2 seconds, and our CPU utilization would also be higher if we use Celery.

  3. We can use celery to make our tasks more manageable.

    In our FB example, if everything were in a single function executed sequentially and an error occurred while fetching the second URL, the other 3 URLs wouldn't be hit. With Celery, each URL is a separate task, so one failure doesn't block the rest.

Simple celery example

  1. Without celery

import requests
import time


def func(urls):
    start = time.time()
    for url in urls:
        resp = requests.get(url)
        print(resp.status_code)
    print("It took", time.time() - start, "seconds")


func(["http://google.com", "https://amazon.in", "https://facebook.com", "https://twitter.com", "https://alexa.com"])
  2. With celery

    The main component of a celery-enabled program or a celery setup is the celery worker. In our web app signup example, the celery worker would do the job of sending the emails. Similarly, here the celery worker does the job of fetching the URLs.

    The celery worker and your application/script are different processes and run independently of each other. So your application/script and celery need some way to communicate with each other. That's where a message queue comes into the picture.

    The application code needs to put the task somewhere the celery worker can fetch and execute it from. The application code puts the task on a message queue, and the celery worker fetches the task from the message queue and executes it. We will use Redis as the message queue.

import time
import requests
from celery import Celery

app = Celery("celery_blog", broker="redis://localhost:6379/0")


@app.task
def fetch_url(url):
    print("Task picked by worker from redis queue")
    resp = requests.get(url)
    print(resp.status_code)
    return "1 of the Task completed by worker"


def func(urls):
    start = time.time()
    for url in urls:
        print("Task sent to redis queue")
        fetch_url.delay(url)
    print("All tasks sent to redis queue")
    print(f"It took {time.time()-start} seconds")


if __name__ == "__main__":
    func(
        [
            "http://google.com",
            "https://amazon.in",
            "https://alexa.com",
            "https://wikipedia.org",
            "https://www.apple.com/",
            "https://abc.xyz/",
            "https://stackoverflow.com/",
            "https://youtube.com/",
            "https://jira.com/",
            "https://www.flipkart.com/",
        ]
    )


The first argument to Celery is the name of the current module; this is needed so that task names can be generated automatically. The second argument is the broker keyword argument, which specifies the URL of the message broker you want to use.

Message queue and message broker are synonymous terms for our basic discussion.

A celery worker can run multiple processes in parallel. We want to hit all our URLs in parallel and not sequentially. So we need a function that acts on a single URL, and we run many instances of it in parallel. That is why we wrote a celery task called fetch_url which works with a single URL.

A celery task is just a function with the decorator “app.task” applied to it. A celery worker works on celery tasks.

From our old function, we call the task once for each URL, passing a different URL each time.

When we say “fetch_url.delay(url)”, the task name and its arguments are serialized and put on the message queue, which in our case is Redis. When the celery worker runs, it reads the serialized message from the queue, deserializes it and then executes the task.

Now the actual work of hitting the URL isn't done by our script anymore; it is done by Celery. You can see from the print statements of the celery task that they are printed in the celery worker's console, not by our script.
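
For reference, assuming the snippet above is saved as celery_blog.py and Redis is running locally on the default port, the worker and the script can be started in two separate terminals:

# start a worker (the --concurrency value here is just an example)
celery -A celery_blog worker --loglevel=info --concurrency=5

# in another terminal, enqueue the tasks
python celery_blog.py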

Retrying failed tasks

import datetime

import twitter


@app.task(bind=True)
def fetch_tweets(self):
    # oauth_token, oauth_token_secret, key, secret and user_id are assumed
    # to be defined elsewhere (e.g. in settings)
    auth = twitter.OAuth(oauth_token, oauth_token_secret, key, secret)
    client = twitter.Twitter(auth=auth)
    params = {'user_id': user_id, 'count': 2}
    try:
        response = client.favorites.list(**params)
    except twitter.TwitterError as e:
        if e.e.code == 429:
            # 429: rate limit exceeded
            # Ask Twitter at what time the rate limit window gets reset
            rate_limit_status = client.application.rate_limit_status(resources="statuses")
            reset = rate_limit_status["resources"]["statuses"]["/statuses/user_timeline"]["reset"]
            dt = datetime.datetime.utcfromtimestamp(reset)
            # retry this task once the rate limit window has reset
            raise self.retry(eta=dt, exc=e, max_retries=1)
        raise
    return response
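
If the exact retry time doesn't matter, a simpler variant of the same idea is to retry after a fixed delay. This is only a sketch, reusing the fetch_url task from the earlier example:

@app.task(bind=True, max_retries=3)
def fetch_url(self, url):
    try:
        resp = requests.get(url)
        resp.raise_for_status()
    except requests.RequestException as e:
        # retry up to 3 times, waiting 60 seconds between attempts
        raise self.retry(exc=e, countdown=60)
    return resp.status_code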

Identifying time consuming tasks

The rate of dequeue by the workers must keep up with the rate of enqueue, else tasks will start piling up in the queue. If entries keep accumulating, the queue capacity will soon be breached and the queuing system will collapse.

We need to pinpoint the time-consuming tasks which keep the workers occupied for a long time, since they are essentially the cause of tasks accumulating in the queue.

Going through the task logic and trying to infer the slow tasks might work when you have only 2 or a handful of tasks. However, even a slightly complex application can easily have more than 50 tasks. Manually finding the slow tasks isn't an option then.

The solution lies in logging the time taken to execute each task, and then ordering the tasks by time taken.

Analysing these logs isn't straightforward because log entries are plain text. They have to be parsed into a structured format to be able to sort by task time. That's where ELK comes into the picture. We can set up an ELK pipeline: Logstash parses the logs and ingests them into Elasticsearch, where they can be sorted by task time and the time-consuming tasks identified.
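
A minimal sketch of the logging part, assuming the same app object as in the earlier example (the signal handlers and the logger name are illustrative, not part of the original code):

import logging
import time

from celery.signals import task_prerun, task_postrun

logger = logging.getLogger("task_timing")
task_start_times = {}


@task_prerun.connect
def record_start_time(task_id=None, task=None, **extra):
    # remember when this task instance started
    task_start_times[task_id] = time.time()


@task_postrun.connect
def log_task_time(task_id=None, task=None, **extra):
    # log the task name and how long it took, so the log line can later be
    # parsed (e.g. by Logstash) and sorted by task time
    started = task_start_times.pop(task_id, None)
    if started is not None:
        logger.info("task=%s task_time=%.3f", task.name, time.time() - started)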

Distributing tasks on different queues and workers

It's important to have clarity on whether tasks are network bound or CPU bound. They should be properly categorized and kept on the corresponding queues.

If you have several network bound tasks, it makes sense to keep them on the same queue so they are logically grouped together, and to have multiple workers consuming from this queue.

If you have several CPU bound tasks, it makes sense to use the same queue for them too, and, depending on their CPU need, to probably keep one worker per core for the CPU bound queue.

Keep a separate queue and worker for the highest priority tasks so that, irrespective of which tasks are being executed, the highest priority tasks get some CPU share instead of being blocked in the queue behind other time-consuming tasks.
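
As an illustrative sketch (the queue names and concurrency values below are assumptions, not taken from this gist), tasks can be routed to named queues with the task_routes setting:

# route tasks to dedicated queues, grouped by the kind of work they do
app.conf.task_routes = {
    "celery_blog.fetch_url": {"queue": "network"},
    "tasks.tasks.get_app_category": {"queue": "cpu"},
}

A worker can then be pointed at a single queue with the -Q option, for example celery -A celery_blog worker -Q network --concurrency=20 for the network bound queue and celery -A celery_blog worker -Q cpu --concurrency=4 for the CPU bound one.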

Periodic tasks with celery

# play_scraper is the play-scraper package; the App model is assumed to be
# importable from your project's models module
import play_scraper

from celery import shared_task


@shared_task
def get_app_category():
    for app in App.objects.all():
        try:
            app_data = play_scraper.details(app.url)
            app.category = app_data.get("category")[0]
        except Exception:
            # log the failure here instead of silently swallowing it
            pass
        app.save()
    return "app categories updated"

Inside settings:

from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "get_app_category": {
        "task": "tasks.tasks.get_app_category",
        # crontab(hour="0") runs every minute between 00:00 and 00:59;
        # use crontab(hour=0, minute=0) to run once at midnight
        "schedule": crontab(hour="0"),
    }
}
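
To actually run the schedule, a celery beat process has to be started alongside a worker. Assuming the Celery app module is called celery_blog, as in the earlier example:

celery -A celery_blog beat -l info
celery -A celery_blog worker -l info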

Slides

https://s3-eu-west-1.amazonaws.com/pyconil-data-amit/presentations/Itamar_Hartstein_Celery+-+PyCon+Presentation.pdf
