Celery异步任务队列 / 任务调度框架笔记

发布时间:2026/7/6 7:07:46

Celery异步任务队列 / 任务调度框架笔记 Celery 零基础系统入门教程我会按照「认知→环境→上手→核心原理→实战常用→工程化→避坑」的路径带你循序渐进学习全程带可直接运行的代码新手友好重点标注 Windows 系统的专属避坑点。一、基础认知Celery 是什么学了能干嘛Celery 是 Python 生态最主流的分布式异步任务队列 / 任务调度框架核心解决 3 类核心问题异步任务把耗时操作发邮件 / 短信、大文件处理、AI 推理、数据清洗丢到后台执行不阻塞主程序比如 Web 接口定时任务替代 Linux crontab支持更灵活的定时规则每月最后一个周五、每隔 1 小时、节假日跳过等分布式任务多机器多进程并行执行任务轻松横向扩展处理海量任务队列核心适用场景Web 后端异步处理注册邮件、订单超时取消、报表生成大幅提升接口响应速度数据开发分布式爬虫、批量数据 ETL、离线计算运维自动化定时数据库备份、服务巡检、消息推送5 个核心基础组件先记名字后面结合代码讲透组件核心作用通俗类比Broker消息中间件任务队列的「中转站」接收任务并分发给 Worker快递中转站Worker执行任务的工作进程持续监听 Broker拿到任务就执行干活的工人Task你定义的、需要异步 / 定时执行的 Python 函数要干的具体活Backend存储任务的执行结果、状态、异常信息结果仓库Beat定时调度器按配置的时间向 Broker 发送任务定时派活的调度员二、环境搭建新手避坑重点必备环境要求Python 3.8Celery 5.x 最低要求 Python3.7推荐 3.8Broker Backend新手优先用 Redis安装简单、兼容性好无需复杂配置分步安装步骤 1安装 Celery 核心库打开终端 / 命令行执行pip install celery步骤 2安装 RedisBrokerBackendLinux/macOS# Ubuntu/Debian sudo apt install redis-server # macOS brew install redis # 启动Redis redis-serverWindows重点90% 新手踩坑点Celery 4.x 不再官方支持 WindowsRedis 官方无原生稳定 Windows 版推荐 2 种方案方案 1新手首选稳定无坑开启 WSL2 安装 Ubuntu按 Linux 方式安装 Redis方案 2原生 Windows 临时使用安装微软维护的 Redis 发行版 microsoftarchive/redis下载.msi安装包安装后会自动启动 Redis 服务验证 Redis 是否正常打开 cmd执行redis-cli能进入交互界面即为成功。步骤 3安装 Python Redis 依赖Celery 需要 Redis 的 Python 客户端执行pip install redis步骤 4Windows 专属依赖必装否则 Worker 启动报错Windows 下 Celery 默认的 prefork 进程池不兼容必须安装 gevent/eventletpip install gevent三、快速上手第一个可运行的最小 Demo目标10 分钟跑通完整异步任务流程建立信心。步骤 1新建任务文件tasks.py# 1. 导入Celery核心类 from celery import Celery # 2. 初始化Celery实例核心入口 app Celery( tasks, # 实例唯一名称一般用文件名 brokerredis://127.0.0.1:6379/0, # Broker地址Redis第0个库 backendredis://127.0.0.1:6379/0 # Backend地址和Broker共用即可 ) # 3. 定义异步任务用app.task装饰器把普通函数变成Celery任务 app.task def add(x, y): print(f执行加法任务{x} {y} {xy}) return x y步骤 2启动 Celery Worker核心进程打开新的终端进入tasks.py所在的文件夹执行对应启动命令# Linux/macOS 启动命令 celery -A tasks worker --loglevelinfo # Windows 启动命令必须加-P gevent否则必报错 celery -A tasks worker --loglevelinfo -P gevent参数说明-A tasks指定 Celery 实例所在的模块即 tasks.py自动查找里面的 app 实例worker启动 Worker 工作进程--loglevelinfo设置日志级别为 info可看到任务执行详情-P gevent指定 gevent 协程池Windows 必加解决系统兼容性问题启动成功标志终端出现[tasks]下显示你的add任务最终输出ready无红色报错。步骤 3调用异步任务验证完整流程再打开一个新的终端进入同目录启动 Python 交互环境python依次执行以下代码# 导入定义好的add任务 from tasks import add # 核心异步调用用delay()方法把任务发给Broker由Worker执行 result add.delay(3, 5) # 查看任务唯一ID print(f任务ID{result.id}) # 查看任务状态PENDING(等待)、STARTED(执行中)、SUCCESS(成功)、FAILURE(失败) print(f任务状态{result.status}) # 等待任务完成获取执行结果超时时间10秒 print(f任务结果{result.get(timeout10)})执行效果验证调用delay()时当前终端不会阻塞立刻返回 result 对象启动 Worker 的终端会打印出执行加法任务3 5 8说明 Worker 已正常执行任务最终get()方法会拿到返回值8完整流程跑通四、核心概念与核心用法详解1. 任务调用的两种核心方式你刚才用的delay()是简化版还有一个全功能版apply_async()支持所有高级配置。delay(*args, **kwargs)只能传递任务的入参无额外配置是apply_async()的简写apply_async(args(), kwargs{}, **options)支持延迟执行、指定队列、超时、回调等所有高级配置常用示例# 1. 延迟5秒执行任务 add.apply_async(args(3, 5), countdown5) # 2. 指定任务超时时间10秒未执行完自动终止 add.apply_async(args(3, 5), time_limit10) # 3. 定时执行指定具体时间点执行 from datetime import datetime, timedelta # 1分钟后执行 eta datetime.now() timedelta(minutes1) add.apply_async(args(3, 5), etaeta)2. 任务重试机制生产必用处理任务执行失败的场景第三方接口超时、网络波动自动重试避免人工干预。from celery import Celery import requests app Celery(tasks, brokerredis://127.0.0.1:6379/0, backendredis://127.0.0.1:6379/0) app.task( autoretry_for(Exception,), # 遇到哪些异常自动重试 retry_kwargs{max_retries: 3, countdown: 5}, # 最多重试3次每次间隔5秒 retry_backoffTrue, # 指数退避重试间隔越来越长避免压垮服务 ) def get_weather(city): print(f正在获取{city}的天气数据) res requests.get(fhttps://api.weather.com/{city}, timeout3) res.raise_for_status() # 接口报错会触发异常自动重试 return res.json()3. 定时任务Celery BeatCelery 的定时调度器替代 crontab支持灵活的定时规则。步骤 1修改tasks.py添加定时任务配置from celery import Celery from celery.schedules import crontab # 导入crontab表达式 app Celery(tasks, brokerredis://127.0.0.1:6379/0, backendredis://127.0.0.1:6379/0) # 定义任务 app.task def send_daily_report(): print(执行每日报表发送任务) return 报表发送成功 app.task def add(x, y): return x y # 定时任务核心配置 app.conf.beat_schedule { # 任务1每天早上9点执行发送报表 send-daily-report-9am: { task: tasks.send_daily_report, # 任务的完整模块路径 schedule: crontab(hour9, minute0), # 定时规则 }, # 任务2每隔30秒执行一次加法任务 add-every-30-seconds: { task: tasks.add, schedule: 30.0, # 间隔时间单位秒 args: (10, 20), # 任务入参 }, } # 必须配置时区否则定时任务时间差8小时 app.conf.timezone Asia/Shanghai app.conf.enable_utc True步骤 2启动定时任务定时任务需要同时启动 Worker 和 Beat 两个独立进程先启动 Worker新终端同目录# Windows celery -A tasks worker --loglevelinfo -P gevent # Linux/macOS celery -A tasks worker --loglevelinfo再启动 Beat 调度器新终端同目录celery -A tasks beat --loglevelinfo注意Beat 只能启动一个实例多个实例会导致重复发送任务。4. 任务状态与结果查询可通过任务 ID在任意位置查询任务状态和结果比如 Web 接口中给前端提供任务进度查询能力。from tasks import app from celery.result import AsyncResult # 传入任务ID获取任务对象 task_id 你的任务ID result AsyncResult(task_id, appapp) # 核心属性 print(f任务状态{result.status}) print(f是否执行完成{result.ready()}) print(f是否执行成功{result.successful()}) print(f任务结果{result.result if result.successful() else result.traceback}) # 终止正在执行的任务 result.revoke(terminateTrue)五、工程化项目结构实际项目标准写法新手容易把所有代码写在一个文件里实际项目中需要按业务模块化拆分方便维护和扩展。推荐标准目录结构my_celery_project/ ├── celery_app/ # Celery核心目录 │ ├── __init__.py # 初始化Celery实例 │ ├── config.py # 统一配置文件 │ └── tasks/ # 按业务拆分的任务模块 │ ├── __init__.py │ ├── email_tasks.py # 邮件相关任务 │ ├── data_tasks.py # 数据处理任务 │ └── cron_tasks.py # 定时任务 └── main.py # 主程序/业务代码调用任务各文件核心代码1.celery_app/config.py统一配置文件# Broker与Backend配置 broker_url redis://127.0.0.1:6379/0 result_backend redis://127.0.0.1:6379/0 # 时区配置 timezone Asia/Shanghai enable_utc True # 序列化配置统一用json兼容性最好 task_serializer json result_serializer json accept_content [json] # 全局任务配置 task_time_limit 60 # 所有任务最大执行时长60秒 result_expires 24 * 3600 # 任务结果1天后自动过期避免内存溢出2.celery_app/__init__.py初始化 Celery 实例from celery import Celery # 初始化Celery核心实例 app Celery(celery_app) # 加载配置文件 app.config_from_object(celery_app.config) # 自动发现tasks目录下的所有任务无需手动导入 app.autodiscover_tasks([celery_app.tasks])3.celery_app/tasks/email_tasks.py业务任务模块from celery_app import app app.task def send_email(to_email, content): print(f给{to_email}发送邮件内容{content}) # 实际的邮件发送逻辑 return True4.main.py业务调用from celery_app.tasks.email_tasks import send_email from celery_app.tasks.data_tasks import clean_data # 调用异步任务 send_email.delay(userexample.com, 这是一封测试邮件) clean_data.delay([1,2,3,4,5])启动方式进入my_celery_project根目录执行启动命令# Windows celery -A celery_app worker --loglevelinfo -P gevent # Linux/macOS celery -A celery_app worker --loglevelinfo六、新手高频踩坑与解决方案Windows 启动 Worker 报错ValueError: not enough values to unpack解决方案必须安装 gevent启动命令添加-P gevent参数。任务调用后 Worker 无反应不执行排查步骤检查 Redis 是否正常启动broker 地址是否正确有密码的 Redis 地址格式为redis://:password127.0.0.1:6379/0检查 Worker 启动时终端是否打印出了你的任务函数检查代码是否存在循环导入任务文件导入了主程序的内容result.get()一直阻塞拿不到结果解决方案必须配置 backend否则无法存储和获取结果禁止在任务函数内部调用result.get()会造成死锁检查 Worker 终端是否有任务执行报错定时任务不执行 / 时间不对解决方案必须配置timezone Asia/Shanghai默认 UTC 时间会差 8 小时必须同时启动 Worker 和 Beat 两个进程Beat 只负责发任务Worker 负责执行Beat 只能启动一个实例多个会重复发任务任务里的 print 内容看不到解决方案启动 Worker 时添加--loglevelinfo默认日志级别为 warning不会显示 info 级别的 print 内容。

相关新闻