Celery任务队列:异步任务架构与Beat定时任务
请介绍Celery分布式任务队列的核心概念和用法。解释Broker(RabbitMQ/Redis)、Worker、Beat(定时调度器)的角色。如何定义异步任务、任务链(chain/group/chord)以及任务重试机制?说明task.retry、task.default_retry_delay和Flower监控工具的用法。
回答
孤独的心
Celery架构:
- Broker:消息中间件(Redis/RabbitMQ),存储任务消息
- Worker:消费Broker中的任务并执行
- Beat:定时发送周期性任务到Broker
- Backend:存储任务结果(Redis/DB)
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')
@app.task(bind=True, max_retries=3, default_retry_delay=10)
def process_order(self, order_id):
try:
# 业务逻辑
return {'status': 'done', 'order_id': order_id}
except ConnectionError as exc:
raise self.retry(exc=exc) # 自动重试
任务编排:
from celery import chain, group, chord
# 链式执行
chain(task1.s(), task2.s(), task3.s())()
# 并行执行
group(task1.s(), task2.s(), task3.s())()
# 全部完成后的回调
chord([task1.s(), task2.s()])(callback.s())
定时任务(Beat):
from celery.schedules import crontab
app.conf.beat_schedule = {
'cleanup-every-6h': {
'task': 'tasks.cleanup',
'schedule': crontab(hour='*/6'),
},
}
# 启动: celery -A tasks beat
监控:celery -A tasks flower启动Flower Web界面,查看任务状态、队列长度、Worker健康度。
最佳实践:任务幂等性(多次执行结果一致)、任务超时(soft_time_limit/time_limit)、结果只存小对象。