• Martin Thoma
  • Home
  • Categories
  • Tags
  • Archives
  • Support me

Celery

Contents

  • The Problem
  • Solution 1: Conversion Server
  • Solution 2: Message Queue
  • Celery
    • setup.py
    • docker-compose.yml
    • config.py
    • app.py
    • celery_worker.py
    • Dockerfile
    • Tasks
  • See also

Celery is a task queue. I was experimenting with it today and want to share some insights.

The Problem

Think of YouTube. Users can upload videos, but before they are shown on the platform they need to get processed. Converting the format, checking for illegal content. This takes a considerable amount of time, so you don't want to let the user stare at a loading screen while you create the backup. So simply having a synchronous call is not working, as those tasks can easily take more than one minute and thus the user might get a timeout. And for sure the user would not like it, if he didn't get any feedback. Also, it might create considerable work and thus you might not want to run it on the same machine.

Solution 1: Conversion Server

To solve the load and the waiting time problem of the file conversion, we can create a "conversion server". Another machine which takes care of file conversions. Then instead of letting the user wait, we just tell him "conversion is in progress". We make a call to the conversion server with our video id (vid) and wait for the response.

Then we need to somehow call that conversion server. There are several approaches to this:

Synchronous call: We could make a small REST API, but then we would again have the problem that the conversion can take arbitrary long.

Communication by a synchronous request with a conversion server
Communication by a synchronous request with a conversion server

Polling: We could make the web app poll, hence give the conversion server /videos/<id> and let it return {"status": 0.0}, {"status": 0.5}, ..., {"status": 1.0, "result_uri": "s3://bucket/42.mpg"}. The disadvantage of polling is that the web app needs to administrate it. For example, maybe you want to send the user an e-mail once the conversion is done. You want to send this e-mail ASAP. So you need to keep polling once the request was sent. During peak times the number of videos in the conversion queue might get longer and thus we might end up polling a lot of videos, putting unnecessary load on the (internal) API of the conversion server.

Communication by polling
Communication by polling

Callback: Instead of having to ask again and again if the conversion is done, we could give the conversion server a callback URL. Once it is done, it calls the callback back. Hence callback. It works like this:

Communication by a callback
Communication by a callback

Solution 2: Message Queue

All three variants of the first solution have one fundamental problem: They don't scale well. Assume suddenly you get from several dozend users to tens of thousands. You will have peak times during which you need to convert way more videos. So you would like to spin up new instances and basically be able to do double the work with double the amount of machines.

How would you do that, starting with a single conversion server?

You would probably create a conversion service:

A file conversion service based on callbacks
A file conversion service based on callbacks

Here the API and the load balancer are overhead. And it is not clear how the "load balancer" would actually work.

Just store the kind of work you want to do somewhere and let "workers" pick up the task. This means you need to pass messages between the web app which contains the upload form / the dashboard for the user and between the workers.

Instead of having the API and the load balancer, you can use a message broker system like RabbitMQ, Redis or Amazon SQS (code sample). The core of those message broker systems is that you can send and receive messages. Sometimes you send it to a topic, in other system it is called a queue. Consumers / clients can read / listen to those queues / topics.

Celery

Celery is a task queue. While not being to deep in the topic, I would say the main advantage of celery compared to usuing the message brokers directly is simplicity. It's easier to use than rabbitmq directly. Additionally, you get flexibility to change the message broker.

It comes with a monitoring tool called flower.

Structurally, my app looked like this:

.
├── celery_worker.py
├── docker-compose.yml
├── Dockerfile
└── web
    ├── Dockerfile
    ├── my_project
    │   ├── app.py
    │   ├── config.py
    │   ├── __init__.py
    │   ├── main (other flask Blueprints look similar)
    │   │   ├── __init__.py
    │   │   ├── routes.py
    │   │   ├── static
    │   │   └── templates
    │   ├── models.py
    │   ├── static
    │   ├── templates
    │   └── wsgi.py
    ├── Makefile
    ├── setup.cfg
    └── setup.py

And I ran it either with docker-compose up or via

celery -A celery_worker.py worker --loglevel=debug

In the following, I give some details about how I adjusted the listed files.

setup.py

Add the new requirement: celery==4.3.0

docker-compose.yml

I've added the following lines:

version: "3"
services:
  broker:
    image: "rabbitmq:3-management"
    container_name: broker
    ports:
      - "5672:5672"
      - "15672:15672"
    volumes:
      - 'rabbitmq_data:/data'
  flower:
    image: mher/flower
    container_name: flower
    command: ["flower", "--broker=amqp://guest:guest@broker:5672//", "--port=8888"]
    ports:
       - 8888:8888
  worker:
    container_name: worker
    build: .
    depends_on:
      - broker
    links:
      - db
    environment:
      CELERY_HOST: broker
      MYSQL_PORT: 3306
      MYSQL_ROOT_PASSWORD: foo
      MYSQL_DATABASE: bar
      MYSQL_HOST: db
volumes:
  rabbitmq_data:

Of course, there is also the "web" service which I cropped out here for keeping it simple.

config.py

I've added the following:

CELERY_HOST = os.environ.get("CELERY_HOST", "localhost")
CELERY_BROKER_URL = "amqp://guest:guest@{host}:5672//".format(host=CELERY_HOST)
CELERY_BACKEND = "sqlalchemy"
CELERY_RESULT_DBURI = "sqla+" + SQLALCHEMY_DATABASE_URI

app.py

I followed the application factory pattern (create_app). On the global level of app.py (not hidden in a function), I've added:

from celery import Celery

celery = Celery(__name__, broker=config.CELERY_BROKER_URL)

and within the create_app:

def create_app(config=None):
    # some stuff ...
    app = Flask(__name__, template_folder="templates", static_folder="static")
    # some stuff ...
    celery.conf.update(app.config)

celery_worker.py

#!/usr/bin/env python
from my_project.app import celery, create_app

app = create_app()
app.app_context().push()

Dockerfile

FROM python:3.7-slim
LABEL maintainer="[email protected]"
LABEL description="This is for executing a celery worker"

# Copy projects code
COPY web/ /opt/app
COPY celery_worker.py /opt/app
WORKDIR /opt/app
RUN pip install -e . --no-cache-dir

# Start worker
ENTRYPOINT ["sh", "-c"]
CMD ["celery -A celery_worker.celery worker --loglevel=debug"]

Tasks

In one of the routes.py:

from my_project.app import celery


@backup.route("/videos/<vid>/convert")
@login_required
def convert(vid):
    """Convert a video. This can take time"""
    flash("Started converting video", "info")
    convert_video.delay(vid)  # returns almost instantly


@celery.task(name="convert_video")
def convert_video(user_id):
    # Executing this is what takes long
    # ... put the file conversion in here!
    return 42

See also

  • Flask: Celery Background Tasks
  • The Flask Mega-Tutorial Part XXII: Background Jobs by Miguel Grinberg
  • Why use Celery instead of RabbitMQ?
  • Is there any reason to use RabbitMQ over Kafka?

Published

Jun 10, 2019
by Martin Thoma

Category

Code

Tags

  • Architecture 5
  • inter-process communication 1
  • Message Queue 1

Contact

  • Martin Thoma - A blog about Code, the Web and Cyberculture
  • E-mail subscription
  • RSS-Feed
  • Privacy/Datenschutzerklärung
  • Impressum
  • Powered by Pelican. Theme: Elegant by Talha Mansoor