安装

pip install "celery[gcpubsub]"

配置

通过一下方式来指定gcpubsub项目位置。

broker_url = 'gcpubsub://projects/project-id'

请别忘了添加projects/ 前缀。

登录凭证通过环境中固定的GCP的来实现。

选项

expiration_seconds

默认设置pubsub消息和订阅将在24小时后过期,可以通过配置expiration_seconds设置来设置:

expiration_seconds = 86400

更多信息参考:https://cloud.google.com/pubsub/docs

Ack Deadline seconds

这个参数定义了worker确认收到任务的时间,如果超过该时间,会进行重发。

broker_transport_options = {'ack_deadline_seconds': 60}  # 1 minute.

默认时间时240s,worker会根据消息的阻塞数量进行自动扩容。

更多信息参考:https://cloud.google.com/pubsub/docs/lease-management

polling_interval

该参数用于控制每次尝试读取消息失败后,工作程序(worker)要等待的秒数。这个值可以是整数也可以是浮点数,默认是0.1s。这并不意味着worker每0.1s就会访问一次消息api,因为它将被Pub/Sub API的调用阻塞,该调用只会在有新消息要读取或超过10秒后才返回。

broker_transport_options = {'polling_interval': 0.3}

非常频繁的轮询间隔可能导致繁忙的循环,从而导致工作线程使用大量CPU时间。如果你需要亚毫秒级的精度,你应该考虑使用其他传输,比如RabbitMQ ,或者Redis

queue_name_prefix

默认情况下,Celery会为队列名称加上 kombu-前缀,如果有其他使用Pub/Sub的服务,你可以使用以下配置加以区分:

broker_transport_options = {'queue_name_prefix': 'kombu-'}

结果

Google Cloud Storage(GCS)同样是一个非常好的结果存储候选方案。

注意事项

使用 Celery Flower 时,必须使用 --inspect-timeout=10 选项以正确检测工作程序状态。

GCP 订阅处于空闲状态(无排队消息)的情况下,配置为在 24 小时后自动移除。这样做的目的是降低成本。

排队和未确认的消息设置为在 24 小时后自动清理,原因同上。

频道队列大小是估算值,可能不准确。原因是 Pub/Sub API 不提供获取订阅中确切消息数量的方法。

孤立的(无订阅者的)Pub/Sub 主题不会被自动移除!由于 GCP 对每个项目的主题数量有 10,000 的硬限制,建议定期手动移除孤立主题。

最大消息大小限制为 10MB,作为解决方案,可以使用 GCS 后端将消息存储在 Google Cloud Storage 中,并将 GCS URL 传递给任务。