跳到内容

适用于 Apache Kafka 的 Knative Broker

Knative Broker for Apache Kafka 是 Knative Broker API 的实现,原生面向 Apache Kafka,以减少网络跳数,并为 Broker 和 Trigger API 模型提供与 Apache Kafka 更好的集成。

主要功能包括

Knative Kafka Broker 使用 二进制内容模式 将传入的 CloudEvents 存储为 Kafka 记录,因为它通过传输或路由优化效率更高,并且避免了 JSON 解析。使用 二进制内容模式 意味着所有 CloudEvent 属性和扩展都映射为 Kafka 记录上的头,而 CloudEvent 的 数据 对应于 Kafka 记录的实际值。这是使用 二进制内容模式 优于 结构化内容模式 的另一个好处,因为它更不“阻碍”,因此与不理解 CloudEvents 的系统兼容。

先决条件

  1. 您已经安装了 Knative Eventing。
  2. 您可以使用一个 Apache Kafka 集群。

提示

如果您需要设置 Kafka 集群,可以按照 Strimzi 快速入门页面 上的说明进行操作。

安装

  1. 通过输入以下命令安装 Kafka 控制器

    kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.21.0/eventing-kafka-controller.yaml
    
  2. 通过输入以下命令安装 Kafka Broker 数据平面

    kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.21.0/eventing-kafka-broker.yaml
    
  3. 通过输入以下命令验证 kafka-controllerkafka-broker-receiverkafka-broker-dispatcher 是否正在运行

    kubectl get deployments.apps -n knative-eventing
    

    示例输出

    NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
    eventing-controller            1/1     1            1           10s
    eventing-webhook               1/1     1            1           9s
    kafka-controller               1/1     1            1           3s
    kafka-broker-dispatcher        1/1     1            1           4s
    kafka-broker-receiver          1/1     1            1           5s
    

消费者偏移量提交间隔

Kafka 消费者通过提交偏移量来跟踪最后成功发送的事件。

Knative Kafka Broker 每 auto.commit.interval.ms 毫秒提交一次偏移量。

注意

为了避免对性能产生负面影响,不建议在每次成功将事件发送给订阅者时都提交偏移量。

可以通过修改 knative-eventing 命名空间中的 config-kafka-broker-data-plane ConfigMap,并修改 auto.commit.interval.ms 参数来更改此间隔,如下所示

apiVersion: v1
kind: ConfigMap
metadata:
  name: config-kafka-broker-data-plane
  namespace: knative-eventing
data:
  # Some configurations omitted ...
  config-kafka-broker-consumer.properties: |
    # Some configurations omitted ...

    # Commit the offset every 5000 millisecods (5 seconds)
    auto.commit.interval.ms=5000

注意

Knative Kafka Broker 保证至少一次传递,这意味着您的应用程序可能会收到重复的事件。较高的提交间隔意味着收到重复事件的概率较高,因为当 Consumer 重启时,它会从最后一次提交的偏移量开始。

Kafka Producer 和 Consumer 配置

Knative 公开了所有可用的 Kafka Producer 和 Consumer 配置,您可以根据您的工作负载进行修改。

您可以通过修改 knative-eventing 命名空间中的 config-kafka-broker-data-plane ConfigMap 来更改这些配置。

有关此 ConfigMap 中可用设置的文档,可以在 Apache Kafka 网站 上找到,特别是 Producer 配置Consumer 配置

启用数据平面组件的调试日志记录

以下 YAML 显示了数据平面组件的默认日志记录配置,该配置在安装步骤中创建

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-config-logging
  namespace: knative-eventing
data:
  config.xml: |
    <configuration>
      <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
        <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
      </appender>
      <root level="INFO">
        <appender-ref ref="jsonConsoleAppender"/>
      </root>
    </configuration>

要将日志级别更改为 DEBUG,您必须

  1. 应用以下 kafka-config-logging ConfigMap,或将 ConfigMap kafka-config-logging 中的 level="INFO" 替换为 level="DEBUG"

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-config-logging
      namespace: knative-eventing
    data:
      config.xml: |
        <configuration>
          <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
          </appender>
          <root level="DEBUG">
            <appender-ref ref="jsonConsoleAppender"/>
          </root>
        </configuration>
    
  2. 通过输入以下命令重启 kafka-broker-receiverkafka-broker-dispatcher

    kubectl rollout restart deployment -n knative-eventing kafka-broker-receiver
    kubectl rollout restart deployment -n knative-eventing kafka-broker-dispatcher
    

使用 KEDA 启用和配置触发器自动扩缩

要使用 KEDA 为引用 Kafka Brokers 的触发器启用和配置自动缩放,请遵循 此处的说明

附加信息

我们使用分析和 cookie 来了解网站流量。有关您使用我们网站的信息会与 Google 共享以达到此目的。了解更多。