跳到内容

用于 Apache Kafka 的 Knative Broker

用于 Apache Kafka 的 Knative Broker 是 Knative Broker API 的一个实现,它以原生方式针对 Apache Kafka,以减少网络跳跃,并为 Broker 和 Trigger API 模型提供与 Apache Kafka 的更好集成。

值得注意的功能是

Knative Kafka Broker 将传入的 CloudEvents 存储为 Kafka 记录,使用 二进制内容模式,因为它由于其针对传输或路由的优化而更高效,并且避免了 JSON 解析。使用 二进制内容模式 意味着所有 CloudEvent 属性和扩展都映射为 Kafka 记录上的标头,而 CloudEvent 的 数据 对应于 Kafka 记录的实际值。这是使用 二进制内容模式 而不是 结构化内容模式 的另一个好处,因为它不那么阻塞,因此与不了解 CloudEvents 的系统兼容。

先决条件

  1. 您已安装 Knative Eventing。
  2. 您有权访问 Apache Kafka 集群。

提示

如果您需要设置 Kafka 集群,您可以按照 Strimzi 快速入门页面 上的说明进行操作。

安装

  1. 通过输入以下命令安装 Kafka 控制器

    kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml
    
  2. 通过输入以下命令安装 Kafka Broker 数据平面

    kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-broker.yaml
    
  3. 通过输入以下命令验证 kafka-controllerkafka-broker-receiverkafka-broker-dispatcher 是否正在运行

    kubectl get deployments.apps -n knative-eventing
    

    示例输出

    NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
    eventing-controller            1/1     1            1           10s
    eventing-webhook               1/1     1            1           9s
    kafka-controller               1/1     1            1           3s
    kafka-broker-dispatcher        1/1     1            1           4s
    kafka-broker-receiver          1/1     1            1           5s
    

创建 Kafka Broker

Kafka Broker 对象如下所示

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: Kafka
    # Optional annotation to point to an externally managed kafka topic:
    # kafka.eventing.knative.dev/external.topic: <topic-name>
  name: default
  namespace: default
spec:
  # Configuration specific to this broker.
  config:
    apiVersion: v1
    kind: ConfigMap
    name: kafka-broker-config
    namespace: knative-eventing
  # Optional dead letter sink, you can specify either:
  #  - deadLetterSink.ref, which is a reference to a Callable
  #  - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)
  delivery:
    deadLetterSink:
      ref:
        apiVersion: serving.knative.dev/v1
        kind: Service
        name: dlq-service

配置 Kafka Broker

spec.config 应该引用任何 namespace 中的任何类似于以下内容的 ConfigMap

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  # Number of topic partitions
  default.topic.partitions: "10"
  # Replication factor of topic messages.
  default.topic.replication.factor: "3"
  # A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"

ConfigMap 安装在集群中的 Knative Eventing SYSTEM_NAMESPACE 中。您可以根据自己的需要编辑全局配置。您还可以通过在不同的 namespace 中引用不同的 ConfigMap 或在 Kafka Broker 的 spec.config 字段上使用不同的 name 来在每个代理的基础上覆盖这些设置。

注意

default.topic.replication.factor 值必须小于或等于集群中 Kafka 代理实例的数量。例如,如果您只有一个 Kafka 代理,则 default.topic.replication.factor 值不应超过 1

Knative 支持 Kafka 版本支持的完整主题配置选项集。要设置任何这些选项,您需要使用 default.topic.config. 前缀向 configmap 添加一个键。例如,要设置 retention.ms 值,您需要修改 ConfigMap,使其类似于以下内容

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-broker-config
  namespace: knative-eventing
data:
  # Number of topic partitions
  default.topic.partitions: "10"
  # Replication factor of topic messages.
  default.topic.replication.factor: "3"
  # A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
  bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
  # Here is our retention.ms config
  default.topic.config.retention.ms: "3600"

设置为默认代理实现

要将 Kafka Broker 设置为 Knative 部署中所有代理的默认实现,您可以通过修改 knative-eventing 命名空间中的 config-br-defaults ConfigMap 来应用全局设置。

这使您可以避免为每个代理配置单个或每个命名空间的设置,例如 metadata.annotations.eventing.knative.dev/broker.classspec.config

以下 YAML 是一个使用 Kafka Broker 作为默认实现的 config-br-defaults ConfigMap 的示例。

apiVersion: v1
kind: ConfigMap
metadata:
  name: config-br-defaults
  namespace: knative-eventing
data:
  default-br-config: |
    clusterDefault:
      brokerClass: Kafka
      apiVersion: v1
      kind: ConfigMap
      name: kafka-broker-config
      namespace: knative-eventing
    namespaceDefaults:
      namespace1:
        brokerClass: Kafka
        apiVersion: v1
        kind: ConfigMap
        name: kafka-broker-config
        namespace: knative-eventing
      namespace2:
        brokerClass: Kafka
        apiVersion: v1
        kind: ConfigMap
        name: kafka-broker-config
        namespace: knative-eventing

