用于 Apache Kafka 的 Knative Broker¶
用于 Apache Kafka 的 Knative Broker 是 Knative Broker API 的一个实现,它以原生方式针对 Apache Kafka,以减少网络跳跃,并为 Broker 和 Trigger API 模型提供与 Apache Kafka 的更好集成。
值得注意的功能是
- 控制平面高可用性
- 水平可扩展数据平面
- 可广泛配置
- 根据 CloudEvents 分区扩展 按顺序传递事件
- 支持任何 Kafka 版本,请参阅 兼容性矩阵
- 支持两种 数据平面模式:每个命名空间的数据平面隔离或共享数据平面
Knative Kafka Broker 将传入的 CloudEvents 存储为 Kafka 记录,使用 二进制内容模式,因为它由于其针对传输或路由的优化而更高效,并且避免了 JSON 解析。使用 二进制内容模式
意味着所有 CloudEvent 属性和扩展都映射为 Kafka 记录上的标头,而 CloudEvent 的 数据
对应于 Kafka 记录的实际值。这是使用 二进制内容模式
而不是 结构化内容模式
的另一个好处,因为它不那么阻塞,因此与不了解 CloudEvents 的系统兼容。
先决条件¶
- 您已安装 Knative Eventing。
- 您有权访问 Apache Kafka 集群。
提示
如果您需要设置 Kafka 集群,您可以按照 Strimzi 快速入门页面 上的说明进行操作。
安装¶
-
通过输入以下命令安装 Kafka 控制器
kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-controller.yaml
-
通过输入以下命令安装 Kafka Broker 数据平面
kubectl apply --filename https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.16.0/eventing-kafka-broker.yaml
-
通过输入以下命令验证
kafka-controller
、kafka-broker-receiver
和kafka-broker-dispatcher
是否正在运行kubectl get deployments.apps -n knative-eventing
示例输出
NAME READY UP-TO-DATE AVAILABLE AGE eventing-controller 1/1 1 1 10s eventing-webhook 1/1 1 1 9s kafka-controller 1/1 1 1 3s kafka-broker-dispatcher 1/1 1 1 4s kafka-broker-receiver 1/1 1 1 5s
创建 Kafka Broker¶
Kafka Broker 对象如下所示
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
# case-sensitive
eventing.knative.dev/broker.class: Kafka
# Optional annotation to point to an externally managed kafka topic:
# kafka.eventing.knative.dev/external.topic: <topic-name>
name: default
namespace: default
spec:
# Configuration specific to this broker.
config:
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
# Optional dead letter sink, you can specify either:
# - deadLetterSink.ref, which is a reference to a Callable
# - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)
delivery:
deadLetterSink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: dlq-service
配置 Kafka Broker¶
spec.config
应该引用任何 namespace
中的任何类似于以下内容的 ConfigMap
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
# Number of topic partitions
default.topic.partitions: "10"
# Replication factor of topic messages.
default.topic.replication.factor: "3"
# A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
此 ConfigMap
安装在集群中的 Knative Eventing SYSTEM_NAMESPACE
中。您可以根据自己的需要编辑全局配置。您还可以通过在不同的 namespace
中引用不同的 ConfigMap
或在 Kafka Broker 的 spec.config
字段上使用不同的 name
来在每个代理的基础上覆盖这些设置。
注意
default.topic.replication.factor
值必须小于或等于集群中 Kafka 代理实例的数量。例如,如果您只有一个 Kafka 代理,则 default.topic.replication.factor
值不应超过 1
。
Knative 支持 Kafka 版本支持的完整主题配置选项集。要设置任何这些选项,您需要使用 default.topic.config.
前缀向 configmap 添加一个键。例如,要设置 retention.ms
值,您需要修改 ConfigMap
,使其类似于以下内容
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
# Number of topic partitions
default.topic.partitions: "10"
# Replication factor of topic messages.
default.topic.replication.factor: "3"
# A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
# Here is our retention.ms config
default.topic.config.retention.ms: "3600"
设置为默认代理实现¶
要将 Kafka Broker 设置为 Knative 部署中所有代理的默认实现,您可以通过修改 knative-eventing
命名空间中的 config-br-defaults
ConfigMap 来应用全局设置。
这使您可以避免为每个代理配置单个或每个命名空间的设置,例如 metadata.annotations.eventing.knative.dev/broker.class
或 spec.config
。
以下 YAML 是一个使用 Kafka Broker 作为默认实现的 config-br-defaults
ConfigMap 的示例。
apiVersion: v1
kind: ConfigMap
metadata:
name: config-br-defaults
namespace: knative-eventing
data:
default-br-config: |
clusterDefault:
brokerClass: Kafka
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
namespaceDefaults:
namespace1:
brokerClass: Kafka
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
namespace2:
brokerClass: Kafka
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
安全¶
Apache Kafka 支持不同的安全功能,Knative 支持以下功能
要启用安全功能,在 broker.spec.config
引用的 ConfigMap
中,我们可以引用一个 Secret
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
# Other configurations
# ...
# Reference a Secret called my_secret
auth.secret.ref.name: my_secret
Secret
my_secret
必须存在于 ConfigMap
引用的同一命名空间中,在本例中为:knative-eventing
。
注意
证书和密钥必须采用 PEM
格式。
使用 SASL 进行身份验证¶
Knative 支持以下 SASL 机制
PLAIN
SCRAM-SHA-256
SCRAM-SHA-512
要使用特定的 SASL 机制,请将 <sasl_mechanism>
替换为您选择的机制。
使用 SASL 进行身份验证,不加密¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_PLAINTEXT \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
使用 SASL 进行身份验证,并使用 SSL 进行加密¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_SSL \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-file=ca.crt=caroot.pem \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
使用 SSL 进行加密,不进行客户端身份验证¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-literal=user.skip=true
使用 SSL 进行身份验证和加密¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-file=user.crt=<my_cert.pem_file_path> \
--from-file=user.key=<my_key.pem_file_path>
注意
ca.crt
可以省略以回退到使用系统的根 CA 集。
自带主题¶
默认情况下,Knative Kafka Broker 会创建它自己的内部主题,但可以使用 kafka.eventing.knative.dev/external.topic
注释指向外部管理的主题
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
# case-sensitive
eventing.knative.dev/broker.class: Kafka
kafka.eventing.knative.dev/external.topic: <my-topic-name>
name: default
namespace: default
spec:
# other spec fields ...
注意
使用外部主题时,Knative Kafka Broker 不拥有该主题,也不负责管理该主题。这包括主题生命周期或其一般有效性。其他对主题的常规访问限制可能适用。有关使用 访问控制列表 (ACL) 的文档,请参阅。
消费者偏移提交间隔¶
Kafka 消费者通过提交偏移量来跟踪最后成功发送的事件。
Knative Kafka Broker 每 auto.commit.interval.ms
毫秒提交一次偏移量。
注意
为了防止对性能产生负面影响,不建议每次成功将事件发送到订阅者时都提交偏移量。
可以通过修改 knative-eventing
命名空间中的 config-kafka-broker-data-plane
ConfigMap
来更改间隔,方法是修改参数 auto.commit.interval.ms
,如下所示
apiVersion: v1
kind: ConfigMap
metadata:
name: config-kafka-broker-data-plane
namespace: knative-eventing
data:
# Some configurations omitted ...
config-kafka-broker-consumer.properties: |
# Some configurations omitted ...
# Commit the offset every 5000 millisecods (5 seconds)
auto.commit.interval.ms=5000
注意
Knative Kafka Broker 保证至少一次传递,这意味着您的应用程序可能会收到重复事件。更高的提交间隔意味着收到重复事件的可能性更高,因为当消费者重新启动时,它会从最后提交的偏移量重新启动。
Kafka 生产者和消费者配置¶
Knative 公开所有可用的 Kafka 生产者和消费者配置,可以修改这些配置以适合您的工作负载。
您可以通过修改 knative-eventing
命名空间中的 config-kafka-broker-data-plane
ConfigMap
来更改这些配置。
有关此 ConfigMap
中可用设置的文档,请访问 Apache Kafka 网站,特别是 生产者配置 和 消费者配置。
为数据平面组件启用调试日志记录¶
以下 YAML 显示了数据平面组件的默认日志记录配置,该配置在安装步骤期间创建
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-config-logging
namespace: knative-eventing
data:
config.xml: |
<configuration>
<appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<root level="INFO">
<appender-ref ref="jsonConsoleAppender"/>
</root>
</configuration>
要将日志级别更改为 DEBUG
,您必须
-
应用以下
kafka-config-logging
ConfigMap
或将ConfigMap
kafka-config-logging
中的level="INFO"
替换为level="DEBUG"
apiVersion: v1 kind: ConfigMap metadata: name: kafka-config-logging namespace: knative-eventing data: config.xml: | <configuration> <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="net.logstash.logback.encoder.LogstashEncoder"/> </appender> <root level="DEBUG"> <appender-ref ref="jsonConsoleAppender"/> </root> </configuration>
-
通过输入以下命令重新启动
kafka-broker-receiver
和kafka-broker-dispatcher
kubectl rollout restart deployment -n knative-eventing kafka-broker-receiver kubectl rollout restart deployment -n knative-eventing kafka-broker-dispatcher
配置传递事件的顺序¶
在调度事件时,可以配置 Kafka Broker 以支持不同的传递排序保证。
您可以使用 Trigger
对象上的 kafka.eventing.knative.dev/delivery.order
注释来配置事件的传递顺序
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: my-service-trigger
annotations:
kafka.eventing.knative.dev/delivery.order: ordered
spec:
broker: my-kafka-broker
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: my-service
支持的消费者传递保证为
unordered
:无序消费者是非阻塞消费者,它以无序方式传递消息,同时保留正确的偏移量管理。当有大量并行消费需求且不需要显式排序时,很有用。一个例子可能是处理点击分析。ordered
:有序消费者是一个按分区阻塞的消费者,它会在从 CloudEvent 订阅者那里收到成功响应后,才会传递分区的下一条消息。当需要更严格的排序,或事件之间存在关系或分组时,这很有用。例如,处理客户订单。
unordered
传递是默认的排序保证。
数据平面隔离与共享数据平面¶
Knative Kafka Broker 实现包含 2 个平面:控制平面和数据平面。控制平面由控制器组成,这些控制器与 Kubernetes API 交谈,监视自定义对象并管理数据平面。
数据平面是监听传入事件、与 Apache Kafka 交谈以及将事件发送到事件接收器的组件集合。这是事件流经的地方。Knative Kafka Broker 数据平面由 kafka-broker-receiver
和 kafka-broker-dispatcher
部署组成。
使用 Broker 类 Kafka
时,Knative Kafka Broker 使用共享数据平面。这意味着,knative-eventing
命名空间中的 kafka-broker-receiver
和 kafka-broker-dispatcher
部署用于集群中的所有 Kafka Broker。
但是,当将 KafkaNamespaced
设置为 Broker 类时,Kafka Broker 控制器会为每个存在 Broker 的命名空间创建一个新的数据平面。这个数据平面被该命名空间中的所有 KafkaNamespaced
Broker 使用。
这提供了数据平面之间的隔离,这意味着用户命名空间中的 kafka-broker-receiver
和 kafka-broker-dispatcher
部署仅用于该命名空间中的 Broker。
注意
由于存在独立的数据平面,因此此安全功能会创建更多部署并使用更多资源。除非您有这种隔离要求,否则建议使用 Kafka
类的常规 Broker。
要创建 KafkaNamespaced
Broker,您必须将 eventing.knative.dev/broker.class
注释设置为 KafkaNamespaced
。
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
# case-sensitive
eventing.knative.dev/broker.class: KafkaNamespaced
name: default
namespace: my-namespace
spec:
config:
# the referenced `configmap` must be in the same namespace with the `Broker` object, in this case `my-namespace`
apiVersion: v1
kind: ConfigMap
name: my-config
# namespace: my-namespace # no need to define, defaults to Broker's namespace
注意
spec.config
中指定的 configmap
**必须** 与 Broker
对象位于同一个命名空间。
apiVersion: v1
kind: ConfigMap
metadata:
name: my-config
namespace: my-namespace
data:
...
创建第一个使用 KafkaNamespaced
类的 Broker
时,kafka-broker-receiver
和 kafka-broker-dispatcher
部署将在命名空间中创建。之后,同一命名空间中所有使用 KafkaNamespaced
类的 Broker 都将使用相同的数据平面。当命名空间中没有使用 KafkaNamespaced
类的 Broker 时,命名空间中的数据平面将被删除。
配置 KafkaNamespaced
Broker¶
可用于 Kafka
Broker 类的所有配置机制,也适用于使用 KafkaNamespaced
类的 Broker,但有以下例外情况。
- 此页面 描述了如何通过修改
knative-eventing
命名空间中的config-kafka-broker-data-plane
configmap 来完成生产者和消费者的配置。由于 Kafka Broker 控制器将此 configmap 传播到用户命名空间,因此目前无法按命名空间配置生产者和消费者配置。在knative-eventing
命名空间中的config-kafka-broker-data-plane
ConfigMap
中设置的任何值,也将用于用户命名空间。 - 由于相同的传播,也无法按命名空间配置消费者偏移量提交间隔。
- 还有一些 configmap 被传播:
config-tracing
和kafka-config-logging
。这意味着跟踪和日志记录也不能按命名空间进行配置。 - 同样,数据平面部署也会从
knative-eventing
命名空间传播到用户命名空间。这意味着数据平面部署无法按命名空间进行配置,并且将与knative-eventing
命名空间中的部署相同。
使用 KEDA 启用和配置触发器的自动伸缩¶
要启用和配置引用使用 KEDA 的 Kafka Broker 的触发器的自动伸缩,请按照 此处说明 操作。
其他信息¶
- 要报告错误或请求功能,请在 eventing-kafka-broker 存储库 中打开问题。