什么是任务队列
任务队列是一种用来跨线程或机器调度任务的组件。
任务队列的输入是一种叫做task的工作单元。worker持续监控并完成队列中的新任务。
Celery借助message进行通信,通常使用broker来实现客户端和worker之间的交流。初始化一个任务,依赖客户端发送一条message到broker,随后broker将它传递给worker。
一个Celery系统可以用包含很多worker和broker的方式来提高自己的高可用性和水平扩展。
Celery是用python实现的,但是它的协议可以用任何编程语言实现。这里补充几个实现方案:Node.js客户端、Node.js客户端type-script版本以及PHP客户端。
跨语言调用的问题同样可以通过HTTP通信来解决(webhooks)。
我需要准备什么
Celery需要消息Broker来收发消息。完全支持RabbitMQ和Redis 作为中间件,也支持其他很多的实验性方案,包括使用SQLite等进行本地开发的方式。
[!NOTE]
Celery 5.3 版本:
- Python ❨3.8, 3.9, 3.10, 3.11❩
- PyPy3.8+ ❨v7.3.11+❩
Celery 4.x 是支持 Python 2.7的最后一个版本, Celery 5.x 要求 Python 3.6 或者更晚, Celery 5.2.x 要求 Python 3.7 或者更晚。
如果你在使用更旧版本的Celery,你需要更旧版本的Python
- Python 2.7 or Python 3.5: Celery series 4.4 或者更早.
- Python 2.6: Celery series 3.1 或者更早.
- Python 2.5: Celery series 3.0 或者更早.
- Python 2.4 was Celery series 2.2 或者更早.
Celery是一个资金很少的项目,所以我们不支持微软Windows。请不要提问任何与该平台相关的问题。
入门
如果这是你第一次使用Celery,或者你没有在3.1版本开发过而是来自更早的版本,你可以阅读入门教程:
Celery特性
-
简单
Celery易用易维护,并且不需要配置文件。
它有活跃、友好的社区,在那里你可以通过交流获得帮助,包括邮件组和IRC channel。
这里有一个极简的示例:
from celery import Celery app = Celery('hello', broker='amqp://guest@localhost//') @app.task def hello(): return "hello world" -
高可用
worker和客户端会在连接失败或丢失的情况下自动重试,并且一些中间件支持主从复制和主主复制的方式来实现高可用。
-
快速
一个单Celery节点可以在亚ms级别的往返延时条件下处理百万任务。(借助RabbitMQ、librabbitmq和优化设置的方式)。
-
灵活
几乎Celery架构中的任意一个部分都可以单独扩展或单独使用,自定义实现池、序列化器、压缩协议、日志、调度器、消费者、生产者、传输介质等。
-
它支持:
- Broker
- RabbitMQ
- Redis
- Amazon SQS
- 等等
- 并发
- prefork(多进程)
- Eventlet,gevent
- thread(多线程)
- solo(单线程)
- Backend
- AMQP, Redis
- Memcached,
- SQLAlchemy, Django ORM
- Apache Cassandra, Elasticsearch, Riak
- MongoDB, CouchDB, Couchbase, ArangoDB
- Amazon DynamoDB, Amazon S3
- Microsoft Azure Block Blob, Microsoft Azure Cosmos DB
- Google Cloud Storage
- File system
- 序列化
- pickle, json, yaml, msgpack.
- zlib, bzip2 compression.
- Cryptographic message signing.
- Broker
功能
-
监控
worker产出监控事件流,并通过内置或外部工具告诉你集群的实时状态。了解更多。
-
工作流
简单和复杂的工作可用用一种被我们称为“canvas”的语法进行描述,包括分组、连接、分块等等。了解更多。
-
耗时&频率控制
你可以控制任意一种任务类型或任意高一个worker的执行频率及执行时间。了解更多。
-
调度
你可以通过简单指令或者Crontab表达式来定制秒级限时任务或分钟级、小时级、天级、周级、月级周期性任务。了解更多。
-
资源泄漏保护
—max-tasks-per-child指令可以用来避免资源泄漏,如不受控的文件描述符或内存。了解更多。 -
用户组件
每个worker组件都是自定义的,用户还可以定义其他组件。worker是使用“引导步骤”构建的——一个可以对worker的内部进行细粒度控制的依赖关系图。
框架集成
Celery很容易与web框架集成,其中一些甚至有集成包:
Pyramid https://pypi.org/project/pyramid_celery/ Pylons https://pypi.org/project/celery-pylons/ Flask 不需要 web2py https://pypi.org/project/web2py-celery/ Tornado https://pypi.org/project/tornado-celery/ Tryton https://pypi.org/project/celery_tryton/
对于 Django 请看 First steps with Django.
集成包并不是绝对必要的,但它们可以使开发变得更容易,有时它们会添加重要的工具,比如关闭数据库连接。
快捷跳转
- get the return value of a task
- use logging from my task
- learn about best practices
- create a custom task base class
- add a callback to a group of tasks
- split a task into several chunks
- optimize the worker
- see a list of built-in task states
- create custom task states
- set a custom task name
- track when a task starts
- retry a task when it fails
- get the id of the current task
- know what queue a task was delivered to
- see a list of running workers
- purge all messages
- inspect what the workers are doing
- see what tasks a worker has registered
- migrate tasks to a new broker
- see a list of event message types
- contribute to Celery
- learn about available configuration settings
- get a list of people and companies using Celery
- write my own remote control command
- Brokers
- Workers
- Security
- Contributing
- Applications
- Daemonizing
- Routing
- Signals
- Tasks
- Monitoring
- Configuration
- FAQ
- Calling
- Optimizing
- Django
- API Reference
安装
你可以通过源码或者python包的方式安装Celery。
pip方式
pip install -U Celery
合集
Celery还定义了一组合集,可用于安装给定特性的依赖项。
您可以在需求中或在pip命令行中使用括号指定这些参数。可以通过逗号分隔多个合集来指定它们。
pip install "celery[librabbitmq]"
$ pip install "celery[librabbitmq,redis,auth,msgpack]"
这些合集包括:
Serializers
-
celery[auth]:for using the
authsecurity serializer. -
celery[msgpack]:for using the msgpack serializer.
-
celery[yaml]:for using the yaml serializer.
Concurrency
-
celery[eventlet]:for using the https://pypi.org/project/eventlet/ pool.
-
celery[gevent]:for using the https://pypi.org/project/gevent/ pool.
Transports and Backends
-
celery[librabbitmq]:for using the librabbitmq C library.
-
celery[redis]:for using Redis as a message transport or as a result backend.
-
celery[sqs]:for using Amazon SQS as a message transport (experimental).
-
celery[tblib]:for using the
task_remote_tracebacksfeature. -
celery[memcache]:for using Memcached as a result backend (using https://pypi.org/project/pylibmc/)
-
celery[pymemcache]:for using Memcached as a result backend (pure-Python implementation).
-
celery[cassandra]:for using Apache Cassandra/Astra DB as a result backend with DataStax driver.
-
celery[couchbase]:for using Couchbase as a result backend.
-
celery[arangodb]:for using ArangoDB as a result backend.
-
celery[elasticsearch]:for using Elasticsearch as a result backend.
-
celery[riak]:for using Riak as a result backend.
-
celery[dynamodb]:for using AWS DynamoDB as a result backend.
-
celery[zookeeper]:for using Zookeeper as a message transport.
-
celery[sqlalchemy]:for using SQLAlchemy as a result backend (supported).
-
celery[pyro]:for using the Pyro4 message transport (experimental).
-
celery[slmq]:for using the SoftLayer Message Queue transport (experimental).
-
celery[consul]:for using the Consul.io Key/Value store as a message transport or result backend (experimental).
-
celery[django]:specifies the lowest version possible for Django support.You should probably not use this in your requirements, it’s here for informational purposes only.
-
celery[gcs]:for using the Google Cloud Storage as a result backend (experimental).
从源码下载
从pypi下载最新版Celery:https://pypi.org/project/celery/
安装方法:
tar xvfz celery-0.0.0.tar.gz
cd celery-0.0.0
python setup.py build
python setup.py install
如果你的当前环境不是虚拟环境,最后一个命令必须在管理员身份下执行。
使用开发版本
使用pip
Celery开发版要求其配套组件也必须是开发版。
包括https://pypi.org/project/kombu/, https://pypi.org/project/amqp/, https://pypi.org/project/billiard/, https://pypi.org/project/vine/.
你可以使用以下命令进行安装:
pip install https://github.com/celery/celery/zipball/main#egg=celery
pip install https://github.com/celery/billiard/zipball/main#egg=billiard
pip install https://github.com/celery/py-amqp/zipball/main#egg=amqp
pip install https://github.com/celery/kombu/zipball/main#egg=kombu
pip install https://github.com/celery/vine/zipball/main#egg=vine
使用git
请看贡献部分。