Apache Kafka 的 Knative 源¶
KafkaSource 从现有的 Apache Kafka 主题中读取消息,并通过 HTTP 将这些消息作为 CloudEvents 发送到其配置的 sink。KafkaSource 会保留存储在主题分区中的消息的顺序。它是通过等待 sink 的成功响应,然后才投递同一分区中的下一条消息来实现的。
安装 KafkaSource 控制器¶
-
通过输入以下命令安装
KafkaSource控制器kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.21.0/eventing-kafka-controller.yaml -
通过输入以下命令安装 Kafka 源数据平面
kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.21.0/eventing-kafka-source.yaml -
通过输入以下命令验证
kafka-controller和kafka-source-dispatcher是否正在运行kubectl get deployments.apps,statefulsets.apps -n knative-eventing示例输出
NAME READY UP-TO-DATE AVAILABLE AGE deployment.apps/kafka-controller 1/1 1 1 3s NAME READY AGE statefulset.apps/kafka-source-dispatcher 1/1 3s
可选:创建一个 Kafka 主题¶
注意
创建 Kafka 主题部分假设您正在使用 Strimzi 来操作 Apache Kafka,但等效的操作可以使用 Apache Kafka CLI 或任何其他工具来复制。
如果您正在使用 Strimzi
-
创建
KafkaTopicYAML 文件apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic metadata: name: knative-demo-topic namespace: kafka labels: strimzi.io/cluster: my-cluster spec: partitions: 3 replicas: 1 config: retention.ms: 7200000 segment.bytes: 1073741824 -
通过运行命令部署
KafkaTopicYAML 文件其中kubectl apply -f <filename>.yaml<filename>是您的KafkaTopicYAML 文件的名称。示例输出
kafkatopic.kafka.strimzi.io/knative-demo-topic created -
通过运行命令确保
KafkaTopic正在运行kubectl -n kafka get kafkatopics.kafka.strimzi.io示例输出
NAME CLUSTER PARTITIONS REPLICATION FACTOR knative-demo-topic my-cluster 3 1
创建服务¶
-
将
event-display服务创建为 YAML 文件apiVersion: serving.knative.dev/v1 kind: Service metadata: name: event-display namespace: default spec: template: spec: containers: - # This corresponds to # https://github.com/knative/eventing/tree/main/cmd/event_display/main.go image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display -
通过运行命令应用 YAML 文件
其中kubectl apply -f <filename>.yaml<filename>是您在上一步中创建的文件的名称。示例输出
service.serving.knative.dev/event-display created -
通过运行以下命令,确保服务 Pod 正在运行
kubectl get podsPod 名称以
event-display为前缀NAME READY STATUS RESTARTS AGE event-display-00001-deployment-5d5df6c7-gv2j4 2/2 Running 0 72s
Kafka 事件源¶
-
相应地修改
source/event-source.yaml文件,包括引导服务器、主题等apiVersion: sources.knative.dev/v1 kind: KafkaSource metadata: name: kafka-source spec: consumerGroup: knative-group bootstrapServers: - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace topics: - knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display -
部署事件源
kubectl apply -f event-source.yaml示例输出
kafkasource.sources.knative.dev/kafka-source created -
验证 KafkaSource 是否已准备就绪
kubectl get kafkasource kafka-source示例输出
NAME TOPICS BOOTSTRAPSERVERS READY REASON AGE kafka-source ["knative-demo-topic"] ["my-cluster-kafka-bootstrap.kafka:9092"] True 26h
伸缩¶
要调度更多的或更少的消费者,可以对 KafkaSource 进行伸缩,并将它们分配给不同的 dispatcher pod。kafkasource 状态在 status.placements 键下显示此类分配。
您可以使用以下格式通过 kubectl 伸缩 KafkaSource
kubectl scale kafkasource -n <ns> <kafkasource-name> --replicas=<number-of-replicas> # e.g. 12 replicas for a topic with 12 partitions
或者,如果您使用的是 GitOps 方法,可以在您的存储库中添加 consumers 键,如下面的示例所示,并提交它
apiVersion: sources.knative.dev/v1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092
consumers: 12 # Number of replicas
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
使用 KEDA 自动伸缩¶
您可以与 KEDA 一起自动伸缩 KafkaSource。有关如何启用和配置此功能的更多信息,请在此处阅读说明。
验证¶
-
向 Apache Kafka 主题(如以下示例所示)中产生一条消息 (
{"msg": "This is a test!"})kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic提示
如果您没有看到命令提示符,请尝试按 Enter。
-
验证 Service 是否已收到来自事件源的消息
kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container示例输出
☁️ cloudevents.Event Validation: valid Context Attributes, specversion: 1.0 type: dev.knative.kafka.event source: /apis/v1/namespaces/default/kafkasources/kafka-source#my-topic subject: partition:0#564 id: partition:0/offset:564 time: 2020-02-10T18:10:23.861866615Z datacontenttype: application/json Extensions, key: Data, { "msg": "This is a test!" }
处理投递失败¶
KafkaSource 实现 Delivery 规范,允许您为其配置事件投递参数,这些参数在事件投递失败时应用
apiVersion: sources.knative.dev/v1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
delivery:
deadLetterSink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: example-sink
backoffDelay: <duration>
backoffPolicy: <policy-type>
retry: <integer>
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
delivery API 在 处理投递失败章节中讨论。
可选:指定键反序列化器¶
当 KafkaSource 从 Kafka 接收到消息时,它会将键转储到名为 Key 的事件扩展中,并将 Kafka 消息头转储到以 kafkaheader 开头的扩展中。
您可以在四种类型中指定键反序列化器
string(默认值)用于 UTF-8 编码的字符串int用于 32 位和 64 位有符号整数float用于 32 位和 64 位浮点数byte-array用于 Base64 编码的字节数组
要指定键反序列化器,请将标签 kafkasources.sources.knative.dev/key-type 添加到 KafkaSource 定义中,如下例所示
apiVersion: sources.knative.dev/v1
kind: KafkaSource
metadata:
name: kafka-source
labels:
kafkasources.sources.knative.dev/key-type: int
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
可选:指定初始偏移量¶
默认情况下,KafkaSource 从每个分区中的最新偏移量开始消费。如果要从最早的偏移量开始消费,请将 initialOffset 字段设置为 earliest,例如
apiVersion: sources.knative.dev/v1
kind: KafkaSource
metadata:
name: kafka-source
spec:
consumerGroup: knative-group
initialOffset: earliest
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
ref:
apiVersion: serving.knative.dev/v1
kind: Service
name: event-display
注意
initialOffset 的有效值为 earliest 和 latest。任何其他值都会导致验证错误。仅当该消费者组没有已提交的偏移量时,此字段才有效。
连接到启用了 TLS 的 Kafka 代理¶
KafkaSource 支持 TLS 和 SASL 认证方法。要启用 TLS 认证,您必须拥有以下文件
- CA 证书
- 客户端证书和密钥
KafkaSource 期望这些文件是 PEM 格式。如果它们是另一种格式,例如 JKS,请将它们转换为 PEM。
-
通过运行以下命令,在将设置 KafkaSource 的命名空间中创建证书文件作为 secret
kubectl create secret generic cacert --from-file=caroot.pemkubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem -
应用 KafkaSource。相应地修改
bootstrapServers和topics字段。apiVersion: sources.knative.dev/v1 kind: KafkaSource metadata: name: kafka-source-with-tls spec: net: tls: enable: true cert: secretKeyRef: key: tls.crt name: kafka-secret key: secretKeyRef: key: tls.key name: kafka-secret caCert: secretKeyRef: key: caroot.pem name: cacert consumerGroup: knative-group bootstrapServers: - my-secure-kafka-bootstrap.kafka:443 topics: - knative-demo-topic sink: ref: apiVersion: serving.knative.dev/v1 kind: Service name: event-display
为 KafkaSources 启用 SASL¶
简单身份验证和安全层 (SASL) 用于 Apache Kafka 的身份验证。如果您在集群上使用 SASL 身份验证,用户必须向 Knative 提供凭据才能与 Kafka 集群通信,否则事件无法被生产或消费。
先决条件¶
- 您可以访问启用了简单身份验证和安全层 (SASL) 的 Kafka 集群。
步骤¶
-
通过运行以下命令,创建使用 Kafka 集群的 SASL 信息的 secret
STRIMZI_CRT=$(kubectl -n kafka get secret example-cluster-cluster-ca-cert --template='{{index.data "ca.crt"}}' | base64 --decode )SASL_PASSWD=$(kubectl -n kafka get secret example-user --template='{{index.data "password"}}' | base64 --decode )kubectl create secret -n default generic <secret_name> \ --from-literal=ca.crt="$STRIMZI_CRT" \ --from-literal=password="$SASL_PASSWD" \ --from-literal=saslType="SCRAM-SHA-512" \ --from-literal=user="example-user" -
创建或修改 KafkaSource,使其包含以下 spec 选项
apiVersion: sources.knative.dev/v1 kind: KafkaSource metadata: name: example-source spec: ... net: sasl: enable: true user: secretKeyRef: name: <secret_name> key: user password: secretKeyRef: name: <secret_name> key: password type: secretKeyRef: name: <secret_name> key: saslType tls: enable: true caCert: secretKeyRef: name: <secret_name> key: ca.crt ...其中
<secret_name>是上一步中生成的 secret 的名称。
清理步骤¶
-
删除 Kafka 事件源
kubectl delete -f source/source.yaml kafkasource.sources.knative.dev示例输出
"kafka-source" deleted -
删除
event-display服务kubectl delete -f source/event-display.yaml service.serving.knative.dev示例输出
"event-display" deleted -
可选:删除 Apache Kafka 主题
kubectl delete -f kafka-topic.yaml示例输出
kafkatopic.kafka.strimzi.io "knative-demo-topic" deleted