跳到内容

适用于 Apache Kafka 的 Knative Broker

Knative Broker for Apache Kafka 是 Knative Broker API 的实现,原生面向 Apache Kafka,以减少网络跳数,并为 Broker 和 Trigger API 模型提供与 Apache Kafka 更好的集成。

主要功能包括

Knative Kafka Broker 使用 二进制内容模式 将传入的 CloudEvents 存储为 Kafka 记录,因为它通过传输或路由优化效率更高,并且避免了 JSON 解析。使用 二进制内容模式 意味着所有 CloudEvent 属性和扩展都映射为 Kafka 记录上的头,而 CloudEvent 的 数据 对应于 Kafka 记录的实际值。这是使用 二进制内容模式 优于 结构化内容模式 的另一个好处,因为它更不“阻碍”,因此与不理解 CloudEvents 的系统兼容。

先决条件

这些说明假设您的集群管理员已经安装了 Knative Kafka 代理

创建 Kafka 代理

Kafka 代理对象如下所示

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: Kafka
    # Optional annotation to point to an externally managed kafka topic:
    # kafka.eventing.knative.dev/external.topic: <topic-name>
  name: default
  namespace: default
spec:
  # Configuration specific to this broker.
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
  # Optional dead letter sink, you can specify either:
  #  - deadLetterSink.ref, which is a reference to a Callable
  #  - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: dlq-service

配置 Kafka 代理

spec.config 应引用任何 namespace 中的任何 ConfigMap,其外观如下所示

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: your-namespace
data:
  # Number of topic partitions
  default.topic.partitions: "10"
  # Replication factor of topic messages.
  default.topic.replication.factor: "3"
  # A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"

ConfigMap 安装在集群的 Knative Eventing SYSTEM_NAMESPACE 中。您可以根据需要编辑全局配置。您也可以通过在 Kafka 代理的 spec.config 字段中引用不同 namespace 或具有不同 name 的不同 ConfigMap 来逐个代理覆盖这些设置。

注意

default.topic.replication.factor 的值必须小于或等于集群中 Kafka 代理实例的数量。例如,如果您只有一个 Kafka 代理,default.topic.replication.factor 的值不应超过 1

Knative 支持您的 Kafka 版本支持的全套主题配置选项。要设置其中任何一个,您需要在 configmap 中添加一个带有 default.topic.config. 前缀的键。例如,要设置 retention.ms 值,您需要修改 ConfigMap 如下所示

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  # Number of topic partitions
  default.topic.partitions: "10"
  # Replication factor of topic messages.
  default.topic.replication.factor: "3"
  # A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
  # Here is our retention.ms config
  default.topic.config.retention.ms: "3600"

安全

Apache Kafka 支持不同的安全特性,Knative 支持以下特性

要启用安全特性,在 broker.spec.config 引用的 ConfigMap 中,我们可以引用一个 Secret

apiVersion: v1
kind: ConfigMap
metadata:
   name: kafka-broker-config
   namespace: knative-eventing
data:
   # Other configurations
   # ...

   # Reference a Secret called my_secret
   auth.secret.ref.name: my_secret

Secret my_secret 必须与 broker.spec.config 引用的 ConfigMap 位于同一命名空间中,在本例中为:knative-eventing

注意

证书和密钥必须是PEM 格式

使用 SASL 进行身份验证

Knative 支持以下 SASL 机制

  • PLAIN
  • SCRAM-SHA-256
  • SCRAM-SHA-512
  • AWS MSK IAM 的 OAUTHBEARER

要使用特定的 SASL 机制,请将 <sasl_mechanism> 替换为您选择的机制。

使用 SASL 身份验证,不加密

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_PLAINTEXT \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>

使用 SASL 身份验证并使用 SSL 加密

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_SSL \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-file=ca.crt=caroot.pem \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>

使用 SSL 加密,无需客户端身份验证

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-literal=user.skip=true

注意

可以省略 ca.crt 以回退到使用系统的根 CA 集。

使用 SSL 进行身份验证和加密

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-file=user.crt=<my_cert.pem_file_path> \
  --from-file=user.key=<my_key.pem_file_path>

AWS MSK IAM 身份验证

AWS MSK IAM 身份验证需要创建密钥和 Java 属性配置。

在以下 ConfigMap 中,将以下内容追加到列出的属性值中。如果使用假定的 IAM 角色,请将 awsRoleArn="<role_arn>" 添加到 sasl.jaas.config 的值中。

  • config-kafka-broker-data-plane
    • config-kafka-broker-producer.properties
    • config-kafka-broker-consumer.properties
  • config-kafka-channel-data-plane
    • config-kafka-channel-producer.properties
    • config-kafka-channel-consumer.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required awsStsRegion="<region>";
sasl.login.callback.handler.class=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler

创建密钥以使用默认 AWS 凭证

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_SSL \
  --from-literal=sasl.mechanism=OAUTHBEARER \
  --from-literal=type=OAUTHBEARER \
  --from-literal=tokenProvider=MSKAccessTokenProvider

或者创建密钥以使用假定角色

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_SSL \
  --from-literal=sasl.mechanism=OAUTHBEARER \
  --from-literal=type=OAUTHBEARER \
  --from-literal=tokenProvider=MSKRoleAccessTokenProvider \
  --from-literal=roleARN=<role_arn>

