Configuration

celeryconfig.py

import os
task_serializer = 'json'

broker_transport_options = {
  # "allow_create_topics": True,
}
broker_connection_retry_on_startup = True

# For using SQLAlchemy as the backend
# result_backend = 'db+postgresql://postgres:example@localhost/postgres'

broker_transport_options.update({
  "security_protocol": "SASL_SSL",
  "sasl_mechanism": "SCRAM-SHA-512",
})
sasl_username = os.environ["SASL_USERNAME"]
sasl_password = os.environ["SASL_PASSWORD"]

broker_url = f"confluentkafka://{sasl_username}:{sasl_password}@broker:9094"
broker_transport_options.update({
  "kafka_admin_config": {
    "sasl.username": sasl_username,
    "sasl.password": sasl_password,
  },
  "kafka_common_config": {
    "sasl.username": sasl_username,
    "sasl.password": sasl_password,
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    "bootstrap_servers": "broker:9094",
  }
})
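For local development it can be handy to strip the configuration down to the essentials. A minimal sketch, assuming a plaintext (unauthenticated) Kafka listener on localhost:9092 — adjust the host and port to your cluster:

```python
# celeryconfig.py -- minimal local sketch, no SASL/SSL
task_serializer = 'json'
broker_connection_retry_on_startup = True

broker_url = "confluentkafka://localhost:9092"
broker_transport_options = {
    # Let the transport create missing topics automatically.
    "allow_create_topics": True,
}
```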

task.py

from celery import Celery

app = Celery('task')
app.config_from_object('celeryconfig')

@app.task
def add(x, y):
    return x + y

Authentication

Note that the SASL username and password above are passed in via environment variables rather than hard-coded into the configuration.
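In isolation, the pattern looks like the snippet below. The demo-user/demo-pass defaults are stand-ins for illustration only; a real deployment injects SASL_USERNAME and SASL_PASSWORD through the environment (systemd unit, docker-compose env file, a secrets manager, etc.):

```python
import os

# Fall back to stand-in credentials so the snippet runs anywhere.
sasl_username = os.environ.get("SASL_USERNAME", "demo-user")
sasl_password = os.environ.get("SASL_PASSWORD", "demo-pass")

broker_url = f"confluentkafka://{sasl_username}:{sasl_password}@broker:9094"
print(broker_url)
```

Since the password ends up embedded in broker_url, take care not to log the URL verbatim.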

Further Info

Celery queues are routed to Kafka topics of the same name. For example, with app = Celery('task'), the queue named task is routed to a Kafka topic also named task. When a result backend that supports canvas is used, the usual chain, group, and chord primitives are all available.

Limitations

Currently, when Kafka is used as the broker, only a single worker can consume from it.