跳到内容

创建接收适配器

作为源协调过程的一部分,您必须创建和部署底层的接收适配器。

接收适配器需要一个基于注入的 main 方法,该方法位于 cmd/receiver_adapter/main.go

// This Adapter generates events at a regular interval.
package main

import (
    "knative.dev/eventing/pkg/adapter"
    myadapter "knative.dev/sample-source/pkg/adapter"
)

func main() {
    adapter.Main("sample-source", myadapter.NewEnv, myadapter.NewAdapter)
}

接收适配器的 pkg 实现包含两个主要函数

  1. 一个 NewAdapter(ctx context.Context, aEnv adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {} 调用,它通过 EnvConfigAccessor 传入变量来创建新的适配器。创建的适配器会传入 CloudEvents 客户端(事件将转发到此处)。在 Knative 生态系统中,这有时被称为 sink 或 ceClient。返回值是适配器本地结构体定义的适配器引用。

    在示例源的情况下

    // Adapter generates events at a regular interval.
    type Adapter struct {
        logger   *zap.Logger
        interval time.Duration
        nextID   int
        client   cloudevents.Client
    }
    
  2. 一个 Start 函数,作为适配器 struct 的接口实现

    func (a *Adapter) Start(stopCh <-chan struct{}) error {
    

    stopCh 是停止适配器的信号。否则,函数的作用是处理下一个事件。

    sample-source 的情况下,此函数会每隔 X 时间间隔创建一个 CloudEvent 以转发到指定的 sink,如由资源 YAML 加载的 EnvConfigAccessor 参数所指定

    func (a *Adapter) Start(stopCh <-chan struct{}) error {
        a.logger.Infow("Starting heartbeat", zap.String("interval", a.interval.String()))
        for {
            select {
            case <-time.After(a.interval):
                event := a.newEvent()
                a.logger.Infow("Sending new event", zap.String("event", event.String()))
                if result := a.client.Send(context.Background(), event); !cloudevents.IsACK(result) {
                    a.logger.Infow("failed to send event", zap.String("event", event.String()), zap.Error(result))
                    // We got an error but it could be transient, try again next interval.
                    continue
                }
            case <-stopCh:
                a.logger.Info("Shutting down...")
                return nil
            }
        }
    }
    

在控制器中管理接收适配器

  1. 更新 ObservedGeneration 并初始化 Status 条件,如 samplesource_lifecycle.gosamplesource_types.go 文件中所定义

    src.Status.InitializeConditions()
    src.Status.ObservedGeneration = src.Generation
    
  2. 创建接收适配器。

    1. 验证指定的 Kubernetes 资源是否有效,并相应地更新 Status

    2. 组装 ReceiveAdapterArgs

      raArgs := resources.ReceiveAdapterArgs{
              EventSource:    src.Namespace + "/" + src.Name,
              Image:          r.ReceiveAdapterImage,
              Source:         src,
              Labels:         resources.Labels(src.Name),
              AdditionalEnvs: r.configAccessor.ToEnvVars(), // Grab config envs for tracing/logging/metrics
          }
      

      注意

      确切的参数可能会根据功能需求而改变。根据提供的参数创建底层部署,根据需要匹配 Pod 模板、标签、所有者引用等以填充部署。示例:pkg/reconciler/sample/resources/receive_adapter.go

    3. 获取现有接收适配器部署

      namespace := owner.GetObjectMeta().GetNamespace()
      ra, err := r.KubeClientSet.AppsV1().Deployments(namespace).Get(expected.Name, metav1.GetOptions{})
      
    4. 如果不存在现有接收适配器部署,则创建一个

      ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Create(expected)
      
    5. 检查预期规范是否与现有规范不同,如果需要则更新部署

      } else if r.podSpecImageSync(expected.Spec.Template.Spec, ra.Spec.Template.Spec) {
          ra.Spec.Template.Spec = expected.Spec.Template.Spec
          if ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Update(ra); err != nil {
              return ra, err
          }
      
    6. 如果已更新,则记录事件

      return pkgreconciler.NewEvent(corev1.EventTypeNormal, "DeploymentUpdated", "updated deployment: \"%s/%s\"", namespace, name)
      
    7. 如果成功,则更新 StatusMarkDeployed

      src.Status.PropagateDeploymentAvailability(ra)
      
  3. 创建 SinkBinding 以将接收适配器与 sink 绑定。

    1. 为接收适配器部署创建 Reference。此部署是 SinkBinding 的源

      tracker.Reference{
          APIVersion: appsv1.SchemeGroupVersion.String(),
          Kind:       "Deployment",
          Namespace:  ra.Namespace,
          Name:       ra.Name,
      }
      
    2. 获取现有 SinkBinding

      namespace := owner.GetObjectMeta().GetNamespace()
      sb, err := r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Get(expected.Name, metav1.GetOptions{})
      
    3. 如果不存在现有 SinkBinding,则创建一个

      sb, err = r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Create(expected)
      
    4. 检查预期规范是否与现有规范不同,如果需要则更新 SinkBinding

      else if r.specChanged(sb.Spec, expected.Spec) {
          sb.Spec = expected.Spec
          if sb, err = r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Update(sb); err != nil {
              return sb, err
          }
      
    5. 如果已更新,则记录事件

      return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SinkBindingUpdated", "updated SinkBinding: \"%s/%s\"", namespace, name)
      
    6. 使用结果 MarkSink

      src.Status.MarkSink(sb.Status.SinkURI)
      
  4. 返回一个新的协调器事件,表明过程已完成

    return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SampleSourceReconciled", "SampleSource reconciled: \"%s/%s\"", namespace, name)
    

我们使用分析和 cookie 来了解网站流量。有关您使用我们网站的信息会与 Google 共享以达到此目的。了解更多。