自带主题

默认情况下,Knative Kafka 代理会创建自己的内部主题,但可以使用 kafka.eventing.knative.dev/external.topic 注释指向外部管理的主题

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: Kafka
    kafka.eventing.knative.dev/external.topic: <my-topic-name>
  name: default
  namespace: default
spec:
  # other spec fields ...

注意

使用外部主题时,Knative Kafka 代理不拥有该主题,也不负责管理该主题。这包括主题的生命周期或其总体有效性。可能对主题的一般访问施加其他限制。请参阅有关使用访问控制列表 (ACL) 的文档。

配置事件的投递顺序

在分派事件时,可以配置 Kafka 代理以支持不同的投递顺序保证。

您可以使用 Trigger 对象上的 kafka.eventing.knative.dev/delivery.order 注释来配置事件的投递顺序

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-service-trigger
  annotations:
     kafka.eventing.knative.dev/delivery.order: ordered
spec:
  broker: my-kafka-broker
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: my-service

支持的消费者投递保证是

  • unordered:无序使用者是一个非阻塞使用者,它无序地投递消息,同时保留适当的偏移量管理。当对并行消费有很高的需求且不需要显式排序时很有用。一个例子可能是处理点击分析。
  • ordered:有序使用者是一个每个分区的阻塞使用者,它在向 CloudEvent 订阅者成功发送响应之前,会等待传递分区的下一条消息。当需要更严格的排序或事件之间存在关系或分组时很有用。一个例子可能是处理客户订单。

unordered 投递是默认的排序保证。

数据平面隔离与共享数据平面

Knative Kafka 代理实现有两个平面:控制平面和数据平面。控制平面由与 Kubernetes API 通信、监视自定义对象和管理数据平面的控制器组成。

数据平面是侦听传入事件、与 Apache Kafka 通信以及向事件接收器发送事件的组件集合。这是事件流经的地方。Knative Kafka 代理数据平面由 kafka-broker-receiverkafka-broker-dispatcher 部署组成。

当使用 Broker 类 Kafka 时,Knative Kafka 代理使用共享数据平面。这意味着 knative-eventing 命名空间中的 kafka-broker-receiverkafka-broker-dispatcher 部署用于集群中的所有 Kafka 代理。

然而,当 KafkaNamespaced 设置为 Broker 类时,Kafka 代理控制器会为存在代理的每个命名空间创建一个新的数据平面。此数据平面用于该命名空间中的所有 KafkaNamespaced 代理。

这在数据平面之间提供了隔离,这意味着用户命名空间中的 kafka-broker-receiverkafka-broker-dispatcher 部署仅用于该命名空间中的代理。

注意

作为单独数据平面的结果,此安全特性会创建更多的部署并使用更多的资源。除非您有此类隔离要求,否则建议使用类为 Kafka 的*常规*代理。

要创建 KafkaNamespaced 代理,您必须将 eventing.knative.dev/broker.class 注释设置为 KafkaNamespaced

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: KafkaNamespaced
  name: default
  namespace: my-namespace
spec:
  config:
     # the referenced `configmap` must be in the same namespace with the `Broker` object, in this case `my-namespace`
    apiVersion: v1
    kind: ConfigMap
    name: my-config
    # namespace: my-namespace # no need to define, defaults to Broker's namespace

注意

spec.config 中指定的 configmap 必须Broker 对象的命名空间相同

apiVersion: v1
kind: ConfigMap
metadata:
  name: my-config
  namespace: my-namespace
data:
  ...

在创建第一个具有 KafkaNamespaced 类的 Broker 时,kafka-broker-receiverkafka-broker-dispatcher 部署将在命名空间中创建。之后,同一命名空间中所有具有 KafkaNamespaced 类的代理都使用相同的数据平面。当命名空间中没有 KafkaNamespaced 类的代理时,命名空间中的数据平面将被删除。

配置 KafkaNamespaced 代理

Kafka 代理类可用的所有配置机制也可用于 KafkaNamespaced 类的代理,但有以下例外

  • 此页面描述了如何通过修改 knative-eventing 命名空间中的 config-kafka-broker-data-plane configmap 来配置生产者和消费者配置。由于 Kafka 代理控制器将此 configmap 传播到用户命名空间,因此目前无法按命名空间配置生产者和消费者配置。在 knative-eventing 命名空间的 ConfigMap 中设置的任何值也将用于用户命名空间中。
  • 由于相同的传播,也不可能按命名空间配置消费者偏移量提交间隔。
  • 还有一些 configmap 被传播:config-tracingkafka-config-logging。这意味着跟踪和日志记录也不能按命名空间配置。
  • 同样,数据平面部署从 knative-eventing 命名空间传播到用户命名空间。这意味着数据平面部署不能按命名空间配置,并且将与 knative-eventing 命名空间中的部署相同。

使用 KEDA 启用和配置触发器自动扩缩

要启用并配置引用使用 KEDA 的 Kafka 代理的触发器的自动伸缩,请遵循此处说明

附加信息

我们使用分析和 cookie 来了解网站流量。有关您使用我们网站的信息会与 Google 共享以达到此目的。了解更多。