安装
pip install "celery[gcpubsub]"
配置
通过一下方式来指定gcpubsub项目位置。
broker_url = 'gcpubsub://projects/project-id'
请别忘了添加projects/ 前缀。
登录凭证通过环境中固定的GCP的来实现。
选项
expiration_seconds
默认设置pubsub消息和订阅将在24小时后过期,可以通过配置expiration_seconds设置来设置:
expiration_seconds = 86400
更多信息参考:https://cloud.google.com/pubsub/docs
Ack Deadline seconds
这个参数定义了worker确认收到任务的时间,如果超过该时间,会进行重发。
broker_transport_options = {'ack_deadline_seconds': 60} # 1 minute.
默认时间时240s,worker会根据消息的阻塞数量进行自动扩容。
更多信息参考:https://cloud.google.com/pubsub/docs/lease-management
polling_interval
该参数用于控制每次尝试读取消息失败后,工作程序(worker)要等待的秒数。这个值可以是整数也可以是浮点数,默认是0.1s。这并不意味着worker每0.1s就会访问一次消息api,因为它将被Pub/Sub API的调用阻塞,该调用只会在有新消息要读取或超过10秒后才返回。
broker_transport_options = {'polling_interval': 0.3}
非常频繁的轮询间隔可能导致繁忙的循环,从而导致工作线程使用大量CPU时间。如果你需要亚毫秒级的精度,你应该考虑使用其他传输,比如RabbitMQ
queue_name_prefix
默认情况下,Celery会为队列名称加上 kombu-前缀,如果有其他使用Pub/Sub的服务,你可以使用以下配置加以区分:
broker_transport_options = {'queue_name_prefix': 'kombu-'}
结果
Google Cloud Storage(GCS)同样是一个非常好的结果存储候选方案。
注意事项
使用 Celery Flower 时,必须使用 --inspect-timeout=10 选项以正确检测工作程序状态。
GCP 订阅处于空闲状态(无排队消息)的情况下,配置为在 24 小时后自动移除。这样做的目的是降低成本。
排队和未确认的消息设置为在 24 小时后自动清理,原因同上。
频道队列大小是估算值,可能不准确。原因是 Pub/Sub API 不提供获取订阅中确切消息数量的方法。
孤立的(无订阅者的)Pub/Sub 主题不会被自动移除!由于 GCP 对每个项目的主题数量有 10,000 的硬限制,建议定期手动移除孤立主题。
最大消息大小限制为 10MB,作为解决方案,可以使用 GCS 后端将消息存储在 Google Cloud Storage 中,并将 GCS URL 传递给任务。