安全

Apache Kafka 支持不同的安全功能,Knative 支持以下功能

要启用安全功能,在 broker.spec.config 引用的 ConfigMap 中,我们可以引用一个 Secret

apiVersion: v1
kind: ConfigMap
metadata:
   name: kafka-broker-config
   namespace: knative-eventing
data:
   # Other configurations
   # ...

   # Reference a Secret called my_secret
   auth.secret.ref.name: my_secret

Secret my_secret 必须存在于 ConfigMap 引用的同一命名空间中,在本例中为:knative-eventing

注意

证书和密钥必须采用 PEM 格式

使用 SASL 进行身份验证

Knative 支持以下 SASL 机制

  • PLAIN
  • SCRAM-SHA-256
  • SCRAM-SHA-512

要使用特定的 SASL 机制,请将 <sasl_mechanism> 替换为您选择的机制。

使用 SASL 进行身份验证,不加密

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_PLAINTEXT \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>

使用 SASL 进行身份验证,并使用 SSL 进行加密

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_SSL \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-file=ca.crt=caroot.pem \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>

使用 SSL 进行加密,不进行客户端身份验证

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-literal=user.skip=true

使用 SSL 进行身份验证和加密

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-file=user.crt=<my_cert.pem_file_path> \
  --from-file=user.key=<my_key.pem_file_path>

注意

ca.crt 可以省略以回退到使用系统的根 CA 集。

自带主题

默认情况下,Knative Kafka Broker 会创建它自己的内部主题,但可以使用 kafka.eventing.knative.dev/external.topic 注释指向外部管理的主题

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: Kafka
    kafka.eventing.knative.dev/external.topic: <my-topic-name>
  name: default
  namespace: default
spec:
  # other spec fields ...

注意

使用外部主题时,Knative Kafka Broker 不拥有该主题,也不负责管理该主题。这包括主题生命周期或其一般有效性。其他对主题的常规访问限制可能适用。有关使用 访问控制列表 (ACL) 的文档,请参阅。

消费者偏移提交间隔

Kafka 消费者通过提交偏移量来跟踪最后成功发送的事件。

Knative Kafka Broker 每 auto.commit.interval.ms 毫秒提交一次偏移量。

注意

为了防止对性能产生负面影响,不建议每次成功将事件发送到订阅者时都提交偏移量。

可以通过修改 knative-eventing 命名空间中的 config-kafka-broker-data-plane ConfigMap 来更改间隔,方法是修改参数 auto.commit.interval.ms,如下所示

apiVersion: v1
kind: ConfigMap
metadata:
  name: config-kafka-broker-data-plane
  namespace: knative-eventing
data:
  # Some configurations omitted ...
  config-kafka-broker-consumer.properties: |
    # Some configurations omitted ...

    # Commit the offset every 5000 millisecods (5 seconds)
    auto.commit.interval.ms=5000

注意

Knative Kafka Broker 保证至少一次传递,这意味着您的应用程序可能会收到重复事件。更高的提交间隔意味着收到重复事件的可能性更高,因为当消费者重新启动时,它会从最后提交的偏移量重新启动。

Kafka 生产者和消费者配置

Knative 公开所有可用的 Kafka 生产者和消费者配置,可以修改这些配置以适合您的工作负载。

您可以通过修改 knative-eventing 命名空间中的 config-kafka-broker-data-plane ConfigMap 来更改这些配置。

有关此 ConfigMap 中可用设置的文档,请访问 Apache Kafka 网站,特别是 生产者配置消费者配置

为数据平面组件启用调试日志记录

以下 YAML 显示了数据平面组件的默认日志记录配置,该配置在安装步骤期间创建

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-config-logging
  namespace: knative-eventing
data:
  config.xml: |
    <configuration>
      <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
      </appender>
      <root level="INFO">
        <appender-ref ref="jsonConsoleAppender"/>
      </root>
    </configuration>

要将日志级别更改为 DEBUG,您必须

  1. 应用以下 kafka-config-logging ConfigMap 或将 ConfigMap kafka-config-logging 中的 level="INFO" 替换为 level="DEBUG"

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-config-logging
      namespace: knative-eventing
    data:
      config.xml: |
        <configuration>
          <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
          </appender>
          <root level="DEBUG">
            <appender-ref ref="jsonConsoleAppender"/>
          </root>
        </configuration>
    
  2. 通过输入以下命令重新启动 kafka-broker-receiverkafka-broker-dispatcher

    kubectl rollout restart deployment -n knative-eventing kafka-broker-receiver
    kubectl rollout restart deployment -n knative-eventing kafka-broker-dispatcher
    

配置传递事件的顺序

在调度事件时,可以配置 Kafka Broker 以支持不同的传递排序保证。

