适用于 Apache Kafka 的 Knative Broker¶
Knative Broker for Apache Kafka 是 Knative Broker API 的实现,原生面向 Apache Kafka,以减少网络跳数,并为 Broker 和 Trigger API 模型提供与 Apache Kafka 更好的集成。
主要功能包括
- 控制平面高可用性
- 水平可扩展数据平面
- 可广泛配置
- 基于 CloudEvents 分区扩展 的有序事件传递
- 支持任何 Kafka 版本,请参阅 兼容性矩阵
- 支持 2 种 数据平面模式:每个命名空间的数据平面隔离或共享数据平面
Knative Kafka Broker 使用 二进制内容模式 将传入的 CloudEvents 存储为 Kafka 记录,因为它通过传输或路由优化效率更高,并且避免了 JSON 解析。使用 二进制内容模式 意味着所有 CloudEvent 属性和扩展都映射为 Kafka 记录上的头,而 CloudEvent 的 数据 对应于 Kafka 记录的实际值。这是使用 二进制内容模式 优于 结构化内容模式 的另一个好处,因为它更不“阻碍”,因此与不理解 CloudEvents 的系统兼容。
先决条件¶
这些说明假设您的集群管理员已经安装了 Knative Kafka 代理。
创建 Kafka 代理¶
Kafka 代理对象如下所示
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
# case-sensitive
eventing.knative.dev/broker.class: Kafka
# Optional annotation to point to an externally managed kafka topic:
# kafka.eventing.knative.dev/external.topic: <topic-name>
name: default
namespace: default
spec:
# Configuration specific to this broker.
config:
apiVersion: v1
kind: ConfigMap
name: kafka-broker-config
namespace: knative-eventing
# Optional dead letter sink, you can specify either:
# - deadLetterSink.ref, which is a reference to a Callable
# - deadLetterSink.uri, which is an absolute URI to a Callable (It can potentially be out of the Kubernetes cluster)
delivery:
deadLetterSink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: dlq-service
配置 Kafka 代理¶
spec.config 应引用任何 namespace 中的任何 ConfigMap,其外观如下所示
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: your-namespace
data:
# Number of topic partitions
default.topic.partitions: "10"
# Replication factor of topic messages.
default.topic.replication.factor: "3"
# A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
此 ConfigMap 安装在集群的 Knative Eventing SYSTEM_NAMESPACE 中。您可以根据需要编辑全局配置。您也可以通过在 Kafka 代理的 spec.config 字段中引用不同 namespace 或具有不同 name 的不同 ConfigMap 来逐个代理覆盖这些设置。
注意
default.topic.replication.factor 的值必须小于或等于集群中 Kafka 代理实例的数量。例如,如果您只有一个 Kafka 代理,default.topic.replication.factor 的值不应超过 1。
Knative 支持您的 Kafka 版本支持的全套主题配置选项。要设置其中任何一个,您需要在 configmap 中添加一个带有 default.topic.config. 前缀的键。例如,要设置 retention.ms 值,您需要修改 ConfigMap 如下所示
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
# Number of topic partitions
default.topic.partitions: "10"
# Replication factor of topic messages.
default.topic.replication.factor: "3"
# A comma separated list of bootstrap servers. (It can be in or out the k8s cluster)
bootstrap.servers: "my-cluster-kafka-bootstrap.kafka:9092"
# Here is our retention.ms config
default.topic.config.retention.ms: "3600"
安全¶
Apache Kafka 支持不同的安全特性,Knative 支持以下特性
要启用安全特性,在 broker.spec.config 引用的 ConfigMap 中,我们可以引用一个 Secret
apiVersion: v1
kind: ConfigMap
metadata:
name: kafka-broker-config
namespace: knative-eventing
data:
# Other configurations
# ...
# Reference a Secret called my_secret
auth.secret.ref.name: my_secret
Secret my_secret 必须与 broker.spec.config 引用的 ConfigMap 位于同一命名空间中,在本例中为:knative-eventing。
注意
证书和密钥必须是PEM 格式。
使用 SASL 进行身份验证¶
Knative 支持以下 SASL 机制
PLAINSCRAM-SHA-256SCRAM-SHA-512- AWS MSK IAM 的
OAUTHBEARER
要使用特定的 SASL 机制,请将 <sasl_mechanism> 替换为您选择的机制。
使用 SASL 身份验证,不加密¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_PLAINTEXT \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
使用 SASL 身份验证并使用 SSL 加密¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_SSL \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-file=ca.crt=caroot.pem \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
使用 SSL 加密,无需客户端身份验证¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-literal=user.skip=true
注意
可以省略 ca.crt 以回退到使用系统的根 CA 集。
使用 SSL 进行身份验证和加密¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-file=user.crt=<my_cert.pem_file_path> \
--from-file=user.key=<my_key.pem_file_path>
AWS MSK IAM 身份验证¶
AWS MSK IAM 身份验证需要创建密钥和 Java 属性配置。
在以下 ConfigMap 中,将以下内容追加到列出的属性值中。如果使用假定的 IAM 角色,请将 awsRoleArn="<role_arn>" 添加到 sasl.jaas.config 的值中。
- config-kafka-broker-data-plane
- config-kafka-broker-producer.properties
- config-kafka-broker-consumer.properties
- config-kafka-channel-data-plane
- config-kafka-channel-producer.properties
- config-kafka-channel-consumer.properties
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required awsStsRegion="<region>";
sasl.login.callback.handler.class=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler
sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMOAuthBearerLoginCallbackHandler
创建密钥以使用默认 AWS 凭证
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_SSL \
--from-literal=sasl.mechanism=OAUTHBEARER \
--from-literal=type=OAUTHBEARER \
--from-literal=tokenProvider=MSKAccessTokenProvider
或者创建密钥以使用假定角色
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_SSL \
--from-literal=sasl.mechanism=OAUTHBEARER \
--from-literal=type=OAUTHBEARER \
--from-literal=tokenProvider=MSKRoleAccessTokenProvider \
--from-literal=roleARN=<role_arn>
自带主题¶
默认情况下,Knative Kafka 代理会创建自己的内部主题,但可以使用 kafka.eventing.knative.dev/external.topic 注释指向外部管理的主题
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
# case-sensitive
eventing.knative.dev/broker.class: Kafka
kafka.eventing.knative.dev/external.topic: <my-topic-name>
name: default
namespace: default
spec:
# other spec fields ...
注意
使用外部主题时,Knative Kafka 代理不拥有该主题,也不负责管理该主题。这包括主题的生命周期或其总体有效性。可能对主题的一般访问施加其他限制。请参阅有关使用访问控制列表 (ACL) 的文档。
配置事件的投递顺序¶
在分派事件时,可以配置 Kafka 代理以支持不同的投递顺序保证。
您可以使用 Trigger 对象上的 kafka.eventing.knative.dev/delivery.order 注释来配置事件的投递顺序
apiVersion: eventing.knative.dev/v1
kind: Trigger
metadata:
name: my-service-trigger
annotations:
kafka.eventing.knative.dev/delivery.order: ordered
spec:
broker: my-kafka-broker
subscriber:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: my-service
支持的消费者投递保证是
unordered:无序使用者是一个非阻塞使用者,它无序地投递消息,同时保留适当的偏移量管理。当对并行消费有很高的需求且不需要显式排序时很有用。一个例子可能是处理点击分析。ordered:有序使用者是一个每个分区的阻塞使用者,它在向 CloudEvent 订阅者成功发送响应之前,会等待传递分区的下一条消息。当需要更严格的排序或事件之间存在关系或分组时很有用。一个例子可能是处理客户订单。
unordered 投递是默认的排序保证。
数据平面隔离与共享数据平面¶
Knative Kafka 代理实现有两个平面:控制平面和数据平面。控制平面由与 Kubernetes API 通信、监视自定义对象和管理数据平面的控制器组成。
数据平面是侦听传入事件、与 Apache Kafka 通信以及向事件接收器发送事件的组件集合。这是事件流经的地方。Knative Kafka 代理数据平面由 kafka-broker-receiver 和 kafka-broker-dispatcher 部署组成。
当使用 Broker 类 Kafka 时,Knative Kafka 代理使用共享数据平面。这意味着 knative-eventing 命名空间中的 kafka-broker-receiver 和 kafka-broker-dispatcher 部署用于集群中的所有 Kafka 代理。
然而,当 KafkaNamespaced 设置为 Broker 类时,Kafka 代理控制器会为存在代理的每个命名空间创建一个新的数据平面。此数据平面用于该命名空间中的所有 KafkaNamespaced 代理。
这在数据平面之间提供了隔离,这意味着用户命名空间中的 kafka-broker-receiver 和 kafka-broker-dispatcher 部署仅用于该命名空间中的代理。
注意
作为单独数据平面的结果,此安全特性会创建更多的部署并使用更多的资源。除非您有此类隔离要求,否则建议使用类为 Kafka 的*常规*代理。
要创建 KafkaNamespaced 代理,您必须将 eventing.knative.dev/broker.class 注释设置为 KafkaNamespaced
apiVersion: eventing.knative.dev/v1
kind: Broker
metadata:
annotations:
# case-sensitive
eventing.knative.dev/broker.class: KafkaNamespaced
name: default
namespace: my-namespace
spec:
config:
# the referenced `configmap` must be in the same namespace with the `Broker` object, in this case `my-namespace`
apiVersion: v1
kind: ConfigMap
name: my-config
# namespace: my-namespace # no need to define, defaults to Broker's namespace
注意
spec.config 中指定的 configmap 必须与 Broker 对象的命名空间相同
apiVersion: v1
kind: ConfigMap
metadata:
name: my-config
namespace: my-namespace
data:
...
在创建第一个具有 KafkaNamespaced 类的 Broker 时,kafka-broker-receiver 和 kafka-broker-dispatcher 部署将在命名空间中创建。之后,同一命名空间中所有具有 KafkaNamespaced 类的代理都使用相同的数据平面。当命名空间中没有 KafkaNamespaced 类的代理时,命名空间中的数据平面将被删除。
配置 KafkaNamespaced 代理¶
Kafka 代理类可用的所有配置机制也可用于 KafkaNamespaced 类的代理,但有以下例外
- 此页面描述了如何通过修改
knative-eventing命名空间中的config-kafka-broker-data-planeconfigmap 来配置生产者和消费者配置。由于 Kafka 代理控制器将此 configmap 传播到用户命名空间,因此目前无法按命名空间配置生产者和消费者配置。在knative-eventing命名空间的ConfigMap中设置的任何值也将用于用户命名空间中。 - 由于相同的传播,也不可能按命名空间配置消费者偏移量提交间隔。
- 还有一些 configmap 被传播:
config-tracing和kafka-config-logging。这意味着跟踪和日志记录也不能按命名空间配置。 - 同样,数据平面部署从
knative-eventing命名空间传播到用户命名空间。这意味着数据平面部署不能按命名空间配置,并且将与knative-eventing命名空间中的部署相同。
使用 KEDA 启用和配置触发器自动扩缩¶
要启用并配置引用使用 KEDA 的 Kafka 代理的触发器的自动伸缩,请遵循此处说明。
附加信息¶
- 要报告错误或请求功能,请在 eventing-kafka-broker 存储库 中创建问题。