跳到内容

Apache Kafka 的 Knative 源

stage version

KafkaSource 从现有的 Apache Kafka 主题中读取消息,并通过 HTTP 将这些消息作为 CloudEvents 发送到其配置的 sinkKafkaSource 会保留存储在主题分区中的消息的顺序。它是通过等待 sink 的成功响应,然后才投递同一分区中的下一条消息来实现的。

安装 KafkaSource 控制器

  1. 通过输入以下命令安装 KafkaSource 控制器

    kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.21.0/eventing-kafka-controller.yaml
    
  2. 通过输入以下命令安装 Kafka 源数据平面

    kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.21.0/eventing-kafka-source.yaml
    
  3. 通过输入以下命令验证 kafka-controllerkafka-source-dispatcher 是否正在运行

    kubectl get deployments.apps,statefulsets.apps -n knative-eventing
    

    示例输出

    NAME                                           READY   UP-TO-DATE   AVAILABLE   AGE
    deployment.apps/kafka-controller               1/1     1            1           3s
    
    NAME                                       READY   AGE
    statefulset.apps/kafka-source-dispatcher   1/1     3s
    

可选:创建一个 Kafka 主题

注意

创建 Kafka 主题部分假设您正在使用 Strimzi 来操作 Apache Kafka,但等效的操作可以使用 Apache Kafka CLI 或任何其他工具来复制。

如果您正在使用 Strimzi

  1. 创建 KafkaTopic YAML 文件

    apiVersion: kafka.strimzi.io/v1beta2
    kind: KafkaTopic
    metadata:
      name: knative-demo-topic
      namespace: kafka
      labels:
        strimzi.io/cluster: my-cluster
    spec:
      partitions: 3
      replicas: 1
      config:
        retention.ms: 7200000
        segment.bytes: 1073741824
    
  2. 通过运行命令部署 KafkaTopic YAML 文件

    kubectl apply -f <filename>.yaml
    
    其中 <filename> 是您的 KafkaTopic YAML 文件的名称。

    示例输出

    kafkatopic.kafka.strimzi.io/knative-demo-topic created
    

  3. 通过运行命令确保 KafkaTopic 正在运行

    kubectl -n kafka get kafkatopics.kafka.strimzi.io
    

    示例输出

    NAME                 CLUSTER      PARTITIONS   REPLICATION FACTOR
    knative-demo-topic   my-cluster   3            1
    

创建服务

  1. event-display 服务创建为 YAML 文件

    apiVersion: serving.knative.dev/v1
    kind: Service
    metadata:
      name: event-display
      namespace: default
    spec:
      template:
        spec:
          containers:
            - # This corresponds to
              # https://github.com/knative/eventing/tree/main/cmd/event_display/main.go
              image: gcr.io/knative-releases/knative.dev/eventing/cmd/event_display
    
  2. 通过运行命令应用 YAML 文件

    kubectl apply -f <filename>.yaml
    
    其中 <filename> 是您在上一步中创建的文件的名称。

    示例输出

    service.serving.knative.dev/event-display created
    

  3. 通过运行以下命令,确保服务 Pod 正在运行

    kubectl get pods
    

    Pod 名称以 event-display 为前缀

    NAME                                            READY     STATUS    RESTARTS   AGE
    event-display-00001-deployment-5d5df6c7-gv2j4   2/2       Running   0          72s
    

Kafka 事件源

  1. 相应地修改 source/event-source.yaml 文件,包括引导服务器、主题等

    apiVersion: sources.knative.dev/v1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display
    
  2. 部署事件源

    kubectl apply -f event-source.yaml
    

    示例输出

    kafkasource.sources.knative.dev/kafka-source created
    

  3. 验证 KafkaSource 是否已准备就绪

    kubectl get kafkasource kafka-source
    

    示例输出

    NAME           TOPICS                   BOOTSTRAPSERVERS                            READY   REASON   AGE
    kafka-source   ["knative-demo-topic"]   ["my-cluster-kafka-bootstrap.kafka:9092"]   True             26h
    

伸缩

