Celery 是一个非常流行的分布式任务队列,用于处理异步任务,通常与消息中间件(如 RabbitMQ 或 Redis)一起使用。它使得 Python 应用程序能够执行任务在后台异步执行,而不阻塞主程序的执行。
Celery 的核心概念
任务(Task): Celery 的核心单位是任务,可以是任何可调用的对象。你通过装饰器
@app.task
来定义任务。队列(Queue): 任务会被放入消息队列(如 Redis 或 RabbitMQ)。任务可以从队列中被取出并执行。
Worker(工作者): Worker 是处理任务的进程。你启动一个 Celery Worker,它会等待任务并执行。
Broker(消息中间件): 用于传递消息的中间件,常用的有 Redis 和 RabbitMQ。Celery 会向 Broker 发送任务,Worker 会从 Broker 接收任务。
简单示例
以下是一个将 Celery 合并到一个 Python 文件的简单示例:
安装 Celery 和 Redis(如果你使用 Redis 作为 Broker)
pip install celery redis
2.创建一个 Celery 配置并定义一个任务:
# app.py
from celery import Celery
# 创建 Celery 应用对象,设置 Redis 为消息中间件
app = Celery('tasks', broker='redis://localhost:6379/0')
# 定义一个简单的任务
@app.task
def add(x, y):
return x + y
3.启动 Celery Worker:
celery -A app worker --loglevel=info
4.发送任务:
# 调用任务并获取结果
from app import add
result = add.delay(4, 6)
print(result.get()) # 输出 10
add.delay(4, 6)
发送任务到 Celery,result.get()
会阻塞直到任务完成并返回结果。
代码说明
Celery('tasks', broker='redis://localhost:6379/0')
:创建一个 Celery 应用并指定 Redis 为消息中间件。@app.task
:装饰器将add
函数标记为 Celery 的任务。add.delay(4, 6)
:使用delay()
方法将任务异步发送到消息队列。result.get()
:阻塞并等待结果返回。
注意事项
需要确保 Redis 服务已启动,或者你可以选择其他消息中间件(如 RabbitMQ)。
Celery Worker 需要在后台运行,以便执行任务。
这个例子是一个最基础的示例,Celery 还支持任务调度、定时任务、任务结果存储等高级功能,适合做更复杂的异步任务处理。
4o