celery

Python Orchestration

In this article we will go through Celery and run a sample monitoring script that calls a long_running_task managed by Celery.

Intro
#

What is a Task Queue
#

Task queues are used as a mechanism to distribute work across threads or machines.

A task queue’s input is a unit of work called a task. Dedicated worker processes constantly monitor task queues for new work to perform.

Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task the client adds a message to the queue, the broker then delivers that message to a worker.

A Celery system can consist of multiple workers and brokers, giving way to high availability and horizontal scaling.
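
To make that flow concrete, here is a minimal sketch (the tasks.py module name and the add task are placeholders for illustration, not part of this article's example): the client calls .delay(), which puts a message on the broker, and any running worker picks it up and executes the function.

# tasks.py -- hypothetical minimal example
from celery import Celery

# The broker URL tells Celery where to send task messages (Redis is assumed here).
app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def add(x, y):
    return x + y

# Client side: .delay() enqueues a message; a worker started with
# `celery -A tasks worker` consumes it and runs add(2, 3).
# result = add.delay(2, 3)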

Examples
#

Examples of when Celery is useful:

  • Sending emails in the background to avoid blocking user requests.
  • Generating large reports or summaries.
  • Processing images or other media files uploaded by users.
  • Running machine learning tasks on user data.
  • Performing bulk operations, such as data import or export.
  • Scheduling tasks for routine maintenance, data cleanup, or other scheduled operations.
  • Integrating with third-party APIs without blocking the main application.

Supports
#

Brokers
#

  • RabbitMQ
  • Redis
  • Amazon SQS

Concurrency
#

  • prefork (multiprocessing)
  • Eventlet, gevent
  • thread (multithreaded)
  • solo (single threaded)

Result Stores
#

  • AMQP, Redis
  • Memcached
  • SQLAlchemy, Django ORM
  • Apache Cassandra, Elasticsearch, Riak
  • MongoDB, CouchDB, Couchbase, ArangoDB
  • Amazon DynamoDB, Amazon S3
  • Microsoft Azure Block Blob, Microsoft Azure Cosmos DB
  • Google Cloud Storage
  • File system

Serialization
#

  • pickle, json, yaml, msgpack
  • zlib, bzip2 compression
  • Cryptographic message signing
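
As a rough sketch, the serializer and compression used for messages and results are chosen through the standard configuration settings; the values below are only an example and assume app is an existing Celery instance.

# Assuming `app` is an existing Celery() instance.
app.conf.update(
    task_serializer='json',    # how task messages are serialized
    result_serializer='json',  # how stored results are serialized
    accept_content=['json'],   # refuse messages in any other format
    task_compression='zlib',   # compress task message bodies
)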

Features
#

Monitoring
#

A stream of monitoring events is emitted by workers and is used by built-in and external tools to tell you what your cluster is doing – in real-time.
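
As a rough sketch, the same event stream can be consumed from Python with the events Receiver API; this assumes the worker was started with the -E flag so it emits task events, and reuses the Redis broker from later in this article.

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

def my_monitor(app):
    state = app.events.State()

    def on_task_failed(event):
        state.event(event)
        task = state.tasks.get(event['uuid'])
        print(f'TASK FAILED: {task.name}[{task.uuid}]')

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'task-failed': on_task_failed,
            '*': state.event,  # keep the in-memory cluster state up to date
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    my_monitor(app)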

Scheduling
#

You can specify the time to run a task in seconds or a datetime, or you can use periodic tasks for recurring events based on a simple interval, or Crontab expressions supporting minute, hour, day of week, day of month, and month of year.
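
For instance, a one-off delay is expressed with countdown (or an absolute eta datetime), while recurring runs go into the beat schedule; the cleanup task below is hypothetical and only there to illustrate the API.

from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def cleanup():
    ...

# Run once, 60 seconds from now (eta= accepts an absolute datetime instead).
cleanup.apply_async(countdown=60)

# Recurring schedules are driven by the celery beat scheduler.
app.conf.beat_schedule = {
    'cleanup-every-five-minutes': {
        'task': 'tasks.cleanup',
        'schedule': 300.0,  # simple interval in seconds
    },
    'cleanup-monday-morning': {
        'task': 'tasks.cleanup',
        'schedule': crontab(hour=7, minute=30, day_of_week=1),
    },
}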

Work-flows
#

Simple and complex work-flows can be composed using a set of powerful primitives we call the “canvas”, including grouping, chaining, chunking, and more.
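
A short sketch of those primitives, using hypothetical add and total tasks (a chord needs a result backend, so one is configured here as well):

from celery import Celery, chain, group, chord

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

@app.task
def add(x, y):
    return x + y

@app.task
def total(numbers):
    return sum(numbers)

# chain: run tasks in sequence, feeding each result into the next: (2 + 2) + 10
sequential = chain(add.s(2, 2), add.s(10))

# group: run several tasks in parallel.
parallel = group(add.s(i, i) for i in range(5))

# chord: a group whose collected results are passed to a callback task.
fan_in = chord(parallel, total.s())

# All of these need a running worker, e.g.: result = fan_in.apply_async()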

Resource Leak Protection
#

The --max-tasks-per-child option guards against user tasks that leak resources, like memory or file descriptors, that are simply out of your control.
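
The equivalent configuration settings look roughly like this, assuming app is the Celery instance from tasks.py:

# Assuming `app` is an existing Celery() instance.
app.conf.worker_max_tasks_per_child = 100       # recycle a child process after 100 tasks
app.conf.worker_max_memory_per_child = 200_000  # or once it uses roughly 200 MB (value is in KiB)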

Time & Rate Limits
#

You can control how many tasks can be executed per second/minute/hour, or how long a task can be allowed to run, and this can be set as a default, for a specific worker or individually for each task type.
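
A sketch of both levels, with a hypothetical generate_report task: limits can be attached to a single task type or set as worker-wide defaults in configuration.

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(rate_limit='10/m',   # at most 10 executions per minute, per worker
          soft_time_limit=25,  # SoftTimeLimitExceeded is raised inside the task after 25s
          time_limit=30)       # the worker forcefully terminates the task after 30s
def generate_report(report_id):
    ...

# Worker-wide defaults for all task types.
app.conf.task_default_rate_limit = '100/m'
app.conf.task_time_limit = 300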

User Components
#

Each worker component can be customized, and additional components can be defined by the user. The worker is built up using “bootsteps” — a dependency graph enabling fine grained control of the worker’s internals.
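
A minimal sketch of a custom bootstep (the InfoStep name and the print statements are purely illustrative); it is attached to the worker's dependency graph via app.steps.

from celery import Celery, bootsteps

app = Celery('tasks', broker='redis://localhost:6379/0')

class InfoStep(bootsteps.StartStopStep):
    # Hypothetical component that logs worker start-up and shutdown.

    def start(self, worker):
        print(f'{worker} is starting')

    def stop(self, worker):
        print(f'{worker} is shutting down')

# Register the custom component on the worker's bootstep graph.
app.steps['worker'].add(InfoStep)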

Install
#

To install it using uv and pip:

uv init
uv venv
source .venv/bin/activate
uv pip install -U Celery

Testing
#

In this example scenario we will be using Redis as the broker and a monitor script to capture the execution progress.

We need a Redis instance; I’m using the following docker-redis setup for it:

docker-compose up -d

Let’s add the rich and celery[redis] packages to our project:

uv pip install -U Celery[redis]
uv pip install rich

Then we need to register the tasks which the worker will run by creating tasks.py with the following code:

from celery import Celery
import time

app = Celery('tasks', backend='redis://localhost:6379/1', broker='redis://localhost:6379/0')

@app.task(bind=True)
def long_running_task(self):
    for i in range(0, 101, 10):
        time.sleep(0.5)
        self.update_state(state='PROGRESS', meta={'progress': i})
    return {'progress': 100}

In this example we created a dummy long_running_task() which simply sleeps in a loop and reports its progress via update_state().

One can start the worker with the following command:

celery -A tasks worker --loglevel=INFO

Now let’s create our monitor.py script, which will trigger the Celery task and display its progress:

import time
from rich.progress import Progress, BarColumn, TextColumn, TimeRemainingColumn
from tasks import long_running_task

def monitor_task(task_id):
    from celery.result import AsyncResult
    result = AsyncResult(task_id)

    with Progress(
        TextColumn("[progress.description]{task.description}"),
        BarColumn(),
        "[progress.percentage]{task.percentage:>3.0f}%",
        TimeRemainingColumn(),
    ) as progress:
        task_id_ui = progress.add_task("Running Task...", total=100)

        while not result.ready():
            meta = result.info or {}
            progress_value = meta.get('progress', 0)
            progress.update(task_id_ui, completed=progress_value)
            time.sleep(0.5)

        # Final update
        progress.update(task_id_ui, completed=100)

if __name__ == "__main__":
    # Start the Celery task
    task = long_running_task.delay()
    print(f"Started task: {task.id}")
    monitor_task(task.id)

You can execute the monitor script with python monitor.py and you should get output similar to the following:

(test-celery) -bash-5.2$ python monitor.py 
Started task: 6c5b3656-a148-423c-acca-fd59c2ec23b8
Running Task... ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 100% 0:00:00
(test-celery) -bash-5.2$

Conclusion
#

In this article, we explored some of Celery’s key features and developed a monitoring script that executes a long_running_task while tracking its execution status. This example only scratched the surface of what Celery can do. As a distributed task queue system for Python, Celery excels at handling tasks asynchronously or on a schedule, helping the main application remain responsive and efficient.

We also examined several scenarios where Celery proves beneficial. In general, it’s well-suited for any long-running or periodic tasks that can be offloaded to improve user experience. Additionally, Celery provides powerful tools for execution tracking, task distribution, and retry management, making it a robust solution for asynchronous processing in Python applications.

References
#