要调度更多的或更少的消费者,可以对 KafkaSource 进行伸缩,并将它们分配给不同的 dispatcher pod。kafkasource 状态在 status.placements 键下显示此类分配。

您可以使用以下格式通过 kubectl 伸缩 KafkaSource

kubectl scale kafkasource -n <ns> <kafkasource-name> --replicas=<number-of-replicas> # e.g. 12 replicas for a topic with 12 partitions

或者,如果您使用的是 GitOps 方法,可以在您的存储库中添加 consumers 键,如下面的示例所示,并提交它

    apiVersion: sources.knative.dev/v1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092 
      consumers: 12    # Number of replicas
      topics:
      - knative-demo-topic
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

使用 KEDA 自动伸缩

您可以与 KEDA 一起自动伸缩 KafkaSource。有关如何启用和配置此功能的更多信息,请在此处阅读说明

验证

  1. 向 Apache Kafka 主题(如以下示例所示)中产生一条消息 ({"msg": "This is a test!"})

    kubectl -n kafka run kafka-producer -ti --image=strimzi/kafka:0.14.0-kafka-2.3.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --broker-list my-cluster-kafka-bootstrap:9092 --topic knative-demo-topic
    

    提示

    如果您没有看到命令提示符,请尝试按 Enter

  2. 验证 Service 是否已收到来自事件源的消息

    kubectl logs --selector='serving.knative.dev/service=event-display' -c user-container
    

    示例输出

    ☁️ cloudevents.Event
    Validation: valid
    Context Attributes,
      specversion: 1.0
      type: dev.knative.kafka.event
      source: /apis/v1/namespaces/default/kafkasources/kafka-source#my-topic
      subject: partition:0#564
      id: partition:0/offset:564
      time: 2020-02-10T18:10:23.861866615Z
      datacontenttype: application/json
    Extensions,
      key:
    Data,
        {
          "msg": "This is a test!"
        }
    

处理投递失败

KafkaSource 实现 Delivery 规范,允许您为其配置事件投递参数,这些参数在事件投递失败时应用

    apiVersion: sources.knative.dev/v1
    kind: KafkaSource
    metadata:
      name: kafka-source
    spec:
      consumerGroup: knative-group
      bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
      topics:
      - knative-demo-topic
      delivery:
        deadLetterSink:
          ref:
            apiVersion: serving.knative.dev/v1
            kind: Service
            name: example-sink
        backoffDelay: <duration>
        backoffPolicy: <policy-type>
        retry: <integer>
      sink:
        ref:
          apiVersion: serving.knative.dev/v1
          kind: Service
          name: event-display

delivery API 在 处理投递失败章节中讨论。

可选:指定键反序列化器

KafkaSource 从 Kafka 接收到消息时,它会将键转储到名为 Key 的事件扩展中,并将 Kafka 消息头转储到以 kafkaheader 开头的扩展中。

您可以在四种类型中指定键反序列化器

  • string(默认值)用于 UTF-8 编码的字符串
  • int 用于 32 位和 64 位有符号整数
  • float 用于 32 位和 64 位浮点数
  • byte-array 用于 Base64 编码的字节数组

要指定键反序列化器,请将标签 kafkasources.sources.knative.dev/key-type 添加到 KafkaSource 定义中,如下例所示

apiVersion: sources.knative.dev/v1
kind: KafkaSource
metadata:
name: kafka-source
labels:
  kafkasources.sources.knative.dev/key-type: int
spec:
consumerGroup: knative-group
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
  ref:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: event-display

可选:指定初始偏移量

默认情况下,KafkaSource 从每个分区中的最新偏移量开始消费。如果要从最早的偏移量开始消费,请将 initialOffset 字段设置为 earliest,例如

apiVersion: sources.knative.dev/v1
kind: KafkaSource
metadata:
  name: kafka-source
spec:
consumerGroup: knative-group
initialOffset: earliest
bootstrapServers:
- my-cluster-kafka-bootstrap.kafka:9092 # note the kafka namespace
topics:
- knative-demo-topic
sink:
  ref:
    apiVersion: serving.knative.dev/v1
    kind: Service
    name: event-display

