python celery 介绍

   2025-03-22 20:02:07  21  6  0

Celery 是一个非常流行的分布式任务队列,用于处理异步任务,通常与消息中间件(如 RabbitMQ 或 Redis)一起使用。它使得 Python 应用程序能够执行任务在后台异步执行,而不阻塞主程序的执行。

Celery 的核心概念

  1. 任务(Task): Celery 的核心单位是任务,可以是任何可调用的对象。你通过装饰器 @app.task 来定义任务。

  2. 队列(Queue): 任务会被放入消息队列(如 Redis 或 RabbitMQ)。任务可以从队列中被取出并执行。

  3. Worker(工作者): Worker 是处理任务的进程。你启动一个 Celery Worker,它会等待任务并执行。

  4. Broker(消息中间件): 用于传递消息的中间件,常用的有 Redis 和 RabbitMQ。Celery 会向 Broker 发送任务,Worker 会从 Broker 接收任务。

简单示例

以下是一个将 Celery 合并到一个 Python 文件的简单示例:

  1. 安装 Celery 和 Redis(如果你使用 Redis 作为 Broker)

pip install celery redis

2.创建一个 Celery 配置并定义一个任务:

# app.py

from celery import Celery

# 创建 Celery 应用对象,设置 Redis 为消息中间件
app = Celery('tasks', broker='redis://localhost:6379/0')

# 定义一个简单的任务
@app.task
def add(x, y):
    return x + y

3.启动 Celery Worker:

celery -A app worker --loglevel=info

4.发送任务:

# 调用任务并获取结果
from app import add

result = add.delay(4, 6)
print(result.get())  # 输出 10

add.delay(4, 6) 发送任务到 Celery,result.get() 会阻塞直到任务完成并返回结果。

代码说明

  • Celery('tasks', broker='redis://localhost:6379/0'):创建一个 Celery 应用并指定 Redis 为消息中间件。

  • @app.task:装饰器将 add 函数标记为 Celery 的任务。

  • add.delay(4, 6):使用 delay() 方法将任务异步发送到消息队列。

  • result.get():阻塞并等待结果返回。

注意事项

  • 需要确保 Redis 服务已启动,或者你可以选择其他消息中间件(如 RabbitMQ)。

  • Celery Worker 需要在后台运行,以便执行任务。

这个例子是一个最基础的示例,Celery 还支持任务调度、定时任务、任务结果存储等高级功能,适合做更复杂的异步任务处理。


4o