您可以使用 Trigger 对象上的 kafka.eventing.knative.dev/delivery.order 注释来配置事件的传递顺序

apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
  name: my-service-trigger
  annotations:
     kafka.eventing.knative.dev/delivery.order: ordered
spec:
  broker: my-kafka-broker
  subscriber:
    ref:
      apiVersion: serving.knative.dev/v1
      kind: Service
      name: my-service

支持的消费者传递保证为

  • unordered:无序消费者是非阻塞消费者,它以无序方式传递消息,同时保留正确的偏移量管理。当有大量并行消费需求且不需要显式排序时,很有用。一个例子可能是处理点击分析。
  • ordered:有序消费者是一个按分区阻塞的消费者,它会在从 CloudEvent 订阅者那里收到成功响应后,才会传递分区的下一条消息。当需要更严格的排序,或事件之间存在关系或分组时,这很有用。例如,处理客户订单。

unordered 传递是默认的排序保证。

数据平面隔离与共享数据平面

Knative Kafka Broker 实现包含 2 个平面:控制平面和数据平面。控制平面由控制器组成,这些控制器与 Kubernetes API 交谈,监视自定义对象并管理数据平面。

数据平面是监听传入事件、与 Apache Kafka 交谈以及将事件发送到事件接收器的组件集合。这是事件流经的地方。Knative Kafka Broker 数据平面由 kafka-broker-receiverkafka-broker-dispatcher 部署组成。

使用 Broker 类 Kafka 时,Knative Kafka Broker 使用共享数据平面。这意味着,knative-eventing 命名空间中的 kafka-broker-receiverkafka-broker-dispatcher 部署用于集群中的所有 Kafka Broker。

但是,当将 KafkaNamespaced 设置为 Broker 类时,Kafka Broker 控制器会为每个存在 Broker 的命名空间创建一个新的数据平面。这个数据平面被该命名空间中的所有 KafkaNamespaced Broker 使用。

这提供了数据平面之间的隔离,这意味着用户命名空间中的 kafka-broker-receiverkafka-broker-dispatcher 部署仅用于该命名空间中的 Broker。

注意

由于存在独立的数据平面,因此此安全功能会创建更多部署并使用更多资源。除非您有这种隔离要求,否则建议使用 Kafka 类的常规 Broker。

要创建 KafkaNamespaced Broker,您必须将 eventing.knative.dev/broker.class 注释设置为 KafkaNamespaced

apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
  annotations:
    # case-sensitive
    eventing.knative.dev/broker.class: KafkaNamespaced
  name: default
  namespace: my-namespace
spec:
  config:
     # the referenced `configmap` must be in the same namespace with the `Broker` object, in this case `my-namespace`
    apiVersion: v1
    kind: ConfigMap
    name: my-config
    # namespace: my-namespace # no need to define, defaults to Broker's namespace

注意

spec.config 中指定的 configmap **必须** 与 Broker 对象位于同一个命名空间。

apiVersion: v1
kind: ConfigMap
metadata:
  name: my-config
  namespace: my-namespace
data:
  ...

创建第一个使用 KafkaNamespaced 类的 Broker 时,kafka-broker-receiverkafka-broker-dispatcher 部署将在命名空间中创建。之后,同一命名空间中所有使用 KafkaNamespaced 类的 Broker 都将使用相同的数据平面。当命名空间中没有使用 KafkaNamespaced 类的 Broker 时,命名空间中的数据平面将被删除。

配置 KafkaNamespaced Broker

可用于 Kafka Broker 类的所有配置机制,也适用于使用 KafkaNamespaced 类的 Broker,但有以下例外情况。

  • 此页面 描述了如何通过修改 knative-eventing 命名空间中的 config-kafka-broker-data-plane configmap 来完成生产者和消费者的配置。由于 Kafka Broker 控制器将此 configmap 传播到用户命名空间,因此目前无法按命名空间配置生产者和消费者配置。在 knative-eventing 命名空间中的 config-kafka-broker-data-plane ConfigMap 中设置的任何值,也将用于用户命名空间。
  • 由于相同的传播,也无法按命名空间配置消费者偏移量提交间隔。
  • 还有一些 configmap 被传播:config-tracingkafka-config-logging。这意味着跟踪和日志记录也不能按命名空间进行配置。
  • 同样,数据平面部署也会从 knative-eventing 命名空间传播到用户命名空间。这意味着数据平面部署无法按命名空间进行配置,并且将与 knative-eventing 命名空间中的部署相同。

使用 KEDA 启用和配置触发器的自动伸缩

要启用和配置引用使用 KEDA 的 Kafka Broker 的触发器的自动伸缩,请按照 此处说明 操作。

其他信息

我们使用分析和 Cookie 来了解网站流量。有关您使用我们网站的信息将与 Google 共享,用于此目的。 了解更多。