安装

使用Amazon SQS,你需要安装额外的依赖,这里推荐使用一键安装组合包的方式:

pip install "celery[sqs]"

环境配置

你需要在broker URL中声明使用SQS:

# 填充的格式是 sqs://aws_access_key_id:aws_secret_access_key@
broker_url = 'sqs://ABCDEFGHIJKLMNOPQRST:ZYXK7NiynGlTogH8Nj+P9nlE73sq3@'

一定不要忘了末尾的 @以及对密码进行编码(密码中有可能存在一些无法被转为url编码的特殊字符,所以需要safequote 函数被用来确保 username 和 password 中的任何特殊字符都被适当地转义)。

from kombu.utils.url import safequote

aws_access_key = safequote("ABCDEFGHIJKLMNOPQRST")
aws_secret_key = safequote("ZYXK7NiynG/TogH8Nj+P9nlE73sq3")

broker_url = "sqs://{aws_access_key}:{aws_secret_key}@".format(
    aws_access_key=aws_access_key, aws_secret_key=aws_secret_key,
)

[!WARNING]

如果是和Django一起使用,请勿开启DEBUG=True 配置选项。该设置可能引发安全风险。

当处于Debug模式时,Django 会打印环境变量信息,导致 SQS(Simple Queue Service)的 URL 以及您的 AWS 访问密钥(Access Key)和秘密密钥(Secret Key)暴露在互联网上。

以下两种情况,broker_url可以设置为sqs://

  • 使用环境变量AWS_ACCESS_KEY_ID和AWS_SECRET_ACCESS_KEY来设置登录凭据。
  • 在实例上以IAM身份登录,kombu会尝试从实例元数据中检索访问令牌。

可选项

region

默认使用 us-east-1 作为地区,你也可以通过broker_transport_options 进行配置:

broker_transport_options = {'region': 'eu-west-1'}

[!NOTE]

关于Amazon Web Services region的完整介绍,可以看这个网址:http://aws.amazon.com/about-aws/globalinfrastructure/

Visibility Timeout

这个配置用来设置broker等待worker确认任务完成的时间,如果到时间仍然没收到这个确认信号,broker会重发消息。下面是一些注意事项:

这个配置也通过broker_transport_options 进行配置:

broker_transport_options = {'visibility_timeout': 3600}  # 1 hour.

默认确认时间是30分钟。

这个配置只在创建SQS队列时起作用,对使用predefined queues的情况无效。

补充一下这个字段和@app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 5}, ack_late=True) 的countdown的区别。截屏2025-03-21 16.56.58

Polling Interval

轮询间隔决定了在连续两次不成功的轮询之间暂停的秒数。此值可以是整数或浮点数。默认值为1秒:这意味着当没有新消息可读取时,工作进程将间歇休眠1秒。

需要注意的是,更频繁的轮询成本更高,因此增大轮询间隔可以节约开销。

可通过broker_transport_options设置配置轮询间隔:

broker_transport_options = {'polling_interval': 0.3}

非常短的轮询间隔可能导致循环等待,进而导致工作进程占用大量CPU时间。如需亚毫秒级精度,建议改用其他传输方案,例如RabbitMQ 或Redis 。

Long Polling

默认情况下,SQS Long Polling 会默认开启,ReceiveMessage 的 WaitTimeSeconds 选项被默认设置为10s。

WaitTimeSeconds 参数可以通过broker_transport_options 进行配置:

broker_transport_options = {'wait_time_seconds': 15}

有效值是0到20。值得注意的是,新创建的延时队列就默认值是0。

Queue Prefix

默认情况下,Celery不会在队列名字前面添加前缀,如果你需要与其他使用SQS的服务做区分,你可以通过 broker_transport_options 进行设置:

broker_transport_options = {'queue_name_prefix': 'celery-'}

Predefined Queues

如果你想要使用AWS 中的predefined queues,并且不需要列出、创建或删除SQS queue,可以用predefined_queues设置将队列名称映射传递给url:

broker_transport_options = {
    'predefined_queues': {
        'my-q': {
            'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
            'access_key_id': 'xxx',
            'secret_access_key': 'xxx',
        }
    }
}

当使用此选项时,可见性超时应该在SQS队列中设置(在AWS中),而不是通过过broker_transport_options 的visibility_timeout选项设置。

Back-off policy

回退策略是使用SQS可见性超时机制改变任务重试之间的时间差。该机制将特定消息的可见性超时从队列默认可见性超时更改为策略配置的超时。重试次数由SQS管理(特别是通过approatereceivecount消息属性),用户不需要进一步的操作。

以下是一个例子:

broker_transport_options = {
    'predefined_queues': {
        'my-q': {
            'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
            'access_key_id': 'xxx',
            'secret_access_key': 'xxx',
            'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640},
            'backoff_tasks': ['svc.tasks.tasks.task1']
        }
    }
}

以下是配置的含义,比如第二次重试,会等待20s,越往后等待时机越久。

Attempt Delay
2nd attempt 20 seconds
3rd attempt 40 seconds
4th attempt 80 seconds
5th attempt 320 seconds
6th attempt 640 seconds

STS token authentication

https://docs.aws.amazon.com/cli/latest/reference/sts/assume-role.html

通过使用sts_role_arn和sts_token_timeout代理传输选项支持AWS STS身份验证。sts_role_arn是我们用来授权访问SQS的假定IAM角色ARN。Sts_token_timeout是令牌超时,默认值(最小值)为900秒。每次过期后,将创建一个新的令牌:

broker_transport_options = {
    'predefined_queues': {
        'my-q': {
            'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
            'access_key_id': 'xxx',
            'secret_access_key': 'xxx',
            'backoff_policy': {1: 10, 2: 20, 3: 40, 4: 80, 5: 320, 6: 640},
            'backoff_tasks': ['svc.tasks.tasks.task1']
        }
    },
'sts_role_arn': 'arn:aws:iam::<xxx>:role/STSTest', # optional
'sts_token_timeout': 900 # optional
}

注意

  • 如果一个任务在visbility_timeout时间内没有得到确认,那么这个任务将被重新交付给另一个worker并执行。

    这会导致ETA/倒计时/重试任务的问题,其中执行时间超过可见性超时;实际上,如果发生这种情况,它将再次执行,并且再次在循环中执行。

    因此,您必须增加可见性超时,以匹配您计划使用的最长ETA时间。

    请注意,Celery会在worker关闭时重新发送消息,所以长时间的可见性超时只会在发生电源故障或强制终止worker的情况下延迟“丢失”任务的重新发送。

    周期性任务不会受到可见性超时的影响,因为它是一个独立于ETA/倒计时的概念。

    截至目前,AWS支持的最大可见性超时是12小时(43200秒):

    broker_transport_options = {'visibility_timeout': 43200}
    
  • SQS目前不支持通过远程命令来操作Worker行为。

  • SQS不支持事件监控,所以不能与如celery events, celerymon, 或者其他的Django 用户监控套件一起使用。

  • 对于FIFO队列,可能需要在发布消息时传递额外的消息属性,如MessageGroupId和MessageDeduplicationId。

    消息属性可以作为关键字参数传递给apply_async():

    message_properties = {
        'MessageGroupId': '<YourMessageGroupId>',
        'MessageDeduplicationId': '<YourMessageDeduplicationId>'
    }
    task.apply_async(**message_properties)
    

结果存储

Amazon Web Services家族中的多个产品可能是存储或发布结果的好选择,但目前还没有包含这样的结果后端。

[!WARNING]

不要将amqp结果后端与SQS一起使用。

它将为每个任务创建一个队列,并且这些队列不会被合并。这可能会增加成本。