安装

对于Redis的支持,你必须安装额外的依赖项。你可以使用Celery [redis] 合集一次安装Celery和这些依赖项:

pip install -U "celery[redis]"

配置

配置很容易,只需要配置你的Redis地址

app.conf.broker_url = 'redis://localhost:6379/0'

完整的地址格式是:

redis://:password@hostname:port/db_number

所有的协议字段都是可选的,默认值'redis://' 等于 'redis://localhost:6379/0'

如果使用了Unix socket链接,地址格式为:

redis+socket:///path/to/redis.sock

可以使用virtual_host参数来实现指定db的功能

redis+socket:///path/to/redis.sock?virtual_host=db_number

直接连接到Redis 哨兵列表也很容易:

app.conf.broker_url = 'sentinel://localhost:26379;sentinel://localhost:26380;sentinel://localhost:26381'
app.conf.broker_transport_options = { 'master_name': "cluster1" }

可以使用sentinel_kwargs将其他参数传递给Sentinel客户端:

app.conf.broker_transport_options = { 'sentinel_kwargs': { 'password': "password" } }

可见性超时

可见性超时定义了在消息被重新传递给另一个worker之前等待worker确认任务的秒数。请务必查看下面的注意事项。

这个配置通过broker_transport_options进行配置。

app.conf.broker_transport_options = {'visibility_timeout': 3600}  # 1 hour.

默认时间是一小时。

结果

如果你也像用Redis存储任务状态和结果,你可以使用下面这些配置:

app.conf.result_backend = 'redis://localhost:6379/0'

完整的配置列表,参考Redis backend 设置

如果你使用了哨兵,可以通过result_backend_transport_options设置主节点。

app.conf.result_backend_transport_options = {'master_name': "mymaster"}

全局前缀

全局前缀会拼接到所有任务结果的key中,这可以避免共享redis时带来的key冲突。默认情况下,不会拼接全局前缀。

可以通过result_backend_transport_options下的global_keyprefix进行配置。

app.conf.result_backend_transport_options = {
    'global_keyprefix': 'my_prefix_'
}

连接超时

配置redis作为backend时候的连接超时,使用result_backend_transport_options下的retry_policy字段。

app.conf.result_backend_transport_options = {
    'retry_policy': {
       'timeout': 5.0
    }
}

retry_over_time()可以查看全部可选项。

云原生(serverless)

Celery支持使用远程云原生Redis,这可以显著降低运营开销和成本,使其成为微服务架构或环境中最小化运营成本至关重要的有利选择。云原生Redis提供了必要的功能,而不需要手动设置、配置和管理,与Celery提倡的自动化和可扩展性原则很好地一致。

Upstash

Upstash是一家云原生Redis数据库服务提供商,它有一套很好的Celery+serverless redis的解决方案。Upstash通过多层存储架构提供了终极的一致性模型和持久化的存储,

Upstash提供的示例

重要警告

可见性超时

如果任务未在可见性超时时间内被确认,该任务将被重新投递给另一名Worker并再次执行。对于需要长时间执行的ETA(预计执行时间)、countdown(延迟时间)或重试任务,这会导致问题——若执行时间超过可见性超时,任务将被循环反复执行。

要解决这一问题,可以通过将可见性超时时间设置为与最长预期ETA时间匹配。但此方法不被推荐,因其可能对可靠性产生负面影响。Celery会在Worker关闭时重传消息,因此较长的可见性超时仅会延迟“丢失”任务的重传(例如在断电或强制终止Worker时)。

由于消息代理(Broker)并非数据库,若需为更远的未来调度任务,建议使用基于数据库的定期任务(如Django-Celery-Beat)。周期性任务不受可见性超时影响,因其与ETA/countdown等机制属于不同概念。

可通过配置以下选项以调整可见性超时时间:

app.conf.broker_transport_options = {'visibility_timeout': 43200}
app.conf.result_backend_transport_options = {'visibility_timeout': 43200}
app.conf.visibility_timeout = 43200

值必须是一个second为单位的int类型。

逐出问题

在某些情况下,Redis可能会从数据库中逐出某些键。

如果遇到如下错误:

InconsistencyError: Probably the key ('_kombu.binding.celery') has been
removed from the Redis database.

你可以在redis配置文件配置键逐出策略:

  • the maxmemory option
  • the maxmemory-policy option to noeviction or allkeys-lru

有关详细信息,请参阅Redis服务器文档:https://redis.io/topics/lru-cache。

有序返回结果

4.4.6及更早的Celery都是通过无序列表来存储Redis结果的。这会导致结果与他们关联任务的原始顺序不一致。Celery 4.4.7版本介绍了一种可选行为来修复这个问题,并保证返回结果的有序。Celery 5.0设置为来默认开启的可选配置。

# Specifying this for workers running Celery 4.4.6 or earlier has no effect
app.conf.result_backend_transport_options = {
    'result_chord_ordered': True    # or False
}

注:对运行Celery 4.4.6及更早版本的Worker设置此配置无效

此改动对共享同一Redis后端的Worker运行时行为不兼容,所有Worker需统一采用新旧同一行为以避免故障。若集群中存在运行4.4.6及更早版本的Worker,则4.4.7版本的Worker无需特殊配置;5.0及以上版本的Worker需将result_chord_ordered设为False

若集群无4.4.6及更早版本Worker,但含4.4.7版本Worker,则建议将所有Worker的result_chord_ordered设为True,便于未来迁移。跨行为迁移将破坏Redis后端当前存储的结果并且迁移后的Worker处理下游任务可能引发错误——请提前规划。