Python任务调度:sched模块基础用法
请介绍Python标准库sched模块的用法。如何实现定时任务和延迟任务?对比sched与第三方库schedule的设计哲学和使用场景。
回答
我是大山
标准库 sched
import sched
import time
scheduler = sched.scheduler(time.time, time.sleep)
def print_event(name):
print(f'事件: {name}, 时间: {time.time():.0f}')
# 延迟执行
scheduler.enter(5, 1, print_event, argument=('延迟5秒',))
scheduler.enter(10, 2, print_event, argument=('延迟10秒',))
print('开始调度...', time.time())
scheduler.run() # 阻塞执行
特点:
- 标准库,无需安装
- 使用绝对/相对延迟时间
- 支持优先级(第二个参数
priority) run()阻塞直到所有任务完成或调用cancel()enterabs()可指定绝对时间
第三方库 schedule
import schedule
import time
def job():
print('执行定时任务...')
# 各种调度模式
schedule.every(10).minutes.do(job)
schedule.every().hour.do(job)
schedule.every().day.at('10:30').do(job)
schedule.every().monday.do(job)
while True:
schedule.run_pending()
time.sleep(1)
特点:
- 人类可读的API
- 支持分钟/小时/天/周等自然语言周期
- 需要自己维护
run_pending()循环 - 轻量级,单线程
递归调度实现周期任务
import sched
import time
s = sched.scheduler(time.time, time.sleep)
def periodic_task():
print('执行周期性任务')
s.enter(10, 1, periodic_task) # 重新调度自己
s.enter(0, 1, periodic_task)
s.run()
选择建议:
- 简单延迟任务 ->
sched - 人类可读的周期任务 ->
schedule - 生产级调度 ->
APScheduler或Celery Beat