配置
celeryconfig.py
import os
task_serializer = 'json'
broker_transport_options = {
# "allow_create_topics": True,
}
broker_connection_retry_on_startup = True
# For using SQLAlchemy as the backend
# result_backend = 'db+postgresql://postgres:example@localhost/postgres'
broker_transport_options.update({
"security_protocol":"SASL_USERNAME",
"sasl_mechanism":"SCRAM-SHA-512",
})
sasl_username = os.environ["SASL_USERNAME"]
sasl_password = os.environ["SASL_PASSWORD"]
broker_url = f"confluentkafka://{sasl_username}:{sasl_password}@broker:9094"
broker_transport_options.update({
"kafka_admin_config":{
"sasl.username": sasl_username,
"sasl.password": sasl_password,
},
"kafka_common_config": {
"sasl.username": sasl_username,
"sasl.password": sasl_password,
"security.protocol":"SASL_SSL",
"sasl.mechanism":"SCRAM-SHA-512",
"bootstrap_servers":"broker:9094",
}
})
task.py
from celery import Celery
app = Celery('task')
app.config_from_object('celeryconfig')
@app.task
def add(x, y)
return x + y
鉴权
以上SASL的用户名和密码都是通过环境变量进行传递的。
更多信息
Celery路由到kafka的同名topic。例如,命名为app = Celery('task') task的队列将路由到同名的话题下。当使用支持canvas的结果存储时,常见的chain、group和chord选项都可以使用。
局限性
目前,使用kafka当作broker的时候,仅支持一个worker进行消费。