什么是任务队列

任务队列是一种用来跨线程或机器调度任务的组件。

任务队列的输入是一种叫做task的工作单元。worker持续监控并完成队列中的新任务。

Celery借助message进行通信,通常使用broker来实现客户端和worker之间的交流。初始化一个任务,依赖客户端发送一条message到broker,随后broker将它传递给worker。

一个Celery系统可以用包含很多worker和broker的方式来提高自己的高可用性和水平扩展。

Celery是用python实现的,但是它的协议可以用任何编程语言实现。这里补充几个实现方案:Node.js客户端Node.js客户端type-script版本以及PHP客户端

跨语言调用的问题同样可以通过HTTP通信来解决(webhooks)。

我需要准备什么

Celery需要消息Broker来收发消息。完全支持RabbitMQ和Redis 作为中间件,也支持其他很多的实验性方案,包括使用SQLite等进行本地开发的方式。

[!NOTE]

Celery 5.3 版本:

  • Python ❨3.8, 3.9, 3.10, 3.11❩
  • PyPy3.8+ ❨v7.3.11+❩

Celery 4.x 是支持 Python 2.7的最后一个版本, Celery 5.x 要求 Python 3.6 或者更晚, Celery 5.2.x 要求 Python 3.7 或者更晚。

如果你在使用更旧版本的Celery,你需要更旧版本的Python

  • Python 2.7 or Python 3.5: Celery series 4.4 或者更早.
  • Python 2.6: Celery series 3.1 或者更早.
  • Python 2.5: Celery series 3.0 或者更早.
  • Python 2.4 was Celery series 2.2 或者更早.

Celery是一个资金很少的项目,所以我们不支持微软Windows。请不要提问任何与该平台相关的问题。

入门

如果这是你第一次使用Celery,或者你没有在3.1版本开发过而是来自更早的版本,你可以阅读入门教程:

Celery特性

  • 简单

    Celery易用易维护,并且不需要配置文件。

    它有活跃、友好的社区,在那里你可以通过交流获得帮助,包括邮件组和IRC channel。

    这里有一个极简的示例:

    from celery import Celery
      
    app = Celery('hello', broker='amqp://guest@localhost//')
      
    @app.task
    def hello():
    	return "hello world"
    
  • 高可用

    worker和客户端会在连接失败或丢失的情况下自动重试,并且一些中间件支持主从复制和主主复制的方式来实现高可用。

  • 快速

    一个单Celery节点可以在亚ms级别的往返延时条件下处理百万任务。(借助RabbitMQ、librabbitmq和优化设置的方式)。

  • 灵活

    几乎Celery架构中的任意一个部分都可以单独扩展或单独使用,自定义实现池、序列化器、压缩协议、日志、调度器、消费者、生产者、传输介质等。

  • 它支持:

    • Broker
      • RabbitMQ
      • Redis
      • Amazon SQS
      • 等等
    • 并发
      • prefork(多进程)
      • Eventlet,gevent
      • thread(多线程)
      • solo(单线程)
    • Backend
      • AMQP, Redis
      • Memcached,
      • SQLAlchemy, Django ORM
      • Apache Cassandra, Elasticsearch, Riak
      • MongoDB, CouchDB, Couchbase, ArangoDB
      • Amazon DynamoDB, Amazon S3
      • Microsoft Azure Block Blob, Microsoft Azure Cosmos DB
      • Google Cloud Storage
      • File system
    • 序列化
      • pickle, json, yaml, msgpack.
      • zlib, bzip2 compression.
      • Cryptographic message signing.

功能

  • 监控

    worker产出监控事件流,并通过内置或外部工具告诉你集群的实时状态。了解更多

  • 工作流

    简单和复杂的工作可用用一种被我们称为“canvas”的语法进行描述,包括分组、连接、分块等等。了解更多

  • 耗时&频率控制

    你可以控制任意一种任务类型或任意高一个worker的执行频率及执行时间。了解更多

  • 调度

    你可以通过简单指令或者Crontab表达式来定制秒级限时任务或分钟级、小时级、天级、周级、月级周期性任务。了解更多

  • 资源泄漏保护

    —max-tasks-per-child指令可以用来避免资源泄漏,如不受控的文件描述符或内存。了解更多

  • 用户组件

    每个worker组件都是自定义的,用户还可以定义其他组件。worker是使用“引导步骤”构建的——一个可以对worker的内部进行细粒度控制的依赖关系图。

