CodeWalk

Celery任务队列:异步任务架构与Beat定时任务

作者:孤独的心 · 2026-05-30 12:55

请介绍Celery分布式任务队列的核心概念和用法。解释Broker(RabbitMQ/Redis)、Worker、Beat(定时调度器)的角色。如何定义异步任务、任务链(chain/group/chord)以及任务重试机制?说明task.retrytask.default_retry_delay和Flower监控工具的用法。

回答

孤独的心

Celery架构

  • Broker:消息中间件(Redis/RabbitMQ),存储任务消息
  • Worker:消费Broker中的任务并执行
  • Beat:定时发送周期性任务到Broker
  • Backend:存储任务结果(Redis/DB)
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

@app.task(bind=True, max_retries=3, default_retry_delay=10)
def process_order(self, order_id):
    try:
        # 业务逻辑
        return {'status': 'done', 'order_id': order_id}
    except ConnectionError as exc:
        raise self.retry(exc=exc)  # 自动重试

任务编排

from celery import chain, group, chord

# 链式执行
chain(task1.s(), task2.s(), task3.s())()
# 并行执行
group(task1.s(), task2.s(), task3.s())()
# 全部完成后的回调
chord([task1.s(), task2.s()])(callback.s())

定时任务(Beat)

from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-every-6h': {
        'task': 'tasks.cleanup',
        'schedule': crontab(hour='*/6'),
    },
}
# 启动: celery -A tasks beat

监控celery -A tasks flower启动Flower Web界面,查看任务状态、队列长度、Worker健康度。

最佳实践:任务幂等性(多次执行结果一致)、任务超时(soft_time_limit/time_limit)、结果只存小对象。