跳到内容

适用于 Apache Kafka 的 Knative Sink

KafkaSink 是一个 Apache Kafka 原生 Sink 实现,将传入的 CloudEvent 持久化到可配置的 Apache Kafka Topic。本页面展示了如何安装和配置 Knative KafkaSink

先决条件

您必须有权访问已安装 Knative Eventing 的 Kubernetes 集群。

安装

  1. 安装 Kafka 控制器

    kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.20.0/eventing-kafka-controller.yaml
    
  2. 安装 KafkaSink 数据平面

    kubectl apply -f https://github.com/knative-extensions/eventing-kafka-broker/releases/download/knative-v1.20.0/eventing-kafka-sink.yaml
    
  3. 验证 kafka-controllerkafka-sink-receiver 部署正在运行

    kubectl get deployments.apps -n knative-eventing
    

    示例输出

    NAME                           READY   UP-TO-DATE   AVAILABLE   AGE
    eventing-controller            1/1     1            1           10s
    eventing-webhook               1/1     1            1           9s
    kafka-controller               1/1     1            1           3s
    kafka-sink-receiver            1/1     1            1           5s
    

KafkaSink 示例

KafkaSink 对象看起来类似于以下内容

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: mytopic
  bootstrapServers:
   - my-cluster-kafka-bootstrap.kafka:9092

输出主题内容模式

CloudEvent 规范定义了 2 种传输 CloudEvent 的模式:结构化和二进制。

“结构化模式消息”是指事件使用独立事件格式完全编码并存储在消息体中的消息。

结构化内容模式将事件元数据和数据一起保留在有效负载中,从而允许同一事件在多个路由跳和多个协议之间简单转发。

“二进制模式消息”是指事件数据存储在消息体中,事件属性存储为消息元数据一部分的消息。

二进制内容模式适应任何形状的事件数据,并允许高效传输,无需转码工作。

具有指定 contentMode 的 KafkaSink 对象看起来类似于以下内容

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
  name: my-kafka-sink
  namespace: default
spec:
  topic: mytopic
  bootstrapServers:
   - my-cluster-kafka-bootstrap.kafka:9092

  # CloudEvent content mode of Kafka messages sent to the topic.
  # Possible values:
  # - structured
  # - binary
  #
  # default: binary.
  #
  # CloudEvent spec references:
  # - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/spec.md#message
  # - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md#33-structured-content-mode
  # - https://github.com/cloudevents/spec/blob/v1.0.2/cloudevents/bindings/kafka-protocol-binding.md#32-binary-content-mode
  contentMode: binary # or structured

安全

Knative 支持以下 Apache Kafka 安全功能

启用安全功能

要启用安全功能,在 KafkaSink 规范中,您可以引用一个 Secret

apiVersion: eventing.knative.dev/v1alpha1
kind: KafkaSink
metadata:
   name: my-kafka-sink
   namespace: default
spec:
   topic: mytopic
   bootstrapServers:
      - my-cluster-kafka-bootstrap.kafka:9092
   auth:
     secret:
       ref:
         name: my_secret

注意

Secret my_secret 必须存在于 KafkaSink 所在的命名空间中。证书和密钥必须采用 PEM 格式

使用 SASL 进行身份验证

Knative 支持以下 SASL 机制

  • PLAIN
  • SCRAM-SHA-256
  • SCRAM-SHA-512

要使用特定的 SASL 机制,请将 <sasl_mechanism> 替换为您选择的机制。

使用 SASL 身份验证,不加密

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_PLAINTEXT \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>

使用 SASL 身份验证并使用 SSL 加密

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SASL_SSL \
  --from-literal=sasl.mechanism=<sasl_mechanism> \
  --from-file=ca.crt=caroot.pem \
  --from-literal=user=<my_user> \
  --from-literal=password=<my_password>

使用 SSL 加密,无需客户端身份验证

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-literal=user.skip=true

使用 SSL 进行身份验证和加密

kubectl create secret --namespace <namespace> generic <my_secret> \
  --from-literal=protocol=SSL \
  --from-file=ca.crt=<my_caroot.pem_file_path> \
  --from-file=user.crt=<my_cert.pem_file_path> \
  --from-file=user.key=<my_key.pem_file_path>

注意

可以省略 ca.crt 以启用回退并使用系统的根 CA 集。

Kafka 生产者配置

Kafka 生产者是负责向 Apache Kafka 集群发送事件的组件。您可以通过修改 knative-eventing 命名空间中的 config-kafka-sink-data-plane ConfigMap 来更改集群中 Kafka 生产者的配置。

此 ConfigMap 中可用设置的文档可在 Apache Kafka 网站上找到,特别是生产者配置

启用数据平面组件的调试日志记录

要启用数据平面组件的调试日志记录,请在 kafka-config-logging ConfigMap 中将日志记录级别更改为 DEBUG

  1. kafka-config-logging ConfigMap 创建为包含以下内容的 YAML 文件

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: kafka-config-logging
      namespace: knative-eventing
    data:
      config.xml: |
        <configuration>
          <appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
            <encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
          </appender>
          <root level="DEBUG">
            <appender-ref ref="jsonConsoleAppender"/>
          </root>
        </configuration>
    
  2. 通过运行命令应用 YAML 文件

    kubectl apply -f <filename>.yaml
    
    其中 <filename> 是您在上一步中创建的文件的名称。

  3. 重新启动 kafka-sink-receiver

    kubectl rollout restart deployment -n knative-eventing kafka-sink-receiver
    

我们使用分析和 cookie 来了解网站流量。有关您使用我们网站的信息会与 Google 共享以达到此目的。了解更多。