适用于 Apache Kafka 的 Knative Broker¶
Knative Broker for Apache Kafka 是 Knative Broker API 的实现,原生面向 Apache Kafka,以减少网络跳数,并为 Broker 和 Trigger API 模型提供与 Apache Kafka 更好的集成。
主要功能包括
- 控制平面高可用性
- 水平可扩展数据平面
- 可广泛配置
- 基于 CloudEvents 分区扩展 的有序事件传递
- 支持任何 Kafka 版本,请参阅 兼容性矩阵
- 支持 2 种 数据平面模式:每个命名空间的数据平面隔离或共享数据平面
Knative Kafka Broker 使用 二进制内容模式 将传入的 CloudEvents 存储为 Kafka 记录,因为它通过传输或路由优化效率更高,并且避免了 JSON 解析。使用 二进制内容模式 意味着所有 CloudEvent 属性和扩展都映射为 Kafka 记录上的头,而 CloudEvent 的 数据 对应于 Kafka 记录的实际值。这是使用 二进制内容模式 优于 结构化内容模式 的另一个好处,因为它更不“阻碍”,因此与不理解 CloudEvents 的系统兼容。
先决条件¶
- 您已经安装了 Knative Eventing。
- 您可以使用一个 Apache Kafka 集群。
提示
如果您需要设置 Kafka 集群,可以按照 Strimzi 快速入门页面 上的说明进行操作。
安装¶
-
通过输入以下命令安装 Kafka 控制器
kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.21.0/eventing-kafka-controller.yaml -
通过输入以下命令安装 Kafka Broker 数据平面
kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.21.0/eventing-kafka-broker.yaml -
通过输入以下命令验证
kafka-controller、kafka-broker-receiver和kafka-broker-dispatcher是否正在运行kubectl get deployments.apps -n knative-eventing示例输出
NAME READY UP-TO-DATE AVAILABLE AGE eventing-controller 1/1 1 1 10s eventing-webhook 1/1 1 1 9s kafka-controller 1/1 1 1 3s kafka-broker-dispatcher 1/1 1 1 4s kafka-broker-receiver 1/1 1 1 5s
消费者偏移量提交间隔¶
Kafka 消费者通过提交偏移量来跟踪最后成功发送的事件。
Knative Kafka Broker 每 auto.commit.interval.ms 毫秒提交一次偏移量。
注意
为了避免对性能产生负面影响,不建议在每次成功将事件发送给订阅者时都提交偏移量。
可以通过修改 knative-eventing 命名空间中的 config-kafka-broker-data-plane ConfigMap,并修改 auto.commit.interval.ms 参数来更改此间隔,如下所示
apiVersion: v1
kind: ConfigMap
metadata:
name: config-kafka-broker-data-plane
namespace: knative-eventing
data:
# Some configurations omitted ...
config-kafka-broker-consumer.properties: |
# Some configurations omitted ...
# Commit the offset every 5000 millisecods (5 seconds)
auto.commit.interval.ms=5000
注意
Knative Kafka Broker 保证至少一次传递,这意味着您的应用程序可能会收到重复的事件。较高的提交间隔意味着收到重复事件的概率较高,因为当 Consumer 重启时,它会从最后一次提交的偏移量开始。
Kafka Producer 和 Consumer 配置¶
Knative 公开了所有可用的 Kafka Producer 和 Consumer 配置,您可以根据您的工作负载进行修改。
您可以通过修改 knative-eventing 命名空间中的 config-kafka-broker-data-plane ConfigMap 来更改这些配置。
有关此 ConfigMap 中可用设置的文档,可以在 Apache Kafka 网站 上找到,特别是 Producer 配置 和 Consumer 配置。
启用数据平面组件的调试日志记录¶
以下 YAML 显示了数据平面组件的默认日志记录配置,该配置在安装步骤中创建
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-config-logging
namespace: knative-eventing
data:
config.xml: |
<configuration>
<appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<root level="INFO">
<appender-ref ref="jsonConsoleAppender"/>
</root>
</configuration>
要将日志级别更改为 DEBUG,您必须
-
应用以下
kafka-config-loggingConfigMap,或将ConfigMapkafka-config-logging中的level="INFO"替换为level="DEBUG"apiVersion: v1 kind: ConfigMap metadata: name: kafka-config-logging namespace: knative-eventing data: config.xml: | <configuration> <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="net.logstash.logback.encoder.LogstashEncoder"/> </appender> <root level="DEBUG"> <appender-ref ref="jsonConsoleAppender"/> </root> </configuration> -
通过输入以下命令重启
kafka-broker-receiver和kafka-broker-dispatcherkubectl rollout restart deployment -n knative-eventing kafka-broker-receiver kubectl rollout restart deployment -n knative-eventing kafka-broker-dispatcher
使用 KEDA 启用和配置触发器自动扩缩¶
要使用 KEDA 为引用 Kafka Brokers 的触发器启用和配置自动缩放,请遵循 此处的说明。
附加信息¶
- 要报告错误或请求功能,请在 eventing-kafka-broker 存储库 中创建问题。