注意

initialOffset 的有效值为 earliestlatest。任何其他值都会导致验证错误。仅当该消费者组没有已提交的偏移量时,此字段才有效。

连接到启用了 TLS 的 Kafka 代理

KafkaSource 支持 TLS 和 SASL 认证方法。要启用 TLS 认证,您必须拥有以下文件

  • CA 证书
  • 客户端证书和密钥

KafkaSource 期望这些文件是 PEM 格式。如果它们是另一种格式,例如 JKS,请将它们转换为 PEM。

  1. 通过运行以下命令,在将设置 KafkaSource 的命名空间中创建证书文件作为 secret

    kubectl create secret generic cacert --from-file=caroot.pem
    
    kubectl create secret tls kafka-secret --cert=certificate.pem --key=key.pem
    
  2. 应用 KafkaSource。相应地修改 bootstrapServerstopics 字段。

    apiVersion: sources.knative.dev/v1
    kind: KafkaSource
    metadata:
     name: kafka-source-with-tls
    spec:
     net:
       tls:
         enable: true
         cert:
           secretKeyRef:
             key: tls.crt
             name: kafka-secret
         key:
           secretKeyRef:
             key: tls.key
             name: kafka-secret
         caCert:
           secretKeyRef:
             key: caroot.pem
             name: cacert
     consumerGroup: knative-group
     bootstrapServers:
     - my-secure-kafka-bootstrap.kafka:443
     topics:
     - knative-demo-topic
     sink:
       ref:
         apiVersion: serving.knative.dev/v1
         kind: Service
         name: event-display
    

为 KafkaSources 启用 SASL

简单身份验证和安全层 (SASL) 用于 Apache Kafka 的身份验证。如果您在集群上使用 SASL 身份验证,用户必须向 Knative 提供凭据才能与 Kafka 集群通信,否则事件无法被生产或消费。

先决条件

  • 您可以访问启用了简单身份验证和安全层 (SASL) 的 Kafka 集群。

步骤

  1. 通过运行以下命令,创建使用 Kafka 集群的 SASL 信息的 secret

    STRIMZI_CRT=$(kubectl -n kafka get secret example-cluster-cluster-ca-cert --template='{{index.data "ca.crt"}}' | base64 --decode )
    
    SASL_PASSWD=$(kubectl -n kafka get secret example-user --template='{{index.data "password"}}' | base64 --decode )
    
    kubectl create secret -n default generic <secret_name> \
        --from-literal=ca.crt="$STRIMZI_CRT" \
        --from-literal=password="$SASL_PASSWD" \
        --from-literal=saslType="SCRAM-SHA-512" \
        --from-literal=user="example-user"
    
  2. 创建或修改 KafkaSource,使其包含以下 spec 选项

    apiVersion: sources.knative.dev/v1
    kind: KafkaSource
    metadata:
      name: example-source
    spec:
    ...
      net:
        sasl:
          enable: true
          user:
            secretKeyRef:
              name: <secret_name>
              key: user
          password:
            secretKeyRef:
              name: <secret_name>
              key: password
          type:
            secretKeyRef:
              name: <secret_name>
              key: saslType
        tls:
          enable: true
          caCert:
            secretKeyRef:
              name: <secret_name>
              key: ca.crt
    ...
    

    其中 <secret_name> 是上一步中生成的 secret 的名称。

清理步骤

  1. 删除 Kafka 事件源

    kubectl delete -f source/source.yaml kafkasource.sources.knative.dev
    

    示例输出

    "kafka-source" deleted
    

  2. 删除 event-display 服务

    kubectl delete -f source/event-display.yaml service.serving.knative.dev
    

    示例输出

    "event-display" deleted
    

  3. 可选:删除 Apache Kafka 主题

    kubectl delete -f kafka-topic.yaml
    

    示例输出

    kafkatopic.kafka.strimzi.io "knative-demo-topic" deleted
    

我们使用分析和 cookie 来了解网站流量。有关您使用我们网站的信息会与 Google 共享以达到此目的。了解更多。