适用于 Apache Kafka 的 Knative Sink¶
KafkaSink 是一个 Apache Kafka 原生 Sink 实现,将传入的 CloudEvent 持久化到可配置的 Apache Kafka Topic。本页面展示了如何安装和配置 Knative KafkaSink。
先决条件¶
您必须有权访问已安装 Knative Eventing 的 Kubernetes 集群。
安装¶
-
安装 Kafka 控制器
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.20.0/eventing-kafka-controller.yaml -
安装 KafkaSink 数据平面
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.20.0/eventing-kafka-sink.yaml -
验证
kafka-controller和kafka-sink-receiver部署正在运行kubectl get deployments.apps -n knative-eventing示例输出
NAME READY UP-TO-DATE AVAILABLE AGE eventing-controller 1/1 1 1 10s eventing-webhook 1/1 1 1 9s kafka-controller 1/1 1 1 3s kafka-sink-receiver 1/1 1 1 5s
KafkaSink 示例¶
KafkaSink 对象看起来类似于以下内容
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
name: my-kafka-sink
namespace: default
spec:
topic: mytopic
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
输出主题内容模式¶
CloudEvent 规范定义了 2 种传输 CloudEvent 的模式:结构化和二进制。
“结构化模式消息”是指事件使用独立事件格式完全编码并存储在消息体中的消息。
结构化内容模式将事件元数据和数据一起保留在有效负载中,从而允许同一事件在多个路由跳和多个协议之间简单转发。
“二进制模式消息”是指事件数据存储在消息体中,事件属性存储为消息元数据一部分的消息。
二进制内容模式适应任何形状的事件数据,并允许高效传输,无需转码工作。
具有指定 contentMode 的 KafkaSink 对象看起来类似于以下内容
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
name: my-kafka-sink
namespace: default
spec:
topic: mytopic
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
# CloudEvent content mode of Kafka messages sent to the topic.
# Possible values:
# - structured
# - binary
#
# default: binary.
#
# CloudEvent spec references:
# - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#message
# - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md#33-structured-content-mode
# - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md#32-binary-content-mode
contentMode: binary # or structured
安全¶
Knative 支持以下 Apache Kafka 安全功能
启用安全功能¶
要启用安全功能,在 KafkaSink 规范中,您可以引用一个 Secret
apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
name: my-kafka-sink
namespace: default
spec:
topic: mytopic
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
auth:
secret:
ref:
name: my_secret
注意
Secret my_secret 必须存在于 KafkaSink 所在的命名空间中。证书和密钥必须采用 PEM 格式。
使用 SASL 进行身份验证¶
Knative 支持以下 SASL 机制
PLAINSCRAM-SHA-256SCRAM-SHA-512
要使用特定的 SASL 机制,请将 <sasl_mechanism> 替换为您选择的机制。
使用 SASL 身份验证,不加密¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_PLAINTEXT \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
使用 SASL 身份验证并使用 SSL 加密¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SASL_SSL \
--from-literal=sasl.mechanism=<sasl_mechanism> \
--from-file=ca.crt=caroot.pem \
--from-literal=user=<my_user> \
--from-literal=password=<my_password>
使用 SSL 加密,无需客户端身份验证¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-literal=user.skip=true
使用 SSL 进行身份验证和加密¶
kubectl create secret --namespace <namespace> generic <my_secret> \
--from-literal=protocol=SSL \
--from-file=ca.crt=<my_caroot.pem_file_path> \
--from-file=user.crt=<my_cert.pem_file_path> \
--from-file=user.key=<my_key.pem_file_path>
注意
可以省略 ca.crt 以启用回退并使用系统的根 CA 集。
Kafka 生产者配置¶
Kafka 生产者是负责向 Apache Kafka 集群发送事件的组件。您可以通过修改 knative-eventing 命名空间中的 config-kafka-sink-data-plane ConfigMap 来更改集群中 Kafka 生产者的配置。
此 ConfigMap 中可用设置的文档可在 Apache Kafka 网站上找到,特别是生产者配置。
启用数据平面组件的调试日志记录¶
要启用数据平面组件的调试日志记录,请在 kafka-config-logging ConfigMap 中将日志记录级别更改为 DEBUG。
-
将
kafka-config-loggingConfigMap 创建为包含以下内容的 YAML 文件apiVersion: v1 kind: ConfigMap metadata: name: kafka-config-logging namespace: knative-eventing data: config.xml: | <configuration> <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender"> <encoder class="net.logstash.logback.encoder.LogstashEncoder"/> </appender> <root level="DEBUG"> <appender-ref ref="jsonConsoleAppender"/> </root> </configuration> -
通过运行命令应用 YAML 文件
其中kubectl apply -f <filename>.yaml<filename>是您在上一步中创建的文件的名称。 -
重新启动
kafka-sink-receiverkubectl rollout restart deployment -n knative-eventing kafka-sink-receiver