创建接收适配器¶
作为源协调过程的一部分,您必须创建和部署底层的接收适配器。
接收适配器需要一个基于注入的 main 方法,该方法位于 cmd/receiver_adapter/main.go
// This Adapter generates events at a regular interval.
package main
import (
"knative.dev/eventing/pkg/adapter"
myadapter "knative.dev/sample-source/pkg/adapter"
)
func main() {
adapter.Main("sample-source", myadapter.NewEnv, myadapter.NewAdapter)
}
接收适配器的 pkg 实现包含两个主要函数
-
一个
NewAdapter(ctx context.Context, aEnv adapter.EnvConfigAccessor, ceClient cloudevents.Client) adapter.Adapter {}调用,它通过EnvConfigAccessor传入变量来创建新的适配器。创建的适配器会传入 CloudEvents 客户端(事件将转发到此处)。在 Knative 生态系统中,这有时被称为 sink 或ceClient。返回值是适配器本地结构体定义的适配器引用。在示例源的情况下
// Adapter generates events at a regular interval. type Adapter struct { logger *zap.Logger interval time.Duration nextID int client cloudevents.Client } -
一个
Start函数,作为适配器struct的接口实现func (a *Adapter) Start(stopCh <-chan struct{}) error {stopCh是停止适配器的信号。否则,函数的作用是处理下一个事件。在
sample-source的情况下,此函数会每隔 X 时间间隔创建一个 CloudEvent 以转发到指定的 sink,如由资源 YAML 加载的EnvConfigAccessor参数所指定func (a *Adapter) Start(stopCh <-chan struct{}) error { a.logger.Infow("Starting heartbeat", zap.String("interval", a.interval.String())) for { select { case <-time.After(a.interval): event := a.newEvent() a.logger.Infow("Sending new event", zap.String("event", event.String())) if result := a.client.Send(context.Background(), event); !cloudevents.IsACK(result) { a.logger.Infow("failed to send event", zap.String("event", event.String()), zap.Error(result)) // We got an error but it could be transient, try again next interval. continue } case <-stopCh: a.logger.Info("Shutting down...") return nil } } }
在控制器中管理接收适配器¶
-
更新
ObservedGeneration并初始化Status条件,如samplesource_lifecycle.go和samplesource_types.go文件中所定义src.Status.InitializeConditions() src.Status.ObservedGeneration = src.Generation -
创建接收适配器。
-
验证指定的 Kubernetes 资源是否有效,并相应地更新
Status。 -
组装
ReceiveAdapterArgsraArgs := resources.ReceiveAdapterArgs{ EventSource: src.Namespace + "/" + src.Name, Image: r.ReceiveAdapterImage, Source: src, Labels: resources.Labels(src.Name), AdditionalEnvs: r.configAccessor.ToEnvVars(), // Grab config envs for tracing/logging/metrics }注意
确切的参数可能会根据功能需求而改变。根据提供的参数创建底层部署,根据需要匹配 Pod 模板、标签、所有者引用等以填充部署。示例:pkg/reconciler/sample/resources/receive_adapter.go
-
获取现有接收适配器部署
namespace := owner.GetObjectMeta().GetNamespace() ra, err := r.KubeClientSet.AppsV1().Deployments(namespace).Get(expected.Name, metav1.GetOptions{}) -
如果不存在现有接收适配器部署,则创建一个
ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Create(expected) -
检查预期规范是否与现有规范不同,如果需要则更新部署
} else if r.podSpecImageSync(expected.Spec.Template.Spec, ra.Spec.Template.Spec) { ra.Spec.Template.Spec = expected.Spec.Template.Spec if ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Update(ra); err != nil { return ra, err } -
如果已更新,则记录事件
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "DeploymentUpdated", "updated deployment: \"%s/%s\"", namespace, name) -
如果成功,则更新
Status并MarkDeployedsrc.Status.PropagateDeploymentAvailability(ra)
-
-
创建 SinkBinding 以将接收适配器与 sink 绑定。
-
为接收适配器部署创建
Reference。此部署是 SinkBinding 的源tracker.Reference{ APIVersion: appsv1.SchemeGroupVersion.String(), Kind: "Deployment", Namespace: ra.Namespace, Name: ra.Name, } -
获取现有 SinkBinding
namespace := owner.GetObjectMeta().GetNamespace() sb, err := r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Get(expected.Name, metav1.GetOptions{}) -
如果不存在现有 SinkBinding,则创建一个
sb, err = r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Create(expected) -
检查预期规范是否与现有规范不同,如果需要则更新 SinkBinding
else if r.specChanged(sb.Spec, expected.Spec) { sb.Spec = expected.Spec if sb, err = r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Update(sb); err != nil { return sb, err } -
如果已更新,则记录事件
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SinkBindingUpdated", "updated SinkBinding: \"%s/%s\"", namespace, name) -
使用结果
MarkSinksrc.Status.MarkSink(sb.Status.SinkURI)
-
-
返回一个新的协调器事件,表明过程已完成
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SampleSourceReconciled", "SampleSource reconciled: \"%s/%s\"", namespace, name)