框架集成

Celery很容易与web框架集成,其中一些甚至有集成包:

Pyramid https://pypi.org/project/pyramid_celery/
Pylons https://pypi.org/project/celery-pylons/
Flask 不需要
web2py https://pypi.org/project/web2py-celery/
Tornado https://pypi.org/project/tornado-celery/
Tryton https://pypi.org/project/celery_tryton/

对于 Django 请看 First steps with Django.

集成包并不是绝对必要的,但它们可以使开发变得更容易,有时它们会添加重要的工具,比如关闭数据库连接。

快捷跳转

安装

你可以通过源码或者python包的方式安装Celery。

pip方式

pip install -U Celery

合集

Celery还定义了一组合集,可用于安装给定特性的依赖项。

您可以在需求中或在pip命令行中使用括号指定这些参数。可以通过逗号分隔多个合集来指定它们。

pip install "celery[librabbitmq]"

$ pip install "celery[librabbitmq,redis,auth,msgpack]"

这些合集包括:

Serializers

  • celery[auth]:

    for using the auth security serializer.

  • celery[msgpack]:

    for using the msgpack serializer.

  • celery[yaml]:

    for using the yaml serializer.

Concurrency

  • celery[eventlet]:

    for using the https://pypi.org/project/eventlet/ pool.

  • celery[gevent]:

    for using the https://pypi.org/project/gevent/ pool.

Transports and Backends

  • celery[librabbitmq]:

    for using the librabbitmq C library.

  • celery[redis]:

    for using Redis as a message transport or as a result backend.

  • celery[sqs]:

    for using Amazon SQS as a message transport (experimental).

  • celery[tblib]:

    for using the task_remote_tracebacks feature.

  • celery[memcache]:

    for using Memcached as a result backend (using https://pypi.org/project/pylibmc/)

  • celery[pymemcache]:

    for using Memcached as a result backend (pure-Python implementation).

  • celery[cassandra]:

    for using Apache Cassandra/Astra DB as a result backend with DataStax driver.

  • celery[couchbase]:

    for using Couchbase as a result backend.

  • celery[arangodb]:

    for using ArangoDB as a result backend.

  • celery[elasticsearch]:

    for using Elasticsearch as a result backend.

  • celery[riak]:

    for using Riak as a result backend.

  • celery[dynamodb]:

    for using AWS DynamoDB as a result backend.

  • celery[zookeeper]:

    for using Zookeeper as a message transport.

  • celery[sqlalchemy]:

    for using SQLAlchemy as a result backend (supported).

  • celery[pyro]:

    for using the Pyro4 message transport (experimental).

  • celery[slmq]:

    for using the SoftLayer Message Queue transport (experimental).

  • celery[consul]:

    for using the Consul.io Key/Value store as a message transport or result backend (experimental).

  • celery[django]:

    specifies the lowest version possible for Django support.You should probably not use this in your requirements, it’s here for informational purposes only.

  • celery[gcs]:

    for using the Google Cloud Storage as a result backend (experimental).

从源码下载

从pypi下载最新版Celery:https://pypi.org/project/celery/

安装方法:

tar xvfz celery-0.0.0.tar.gz
cd celery-0.0.0
python setup.py build
python setup.py install

如果你的当前环境不是虚拟环境,最后一个命令必须在管理员身份下执行。

使用开发版本

使用pip

Celery开发版要求其配套组件也必须是开发版。

包括https://pypi.org/project/kombu/, https://pypi.org/project/amqp/, https://pypi.org/project/billiard/, https://pypi.org/project/vine/.

你可以使用以下命令进行安装:

pip install https://github.com/celery/celery/zipball/main#egg=celery
pip install https://github.com/celery/billiard/zipball/main#egg=billiard
pip install https://github.com/celery/py-amqp/zipball/main#egg=amqp
pip install https://github.com/celery/kombu/zipball/main#egg=kombu
pip install https://github.com/celery/vine/zipball/main#egg=vine

使用git

请看贡献部分。